]> mj.ucw.cz Git - home-hw.git/blob - telegram/burrow-telegram.py
Rainbow case: TODO
[home-hw.git] / telegram / burrow-telegram.py
1 #!/usr/bin/env python
2 # A simple daemon for sending Telegram notifications about failures in the Burrow
3 # (c) 2022 Martin Mareš <mj@ucw.cz>
4
5 from aiogram import Bot, Dispatcher, executor, types
6 import asyncio
7 import aiomqtt
8 from configparser import ConfigParser
9 from datetime import datetime, timedelta
10 import logging
11 from logging.handlers import SysLogHandler
12 import signal
13 import ssl
14 import sys
15
16 config = ConfigParser()
17 config.read('/usr/local/etc/burrow-telegram')
18 API_TOKEN = config['telegram']['api_token']
19 CHATS = list(map(int, config['telegram']['chats'].split(' ')))
20 USE_SYSLOG = True
21
22 if USE_SYSLOG:
23     formatter = logging.Formatter(fmt="%(message)s")        # systemd will handle the rest
24     log_handler = SysLogHandler('/dev/log', facility=SysLogHandler.LOG_LOCAL1)
25     log_handler.ident = 'burrow-telegram: '
26 else:
27     formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S')
28     log_handler = logging.StreamHandler(stream=sys.stdout)
29 log_handler.setFormatter(formatter)
30 logger = logging.getLogger()
31 logger.setLevel(logging.INFO)
32 logger.addHandler(log_handler)
33
34 bot = Bot(token=API_TOKEN)
35 dispatcher = Dispatcher(bot)
36
37
38 @dispatcher.message_handler(commands=['start', 'help'])
39 async def send_welcome(message: types.Message):
40     logger.info(f'Start from {message.chat}')
41     await message.reply("Brum!\nI'm BurrowBot!\n")
42
43
44 @dispatcher.message_handler()
45 async def echo(message: types.Message):
46     print(message)
47     if message.text.startswith("xyzzy"):
48         await send_msg("Nothing happens.")
49     # await message.answer(message.text)
50
51
52 BUCKET_CAPACITY = 5
53 BUCKET_REFRESH = timedelta(seconds=10)
54 bucket = BUCKET_CAPACITY
55 bucket_last_fill = datetime.now()
56
57
58 async def send_msg(text):
59     global bucket, bucket_last_fill
60     logger.info(f'Sending message: {text}')
61
62     if bucket < BUCKET_CAPACITY:
63         now = datetime.now()
64         while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now:
65             bucket_last_fill += BUCKET_REFRESH
66             bucket += 1
67             logger.debug(f'Bucket refill: {bucket}')
68
69     if bucket > 0:
70         bucket -= 1
71         logger.debug(f'Bucket drain: {bucket}')
72
73         for chat in CHATS:
74             await bot.send_message(chat, text + '\n')
75
76     else:
77         logger.info('Bucket empty :(')
78
79
80 hyst_state = {}
81
82 def hysteresis(key, value, low, high):
83     if key in hyst_state:
84         old_state = hyst_state[key]
85     else:
86         old_state = 0
87     if value is None:
88         new_state = 0
89     elif old_state <= 0:
90         if value >= high:
91             new_state = 1
92         else:
93             new_state = -1
94     else:
95         if value <= low:
96             new_state = -1
97         else:
98             new_state = 1
99     hyst_state[key] = new_state
100     return new_state
101
102
103 last_temp_state = 1
104 last_boiler_err = 0
105
106
107 async def mqtt_process_msg(topic, val):
108     global last_temp_state, last_temp_warning
109     global last_boiler_err
110     now = datetime.now()
111
112     logger.debug(f'MQTT: {topic} -> {val}')
113
114     if topic == 'burrow/temp/catarium':
115         temp, when = map(float, val.split(' '))
116         if when < now.timestamp() - 3600:
117             temp_state = 0
118         else:
119             temp_state = hysteresis('catarium-temp', temp, 22, 23)
120         if temp_state != last_temp_state:
121             last_temp_state = temp_state
122             if temp_state == 0:
123                 await send_msg('Teplotní čidlo v pracovně nefunguje.')
124             else:
125                 await send_msg(f'Teplota v pracovně: {temp} °C')
126
127     if topic == 'burrow/heating/error':
128         err = int(val.split(' ')[0])
129         if err != last_boiler_err:
130             last_boiler_err = err
131             await send_msg(f'Chyba kotle: {err}')
132
133
134 async def mqtt_loop():
135     sctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
136     sctx.verify_mode = ssl.CERT_REQUIRED
137     sctx.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key')
138     sctx.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')
139
140     mqtt = aiomqtt.Client(client_id='telegram', hostname="burrow-mqtt", port=8883, tls_context=sctx)
141     await mqtt.connect()
142
143     async with mqtt.messages() as messages:
144         await mqtt.subscribe("burrow/heating/#")
145         await mqtt.subscribe("burrow/temp/#")
146         async for msg in messages:
147             await mqtt_process_msg(msg.topic.value, msg.payload.decode())
148
149
150 async def mqtt_watcher():
151     while True:
152         try:
153             logger.info("Starting MQTT")
154             await mqtt_loop()
155         except aiomqtt.MqttError as error:
156             logger.error(f"MQTT error: {error}")
157         await asyncio.sleep(10)
158
159
160 async def fortunes():
161     await asyncio.sleep(5*60)
162     while True:
163         proc = await asyncio.create_subprocess_exec('/usr/games/fortune', stdout=asyncio.subprocess.PIPE)
164         out, err = await proc.communicate()
165         if proc.returncode == 0:
166             await send_msg(out.decode())
167         else:
168             logger.error(f'fortune failed with return code {proc.returncode}')
169         await asyncio.sleep(24*60*60)
170
171
172 async def main():
173     loop = asyncio.get_event_loop()
174     coros = [
175         loop.create_task(mqtt_watcher()),
176         loop.create_task(dispatcher.start_polling(timeout=60, relax=0.01, fast=True, allowed_updates=None)),
177         loop.create_task(fortunes()),
178     ]
179     for coro in asyncio.as_completed(coros):
180         done = await coro
181         done.result()       # The coroutine probably died of an exception, which is raised here.
182
183
184 asyncio.run(main())