]> mj.ucw.cz Git - home-hw.git/blob - telegram/test.py
fb940a4f7d5feb473ab2a1a45c75281a3eeba6da
[home-hw.git] / telegram / test.py
1 #!/usr/bin/env python
2
3 from aiogram import Bot, Dispatcher, executor, types
4 import asyncio
5 import asyncio_mqtt
6 from datetime import datetime, timedelta
7 import logging
8 import ssl
9
10 import config
11
# Emit INFO-level log records; the handlers, the rate limiter and the MQTT
# watcher in this script all report through the root logger.
logging.basicConfig(level=logging.INFO)

# Telegram client created from the token in the local ``config`` module, and
# the dispatcher on which the message handlers below are registered.
bot = Bot(token=config.API_TOKEN)
dp = Dispatcher(bot)
16
17
@dp.message_handler(commands=['start', 'help'])
async def send_welcome(message: types.Message):
    """Answer the ``start`` and ``help`` commands with a fixed greeting.

    The chat the command came from is written to the log first, then the
    greeting is sent as a reply to the triggering message.
    """
    origin = message.chat
    logging.info(f'Start from {origin}')

    greeting = "Brum!\nI'm BurrowBot!\n."
    await message.reply(greeting)
22
23
@dp.message_handler()
async def echo(message: types.Message):
    """Handler registered without filters: dump the message to stdout.

    Nothing is sent back to the chat.
    """
    # Echoing the text back is currently disabled:
    # await message.answer(message.text)
    print(message)
28
29
# Token-bucket rate limiter for outgoing messages.
# BUCKET_CAPACITY is the largest burst that can be sent back-to-back; while the
# bucket is not full, one token is regained per BUCKET_REFRESH of elapsed time.
BUCKET_CAPACITY = 5
BUCKET_REFRESH = timedelta(seconds=10)
bucket = BUCKET_CAPACITY            # tokens currently available
bucket_last_fill = datetime.now()   # start of the refill interval in progress


async def send_msg(text):
    """Send *text* to every chat in ``config.CHATS``, subject to rate limiting.

    Each call that finds a token in the bucket consumes it and delivers the
    text (with a trailing newline appended) to all configured chats, one
    after another.  If the bucket is empty the message is dropped and only
    a log record is written.

    Args:
        text: Message text to deliver.

    Returns:
        None.
    """
    global bucket, bucket_last_fill
    logging.info(f'>>> Sending message: {text}')

    now = datetime.now()
    if bucket >= BUCKET_CAPACITY:
        # A full bucket must not accumulate refill credit.  Without this the
        # timestamp stays far in the past after an idle period, and every
        # later call would earn an immediate refill, defeating the limiter.
        bucket_last_fill = now
    else:
        while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now:
            bucket_last_fill += BUCKET_REFRESH
            bucket += 1
            logging.info(f'Bucket refill: {bucket}')
        if bucket >= BUCKET_CAPACITY:
            # Refilled to the brim: discard any leftover elapsed time.
            bucket_last_fill = now

    if bucket <= 0:
        logging.info('Bucket empty :(')
        return

    bucket -= 1
    logging.info(f'Bucket drain: {bucket}')

    for chat in config.CHATS:
        await bot.send_message(chat, text + '\n')
56
57
# Last state returned by hysteresis() for each key.
hyst_state = {}


def hysteresis(key, value, low, high):
    """Classify *value* as high (1) or low (-1) with hysteresis, per *key*.

    The state remembered for *key* (0 if there is none yet) decides which
    threshold applies: from a non-positive state the result becomes 1 only
    when ``value >= high``; from a positive state it becomes -1 only when
    ``value <= low``.  A *value* of ``None`` yields state 0.

    The resulting state is stored in ``hyst_state[key]`` and returned.
    """
    previous = hyst_state.get(key, 0)

    if value is None:
        current = 0
    elif previous > 0:
        # Currently high: stay high until the value drops to `low` or below.
        current = -1 if value <= low else 1
    else:
        # Currently low or unknown: go high only once `high` is reached.
        current = 1 if value >= high else -1

    hyst_state[key] = current
    return current
79
80
# Most recently reported values, used to notify only when something changes.
last_temp_state = 0     # last hysteresis state of the catarium temperature
last_boiler_err = 0     # last error code seen on burrow/heating/error


async def mqtt_process_msg(topic, val):
    """React to one decoded MQTT message.

    Two topics are handled; anything else is ignored:

    * ``burrow/temp/catarium`` - the first space-separated field is parsed
      as a float and run through ``hysteresis`` with thresholds 22 / 23.
      A notification is sent whenever the resulting state differs from the
      previously reported one.
    * ``burrow/heating/error`` - the first space-separated field is parsed
      as an int; a notification is sent whenever the code changes.

    A payload that cannot be parsed is logged and skipped, so that one bad
    message does not raise out of the MQTT receive loop.

    Args:
        topic: Topic the message arrived on.
        val: Payload as text.
    """
    global last_temp_state, last_boiler_err

    if topic == 'burrow/temp/catarium':
        try:
            temp = float(val.split(' ')[0])
        except ValueError:
            logging.warning(f'Ignoring malformed payload on {topic}: {val!r}')
            return
        temp_state = hysteresis('catarium-temp', temp, 22, 23)
        if temp_state != last_temp_state:
            last_temp_state = temp_state
            await send_msg(f'Teplota v pracovně: {temp} °C')

    elif topic == 'burrow/heating/error':
        try:
            err = int(val.split(' ')[0])
        except ValueError:
            logging.warning(f'Ignoring malformed payload on {topic}: {val!r}')
            return
        if err != last_boiler_err:
            last_boiler_err = err
            await send_msg(f'Chyba kotle: {err}')
103
104
async def mqtt_loop():
    """Connect to the MQTT broker over TLS and process messages until failure.

    A TLS context is built from the client certificate/key and CA file under
    ``/etc/burrow-mqtt``.  After connecting, the heating and temperature
    topic trees are subscribed and every received message is decoded and
    passed to ``mqtt_process_msg``.  Exceptions are not handled here.
    """
    # NOTE(review): ssl.PROTOCOL_TLS is deprecated in recent Python versions
    # and, unlike PROTOCOL_TLS_CLIENT, does not turn on hostname checking by
    # default - confirm whether that is intended for this broker.
    tls = ssl.SSLContext(ssl.PROTOCOL_TLS)
    tls.verify_mode = ssl.CERT_REQUIRED
    tls.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key')
    tls.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')

    client = asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=tls)
    async with client as mqtt:
        # The message stream is opened before subscribing.
        async with mqtt.unfiltered_messages() as messages:
            for pattern in ("burrow/heating/#", "burrow/temp/#"):
                await mqtt.subscribe(pattern)
            async for msg in messages:
                payload = msg.payload.decode()
                await mqtt_process_msg(msg.topic, payload)
117
118
async def mqtt_watcher():
    """Run ``mqtt_loop`` forever, restarting it 30 seconds after it ends.

    MQTT errors are logged briefly.  Any other exception is logged with its
    traceback and also followed by a restart, so that a failure while
    handling a single message does not silently end monitoring for good.
    Cancellation is propagated immediately.
    """
    while True:
        try:
            logging.info(">>> Starting MQTT")
            await mqtt_loop()
        except asyncio.CancelledError:
            # Never delay or swallow task cancellation.
            raise
        except asyncio_mqtt.MqttError as error:
            logging.error(f"MQTT error: {error}")
        except Exception:
            logging.exception("Unexpected error in MQTT loop")

        # Back off before reconnecting.  This is deliberately outside a
        # `finally` clause: sleeping there would hold up cancellation.
        await asyncio.sleep(30)
128
129
# Schedule the MQTT watcher on the event loop before polling starts.
loop = asyncio.get_event_loop()
# Keep a strong reference to the task: the event loop itself only holds weak
# references, so an unreferenced task may be garbage-collected while running.
mqtt_task = loop.create_task(mqtt_watcher())

# Start Telegram long polling through aiogram's executor; the watcher task
# scheduled above runs on the same event loop.
executor.start_polling(dp, skip_updates=True)
134
135 ## loop = asyncio.get_event_loop()
136 ## loop.create_task(self.dispatcher.start_polling(reset_webhook=reset_webhook, timeout=20, relax=0.01, fast=True, allowed_updates=None))
137 ## loop.run_forever()