3 from aiogram import Bot, Dispatcher, executor, types
6 from datetime import datetime, timedelta
# Configure the root logger to emit records at INFO level and above.
12 logging.basicConfig(level=logging.INFO)
# aiogram Bot instance, created with the token read from config.API_TOKEN.
14 bot = Bot(token=config.API_TOKEN)
@dp.message_handler(commands=['start', 'help'])
async def send_welcome(message: types.Message):
    """Log the originating chat and reply with the bot's greeting.

    Registered for the ``start`` and ``help`` commands.
    """
    greeting = "Brum!\nI'm BurrowBot!\n."
    logging.info(f'Start from {message.chat}')
    await message.reply(greeting)
# Handler that receives an incoming message object.
# NOTE(review): the registering decorator and the active body statements are
# not shown alongside this definition - confirm what it currently does.
25 async def echo(message: types.Message):
# Disabled: would answer with the text of the received message.
27 # await message.answer(message.text)
# Interval by which send_msg steps bucket_last_fill forward when refilling.
31 BUCKET_REFRESH = timedelta(seconds=10)
# Current bucket level; starts at BUCKET_CAPACITY.
32 bucket = BUCKET_CAPACITY
# Reference time for the next refill step (naive local time from datetime.now()).
33 bucket_last_fill = datetime.now()
# Log `text` and send it to every chat listed in config.CHATS, gated by the
# module-level `bucket` counter.
36 async def send_msg(text):
# Declared global so assignments here update the module-level bucket state.
37 global bucket, bucket_last_fill
38 logging.info(f'>>> Sending message: {text}')
# A refill is only considered while the bucket is below capacity.
40 if bucket < BUCKET_CAPACITY:
# Advance bucket_last_fill one BUCKET_REFRESH interval at a time for as long
# as a whole interval has elapsed and the bucket is still below capacity.
# NOTE(review): `now` and the bucket increment are assigned on lines not
# shown here - confirm.
# NOTE(review): bucket_last_fill only moves forward inside this loop; if the
# bucket sits full for a long time the timestamp may lag far behind, which
# would let later refills happen immediately - verify this is intended.
42 while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now:
43 bucket_last_fill += BUCKET_REFRESH
45 logging.info(f'Bucket refill: {bucket}')
49 logging.info(f'Bucket drain: {bucket}')
# The same text, with a newline appended, goes to each configured chat in turn.
51 for chat in config.CHATS:
52 await bot.send_message(chat, text + '\n')
# NOTE(review): presumably reached when the bucket has nothing left to spend,
# in which case the message is not sent - confirm against the branch condition.
55 logging.info('Bucket empty :(')
# Per-key state tracker: takes a key, a value and low/high bounds.
# NOTE(review): the comparison of `value` against `low`/`high` and the return
# value are on lines not shown here - confirm the threshold semantics.
60 def hysteresis(key, value, low, high):
# Previous state for this key; a plain lookup, so the key must already exist
# in hyst_state unless it is a defaulting mapping - TODO confirm.
62 old_state = hyst_state[key]
# Remember the newly computed state for the next call with the same key.
77 hyst_state[key] = new_state
async def mqtt_process_msg(topic, val):
    """Turn one MQTT message into a chat notification when its state changes.

    Args:
        topic: Topic the message arrived on. Only
            ``burrow/temp/catarium`` and ``burrow/heating/error`` are acted
            upon; every other topic is ignored.
        val: Decoded payload string. Only the first space-separated field is
            parsed; anything after it is ignored.

    A payload whose first field is not numeric is logged and dropped instead
    of raising ``ValueError``, so one malformed message cannot propagate an
    exception into the loop that awaits this coroutine.
    """
    global last_temp_state, last_boiler_err

    # print(topic, '->', val)

    if topic == 'burrow/temp/catarium':
        try:
            temp = float(val.split(' ')[0])
        except ValueError:
            logging.warning(f'Ignoring malformed payload on {topic}: {val!r}')
            return
        # 22 and 23 are passed as the low/high bounds of the hysteresis helper.
        temp_state = hysteresis('catarium-temp', temp, 22, 23)
        # Notify only when the state differs from the last one reported.
        if temp_state != last_temp_state:
            last_temp_state = temp_state
            await send_msg(f'Teplota v pracovně: {temp} °C')
    elif topic == 'burrow/heating/error':
        try:
            err = int(val.split(' ')[0])
        except ValueError:
            logging.warning(f'Ignoring malformed payload on {topic}: {val!r}')
            return
        # Report each change of the error code exactly once.
        if err != last_boiler_err:
            last_boiler_err = err
            await send_msg(f'Chyba kotle: {err}')
async def mqtt_loop():
    """Connect to the MQTT broker over TLS and dispatch every received message.

    Builds a TLS context that presents the client certificate from
    ``/etc/burrow-mqtt`` and requires the broker certificate to validate
    against ``/etc/burrow-mqtt/ca.crt``, subscribes to ``burrow/heating/#``
    and ``burrow/temp/#``, and passes each message's topic and decoded payload
    to ``mqtt_process_msg``. Exceptions from the client or from message
    processing propagate to the caller.
    """
    # PROTOCOL_TLS_CLIENT replaces the deprecated generic PROTOCOL_TLS.
    sctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # NOTE(review): PROTOCOL_TLS left hostname checking off by default, so it
    # stays off here to accept exactly the same certificates as before.
    # Consider enabling it if the broker certificate names "burrow-mqtt".
    sctx.check_hostname = False
    sctx.verify_mode = ssl.CERT_REQUIRED
    sctx.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key')
    sctx.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')

    async with asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=sctx) as mqtt:
        # Open the message stream before subscribing, as the original does.
        async with mqtt.unfiltered_messages() as messages:
            await mqtt.subscribe("burrow/heating/#")
            await mqtt.subscribe("burrow/temp/#")
            async for msg in messages:
                await mqtt_process_msg(msg.topic, msg.payload.decode())
# Supervisor coroutine for the MQTT connection: logs start-up, logs
# asyncio_mqtt.MqttError failures and waits 30 seconds.
# NOTE(review): presumably a retry loop around mqtt_loop(); the loop/try
# lines and the call itself are not shown here - confirm.
119 async def mqtt_watcher():
122 logging.info(">>> Starting MQTT")
# Only asyncio_mqtt.MqttError is caught by this handler; other exception types
# are not handled by it.
124 except asyncio_mqtt.MqttError as error:
125 logging.error(f"MQTT error: {error}")
# Pause for 30 seconds.
127 await asyncio.sleep(30)
# Queue mqtt_watcher() as a task on the current event loop before handing
# control to aiogram's polling.
# NOTE(review): presumably start_polling runs this same loop, which is what
# lets the queued task execute - confirm.
130 loop = asyncio.get_event_loop()
131 loop.create_task(mqtt_watcher())
# Start polling for updates with the dispatcher; skip_updates=True is passed through.
133 executor.start_polling(dp, skip_updates=True)
135 ## loop = asyncio.get_event_loop()
136 ## loop.create_task(self.dispatcher.start_polling(reset_webhook=reset_webhook, timeout=20, relax=0.01, fast=True, allowed_updates=None))
137 ## loop.run_forever()