]> mj.ucw.cz Git - home-hw.git/blob - telegram/burrow-telegram.py
11de9290d577d85ed62bd6266a57b0051fef8b76
[home-hw.git] / telegram / burrow-telegram.py
1 #!/usr/bin/env python
2 # A simple daemon for sending Telegram notifications about failures in the Burrow
3 # (c) 2022 Martin Mareš <mj@ucw.cz>
4
5 from aiogram import Bot, Dispatcher, executor, types
6 import asyncio
7 import asyncio_mqtt
8 from configparser import ConfigParser
9 from datetime import datetime, timedelta
10 import logging
11 import signal
12 import ssl
13 import sys
14
# Daemon configuration: the Telegram API token and the space-separated list
# of numeric chat IDs that receive notifications.
config = ConfigParser()
config.read('/usr/local/etc/burrow-telegram')
API_TOKEN = config['telegram']['api_token']
CHATS = [int(chat_id) for chat_id in config['telegram']['chats'].split(' ')]

# Log bare messages to stdout; systemd will handle timestamps and the rest.
# A fuller format for standalone use would be
#   fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S'
formatter = logging.Formatter("%(message)s")
log_handler = logging.StreamHandler(sys.stdout)
log_handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)

# Telegram bot and the dispatcher that routes incoming updates to handlers.
bot = Bot(token=API_TOKEN)
dispatcher = Dispatcher(bot)
30
31
@dispatcher.message_handler(commands=['start', 'help'])
async def send_welcome(message: types.Message):
    """Handle the /start and /help commands: log the chat and reply with a greeting."""
    sender = message.chat
    logger.info(f'Start from {sender}')
    greeting = "Brum!\nI'm BurrowBot!\n"
    await message.reply(greeting)
36
37
@dispatcher.message_handler()
async def echo(message: types.Message):
    """Handle every message not matched by an earlier handler.

    Despite its name, this handler does not echo the message back. It logs
    the incoming message and, when the text starts with "xyzzy", broadcasts
    "Nothing happens." through send_msg() (i.e. to all configured chats,
    subject to rate limiting).
    """
    # Use the shared logger rather than print(): the log handler flushes
    # after every record, whereas print() output may sit in a buffer when
    # stdout is not a terminal.
    logger.info(message)

    # Defensive: do not assume the message carries text.
    if message.text and message.text.startswith("xyzzy"):
        await send_msg("Nothing happens.")
44
45
# Token-bucket rate limiter for outgoing messages: the bucket holds at most
# BUCKET_CAPACITY tokens and regains one token every BUCKET_REFRESH.
BUCKET_CAPACITY = 5
BUCKET_REFRESH = timedelta(seconds=10)
bucket = BUCKET_CAPACITY
bucket_last_fill = datetime.now()


async def send_msg(text):
    """Send `text` to every chat in CHATS, subject to rate limiting.

    Each call consumes one token from the bucket. When the bucket is empty,
    the message is logged and dropped instead of being sent.
    """
    global bucket, bucket_last_fill
    logger.info(f'Sending message: {text}')

    now = datetime.now()

    # Credit one token for every full refresh interval elapsed since the last fill.
    while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now:
        bucket_last_fill += BUCKET_REFRESH
        bucket += 1
        logger.debug(f'Bucket refill: {bucket}')

    # A full bucket must not accumulate refill credit. Without this reset,
    # bucket_last_fill would stay at the time of the last refill, and after
    # a long quiet period every drained token would be refilled immediately,
    # defeating the rate limit.
    if bucket >= BUCKET_CAPACITY:
        bucket_last_fill = now

    if bucket > 0:
        bucket -= 1
        logger.debug(f'Bucket drain: {bucket}')

        for chat in CHATS:
            await bot.send_message(chat, text + '\n')

    else:
        logger.info('Bucket empty :(')
72
73
# Last hysteresis state per key: 1 (high), -1 (low) or 0 (no value).
hyst_state = {}


def hysteresis(key, value, low, high):
    """Classify `value` as high (1) or low (-1) with hysteresis, per `key`.

    A key that is currently high turns low only when the value drops to
    `low` or below; a key that is low (or has no state yet) turns high only
    when the value reaches `high` or above. A value of None yields state 0.
    The new state is remembered in hyst_state and returned.
    """
    previous = hyst_state.get(key, 0)

    if value is None:
        state = 0
    elif previous > 0:
        # Currently high: stay high until the value falls to the low threshold.
        state = -1 if value <= low else 1
    else:
        # Currently low or unknown: stay low until the value reaches the high threshold.
        state = 1 if value >= high else -1

    hyst_state[key] = state
    return state
95
96
# Last reported states, used to notify only on changes.
last_temp_state = 1
last_boiler_err = 0


async def mqtt_process_msg(topic, val):
    """Process one MQTT message and send a notification if the state changed.

    topic -- the MQTT topic the message was published on
    val   -- the decoded payload

    For 'burrow/temp/catarium', the payload is "<temperature> <timestamp>".
    A reading older than one hour is treated as a sensor failure (state 0);
    otherwise the temperature goes through hysteresis(). A message is sent
    whenever the resulting state differs from the previously reported one.

    For 'burrow/heating/error', the first word of the payload is an integer
    error code; a message is sent whenever it changes.

    Malformed payloads are logged and ignored, so that a single bad message
    cannot terminate the daemon.
    """
    global last_temp_state
    global last_boiler_err

    logger.debug(f'MQTT: {topic} -> {val}')

    if topic == 'burrow/temp/catarium':
        try:
            temp, when = map(float, val.split(' '))
        except ValueError:
            # Covers both non-numeric fields and a wrong number of fields.
            logger.warning(f'Malformed temperature payload: {val!r}')
            return

        if when < datetime.now().timestamp() - 3600:
            temp_state = 0
        else:
            temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5)

        if temp_state != last_temp_state:
            last_temp_state = temp_state
            if temp_state == 0:
                await send_msg('Teplotní čidlo v pracovně nefunguje.')
            else:
                await send_msg(f'Teplota v pracovně: {temp} °C')

    elif topic == 'burrow/heating/error':
        try:
            err = int(val.split(' ')[0])
        except ValueError:
            logger.warning(f'Malformed boiler error payload: {val!r}')
            return

        if err != last_boiler_err:
            last_boiler_err = err
            await send_msg(f'Chyba kotle: {err}')
126
127
async def mqtt_loop():
    """Connect to the MQTT broker over TLS and process messages until the connection ends.

    Subscribes to the heating and temperature topic trees and passes every
    received message to mqtt_process_msg().
    """
    # Mutual TLS: verify the broker against our CA and present a client certificate.
    # NOTE(review): check_hostname is not enabled here, and per the ssl docs
    # only PROTOCOL_TLS_CLIENT turns it on by default -- confirm whether the
    # broker's host name should be verified as well.
    tls = ssl.SSLContext(ssl.PROTOCOL_TLS)
    tls.verify_mode = ssl.CERT_REQUIRED
    tls.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key')
    tls.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')

    client = asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=tls)
    async with client as mqtt, mqtt.unfiltered_messages() as messages:
        # The message stream is opened before subscribing.
        for pattern in ("burrow/heating/#", "burrow/temp/#"):
            await mqtt.subscribe(pattern)
        async for msg in messages:
            await mqtt_process_msg(msg.topic, msg.payload.decode())
140
141
async def mqtt_watcher():
    """Keep the MQTT connection alive forever.

    Runs mqtt_loop(); when it ends or fails with an MQTT error, waits
    a while and starts it again. Other exceptions propagate to the caller.
    """
    retry_delay = 10    # seconds between connection attempts
    while True:
        logger.info("Starting MQTT")
        try:
            await mqtt_loop()
        except asyncio_mqtt.MqttError as exc:
            logger.error(f"MQTT error: {exc}")
        await asyncio.sleep(retry_delay)
150
151
async def fortunes():
    """Send a fortune cookie once a day, starting five minutes after startup.

    Runs /usr/games/fortune and sends its output via send_msg(). Failures
    of the fortune program (it cannot be started, or it exits with a non-zero
    code) are logged and the next attempt is made a day later, so that this
    cosmetic feature cannot take the whole daemon down.
    """
    await asyncio.sleep(5*60)
    while True:
        try:
            proc = await asyncio.create_subprocess_exec('/usr/games/fortune', stdout=asyncio.subprocess.PIPE)
            # stderr is not captured, so the second item is always None.
            out, _ = await proc.communicate()
        except OSError as error:
            # E.g. the binary is missing or not executable.
            logger.error(f'Cannot run fortune: {error}')
        else:
            if proc.returncode == 0:
                # Do not fail on output which is not valid UTF-8.
                await send_msg(out.decode(errors='replace'))
            else:
                logger.error(f'fortune failed with return code {proc.returncode}')
        await asyncio.sleep(24*60*60)
162
163
async def main():
    """Run the MQTT watcher, the Telegram polling loop and the fortune sender concurrently.

    Waits for all three tasks. If any of them dies of an exception, the
    exception is re-raised here immediately, which terminates the daemon
    (asyncio.run() cancels the remaining tasks on the way out).
    """
    tasks = [
        asyncio.create_task(mqtt_watcher()),
        asyncio.create_task(dispatcher.start_polling(timeout=60, relax=0.01, fast=True, allowed_updates=None)),
        asyncio.create_task(fortunes()),
    ]
    # gather() propagates the first exception raised by any task. Awaiting
    # the items of asyncio.as_completed() returns task results rather than
    # futures, so calling .result() on them would fail with AttributeError
    # as soon as a task finished normally.
    await asyncio.gather(*tasks)
174
175
# Entry point: start an event loop and run main() until it returns or raises.
asyncio.run(main())