mj.ucw.cz Git - home-hw.git/blob - telegram/test.py
Telegram: A bit more robust
[home-hw.git] / telegram / test.py
1 #!/usr/bin/env python
2
3 from aiogram import Bot, Dispatcher, executor, types
4 import asyncio
5 import asyncio_mqtt
6 from datetime import datetime, timedelta
7 import logging
8 import signal
9 import ssl
10 import sys
11
12 import config
13
# Root logger: everything from DEBUG upwards is written to stdout, each
# record prefixed with a timestamp, the logger name and the level.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter(
    fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s",
    datefmt='%Y-%m-%d %H:%M:%S',
)
log_handler = logging.StreamHandler(stream=sys.stdout)
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)
20
# Telegram client authenticated with the token from the local config module,
# and the dispatcher through which the message handlers are registered.
bot = Bot(token=config.API_TOKEN)
dispatcher = Dispatcher(bot=bot)
23
24
@dispatcher.message_handler(commands=['start', 'help'])
async def send_welcome(message: types.Message):
    """Answer the ``start`` and ``help`` commands with a fixed greeting.

    The chat the command came from is logged first; the greeting is then
    sent as a reply to the triggering message.
    """
    origin = message.chat
    logger.info(f'Start from {origin}')

    greeting = "Brum!\nI'm BurrowBot!\n"
    await message.reply(greeting)
29
30
@dispatcher.message_handler()
async def echo(message: types.Message):
    """Print the incoming message to stdout; nothing is sent back.

    The handler is registered without any filter arguments.
    """
    print(message)
    # Replying with the received text is currently switched off:
    # await message.answer(message.text)
35
36
# Token-bucket rate limit for outgoing messages: a burst of at most
# BUCKET_CAPACITY messages, with one token regained every BUCKET_REFRESH.
BUCKET_CAPACITY = 5
BUCKET_REFRESH = timedelta(seconds=10)
bucket = BUCKET_CAPACITY
bucket_last_fill = datetime.now()


def refill_bucket(tokens, last_fill, now, capacity=BUCKET_CAPACITY, refresh=BUCKET_REFRESH):
    """Return ``(tokens, last_fill)`` after crediting the refills due at *now*.

    One token is credited for every whole *refresh* period elapsed since
    *last_fill*, up to *capacity*.

    Whenever the bucket is full (already, or as a result of the refill) the
    returned timestamp is *now*.  Without this re-anchoring the timestamp
    falls ever further behind during idle periods, and every later call
    finds a refill "due" immediately, so the limit would never engage.

    A clock that moved backwards (``now < last_fill``) credits nothing.
    """
    if tokens >= capacity:
        return tokens, now

    periods = (now - last_fill) // refresh  # timedelta // timedelta -> int
    if periods <= 0:
        return tokens, last_fill

    tokens += periods
    if tokens >= capacity:
        return capacity, now
    # Keep the fractional remainder of the current period for the next call.
    return tokens, last_fill + periods * refresh


async def send_msg(text):
    """Send *text* (plus a trailing newline) to every chat in ``config.CHATS``.

    Sending is rate limited by the module-level token bucket: each call
    consumes one token, and when none is left the message is dropped with a
    log entry instead of being sent.

    A failure to deliver to one chat is logged and does not stop delivery to
    the remaining chats or propagate to the caller.
    """
    global bucket, bucket_last_fill
    logger.info(f'Sending message: {text}')

    refilled, bucket_last_fill = refill_bucket(bucket, bucket_last_fill, datetime.now())
    if refilled != bucket:
        bucket = refilled
        logger.debug(f'Bucket refill: {bucket}')

    if bucket <= 0:
        logger.info('Bucket empty :(')
        return

    bucket -= 1
    logger.debug(f'Bucket drain: {bucket}')

    for chat in config.CHATS:
        try:
            await bot.send_message(chat, text + '\n')
        except asyncio.CancelledError:
            # On Python 3.7 CancelledError derives from Exception; never
            # swallow a cancellation request.
            raise
        except Exception:
            logger.exception(f'Sending to chat {chat} failed')
63
64
# Last state returned by hysteresis() for each key.
hyst_state = {}


def hysteresis(key, value, low, high):
    """Classify *value* as high (1), low (-1) or unknown (0) with hysteresis.

    The previous state stored under *key* decides which threshold applies;
    a key seen for the first time counts as state 0.

    * ``value is None`` always gives 0.
    * From state 0 or -1 the result is 1 only once ``value >= high``,
      otherwise -1.
    * From state 1 the result drops to -1 only once ``value <= low``,
      otherwise it stays 1.

    The result is stored back under *key* and returned.
    """
    previous = hyst_state.get(key, 0)

    if value is None:
        state = 0
    elif previous > 0:
        state = -1 if value <= low else 1
    else:
        state = 1 if value >= high else -1

    hyst_state[key] = state
    return state
86
87
# States that were last announced; a notification is sent only on a change.
last_temp_state = 1
last_boiler_err = 0


async def mqtt_process_msg(topic, val):
    """Handle one MQTT message and notify the chats when a state changes.

    *topic* is the message topic and *val* its decoded payload.

    ``burrow/temp/catarium`` carries two space-separated numbers: the
    temperature and a timestamp in seconds that is compared against the
    current Unix time.  A reading older than one hour counts as a sensor
    failure (state 0); otherwise the temperature is classified through
    ``hysteresis`` with thresholds 23.2 / 23.5.

    ``burrow/heating/error`` carries an integer error code as its first
    field.

    Payloads that cannot be parsed are logged and ignored rather than
    raised, so one bad message cannot terminate the MQTT task.
    """
    global last_temp_state, last_boiler_err

    print(topic, '->', val)

    if topic == 'burrow/temp/catarium':
        try:
            temp, when = map(float, val.split())
        except ValueError:
            # Wrong number of fields or a non-numeric field.
            logger.warning(f'Malformed payload on {topic}: {val!r}')
            return

        if when < datetime.now().timestamp() - 3600:
            temp_state = 0
        else:
            temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5)

        if temp_state != last_temp_state:
            last_temp_state = temp_state
            if temp_state == 0:
                await send_msg('Teplotní čidlo v pracovně nefunguje.')
            else:
                await send_msg(f'Teplota v pracovně: {temp} °C')

    elif topic == 'burrow/heating/error':
        try:
            err = int(val.split()[0])
        except (ValueError, IndexError):
            # Empty payload or a non-integer first field.
            logger.warning(f'Malformed payload on {topic}: {val!r}')
            return

        if err != last_boiler_err:
            last_boiler_err = err
            await send_msg(f'Chyba kotle: {err}')
117
118
async def mqtt_loop():
    """Connect to the MQTT broker over TLS and process messages as they arrive.

    The connection uses the client certificate and CA file under
    ``/etc/burrow-mqtt``.  After subscribing to the heating and temperature
    topic trees, every received message is passed to ``mqtt_process_msg``
    with its payload decoded to ``str``.
    """
    # NOTE(review): ssl.PROTOCOL_TLS is deprecated since Python 3.10 and does
    # not enable hostname checking; ssl.PROTOCOL_TLS_CLIENT would. Confirm
    # the broker certificate matches "burrow-mqtt" before switching.
    tls = ssl.SSLContext(ssl.PROTOCOL_TLS)
    tls.verify_mode = ssl.CERT_REQUIRED
    tls.load_cert_chain(
        certfile='/etc/burrow-mqtt/client.crt',
        keyfile='/etc/burrow-mqtt/client.key',
    )
    tls.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')

    client = asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=tls)
    async with client as mqtt, mqtt.unfiltered_messages() as messages:
        for topic_filter in ("burrow/heating/#", "burrow/temp/#"):
            await mqtt.subscribe(topic_filter)

        async for msg in messages:
            await mqtt_process_msg(msg.topic, msg.payload.decode())
131
132
async def mqtt_watcher():
    """Run ``mqtt_loop`` forever, restarting it 10 seconds after it ends.

    MQTT errors are logged as a single line.  Any other exception is logged
    with its traceback and also leads to a restart, so an unexpected failure
    does not end MQTT processing for good.  Cancellation is propagated
    immediately, without the restart delay.
    """
    while True:
        try:
            logger.info("Starting MQTT")
            await mqtt_loop()
        except asyncio.CancelledError:
            # On Python 3.7 CancelledError derives from Exception; re-raise
            # it before the generic handler below can swallow it.
            raise
        except asyncio_mqtt.MqttError as error:
            logger.error(f"MQTT error: {error}")
        except Exception:
            logger.exception("Unexpected error in MQTT loop")

        # Deliberately not in a `finally`: that would also run (and block
        # for 10 s) while the task is being cancelled.
        await asyncio.sleep(10)
142
143
async def fortunes():
    """Send the output of the ``fortune`` program to the chats every 24 hours.

    If the program cannot be started (for example, it is not installed) or
    exits with a non-zero status, the problem is logged and the next attempt
    is made after the regular 24-hour pause.  Bytes in the output that are
    not valid UTF-8 are replaced rather than raising.
    """
    while True:
        try:
            proc = await asyncio.create_subprocess_exec('fortune', stdout=asyncio.subprocess.PIPE)
            # stderr is not piped, so the second element is always None.
            out, _ = await proc.communicate()
        except OSError as error:
            logger.error(f'Cannot run fortune: {error}')
        else:
            if proc.returncode == 0:
                await send_msg(out.decode(errors='replace'))
            else:
                logger.error(f'fortune failed with return code {proc.returncode}')

        await asyncio.sleep(24*60*60)
153
154
async def main():
    """Run the MQTT watcher, Telegram polling and the fortune sender together.

    The three coroutines run as concurrent tasks.  Whenever one of them
    finishes with an exception, that exception is logged with its traceback
    instead of staying unnoticed; the remaining tasks keep running.  The
    function returns once all tasks have finished.
    """
    pending = {
        asyncio.create_task(mqtt_watcher()),
        asyncio.create_task(
            dispatcher.start_polling(timeout=20, relax=0.01, fast=True, allowed_updates=None)
        ),
        asyncio.create_task(fortunes()),
    }

    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            if task.cancelled():
                continue
            # Retrieving the exception also marks it as handled for asyncio.
            error = task.exception()
            if error is not None:
                logger.error(f'Task {task!r} failed', exc_info=error)


if __name__ == '__main__':
    asyncio.run(main())