]> mj.ucw.cz Git - home-hw.git/blob - telegram/burrow-telegram.py
608fd119466ec59e3fe8e0754f6781f3cf2f626c
[home-hw.git] / telegram / burrow-telegram.py
1 #!/usr/bin/env python
2 # A simple daemon for sending Telegram notifications about failures in the Burrow
3 # (c) 2022 Martin Mareš <mj@ucw.cz>
4
5 from aiogram import Bot, Dispatcher, executor, types
6 import asyncio
7 import asyncio_mqtt
8 from configparser import ConfigParser
9 from datetime import datetime, timedelta
10 import logging
11 import signal
12 import ssl
13 import sys
14
15 config = ConfigParser()
16 config.read('/usr/local/etc/burrow-telegram')
17 API_TOKEN = config['telegram']['api_token']
18 CHATS = map(int, config['telegram']['chats'].split(' '))
19
20 formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S')
21 log_handler = logging.StreamHandler(stream=sys.stdout)
22 log_handler.setFormatter(formatter)
23 logger = logging.getLogger()
24 logger.setLevel(logging.INFO)
25 logger.addHandler(log_handler)
26
27 bot = Bot(token=API_TOKEN)
28 dispatcher = Dispatcher(bot)
29
30
31 @dispatcher.message_handler(commands=['start', 'help'])
32 async def send_welcome(message: types.Message):
33     logger.info(f'Start from {message.chat}')
34     await message.reply("Brum!\nI'm BurrowBot!\n")
35
36
37 @dispatcher.message_handler()
38 async def echo(message: types.Message):
39     print(message)
40     # await message.answer(message.text)
41
42
43 BUCKET_CAPACITY = 5
44 BUCKET_REFRESH = timedelta(seconds=10)
45 bucket = BUCKET_CAPACITY
46 bucket_last_fill = datetime.now()
47
48
49 async def send_msg(text):
50     global bucket, bucket_last_fill
51     logger.info(f'Sending message: {text}')
52
53     if bucket < BUCKET_CAPACITY:
54         now = datetime.now()
55         while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now:
56             bucket_last_fill += BUCKET_REFRESH
57             bucket += 1
58             logger.debug(f'Bucket refill: {bucket}')
59
60     if bucket > 0:
61         bucket -= 1
62         logger.debug(f'Bucket drain: {bucket}')
63
64         for chat in CHATS:
65             await bot.send_message(chat, text + '\n')
66
67     else:
68         logger.info('Bucket empty :(')
69
70
71 hyst_state = {}
72
73 def hysteresis(key, value, low, high):
74     if key in hyst_state:
75         old_state = hyst_state[key]
76     else:
77         old_state = 0
78     if value is None:
79         new_state = 0
80     elif old_state <= 0:
81         if value >= high:
82             new_state = 1
83         else:
84             new_state = -1
85     else:
86         if value <= low:
87             new_state = -1
88         else:
89             new_state = 1
90     hyst_state[key] = new_state
91     return new_state
92
93
94 last_temp_state = 1
95 last_boiler_err = 0
96
97
98 async def mqtt_process_msg(topic, val):
99     global last_temp_state, last_temp_warning
100     global last_boiler_err
101     now = datetime.now()
102
103     logger.debug(f'MQTT: {topic} -> {val}')
104
105     if topic == 'burrow/temp/catarium':
106         temp, when = map(float, val.split(' '))
107         if when < now.timestamp() - 3600:
108             temp_state = 0
109         else:
110             temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5)
111         if temp_state != last_temp_state:
112             last_temp_state = temp_state
113             if temp_state == 0:
114                 await send_msg('Teplotní čidlo v pracovně nefunguje.')
115             else:
116                 await send_msg(f'Teplota v pracovně: {temp} °C')
117
118     if topic == 'burrow/heating/error':
119         err = int(val.split(' ')[0])
120         if err != last_boiler_err:
121             last_boiler_err = err
122             await send_msg(f'Chyba kotle: {err}')
123
124
125 async def mqtt_loop():
126     sctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
127     sctx.verify_mode = ssl.CERT_REQUIRED
128     sctx.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key')
129     sctx.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')
130
131     async with asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=sctx) as mqtt:
132         async with mqtt.unfiltered_messages() as messages:
133             await mqtt.subscribe("burrow/heating/#")
134             await mqtt.subscribe("burrow/temp/#")
135             async for msg in messages:
136                 await mqtt_process_msg(msg.topic, msg.payload.decode())
137
138
139 async def mqtt_watcher():
140     while True:
141         try:
142             logger.info("Starting MQTT")
143             await mqtt_loop()
144         except asyncio_mqtt.MqttError as error:
145             logger.error(f"MQTT error: {error}")
146         finally:
147             await asyncio.sleep(10)
148
149
150 async def fortunes():
151     await asyncio.sleep(5*60)
152     while True:
153         proc = await asyncio.create_subprocess_exec('fortune', stdout=asyncio.subprocess.PIPE)
154         out, err = await proc.communicate()
155         if proc.returncode == 0:
156             await send_msg(out.decode())
157         else:
158             logger.error(f'fortune failed with return code {proc.returncode}')
159         await asyncio.sleep(24*60*60)
160
161
162 async def main():
163     loop = asyncio.get_event_loop()
164     t1 = loop.create_task(mqtt_watcher())
165     t2 = loop.create_task(dispatcher.start_polling(timeout=20, relax=0.01, fast=True, allowed_updates=None))
166     t3 = loop.create_task(fortunes())
167     await asyncio.wait((t1, t2, t3))
168
169
170 asyncio.run(main())