3 from aiogram import Bot, Dispatcher, executor, types
6 from datetime import datetime, timedelta
# Root-logger setup: records from DEBUG upwards are written to stdout using a
# "<timestamp> <logger name>.<LEVEL>: <message>" layout.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter(
    fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s",
    datefmt='%Y-%m-%d %H:%M:%S',
)
log_handler = logging.StreamHandler(stream=sys.stdout)
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)
# aiogram Bot created with the API token taken from the config module, and
# the Dispatcher bound to it; the message_handler decorators in this file
# register their handlers on this dispatcher.
21 bot = Bot(token=config.API_TOKEN)
22 dispatcher = Dispatcher(bot)
@dispatcher.message_handler(commands=['start', 'help'])
async def send_welcome(message: types.Message):
    """Log the chat that issued /start or /help and reply with a greeting."""
    origin = message.chat
    logger.info(f'Start from {origin}')
    greeting = "Brum!\nI'm BurrowBot!\n"
    await message.reply(greeting)
# Message handler registered on the dispatcher without any filter arguments.
# NOTE(review): the statement(s) that make up the body are not visible in
# this excerpt; only a commented-out echo of the incoming text remains below.
31 @dispatcher.message_handler()
32 async def echo(message: types.Message):
34 # await message.answer(message.text)
# Module-level state read and updated by send_msg().
# NOTE(review): BUCKET_CAPACITY is defined on a line not visible in this excerpt.
# Interval by which bucket_last_fill is advanced during a refill step.
38 BUCKET_REFRESH = timedelta(seconds=10)
# Current bucket level; starts at full capacity.
39 bucket = BUCKET_CAPACITY
# Timestamp of the last refill step; naive local time taken at import.
40 bucket_last_fill = datetime.now()
# Send `text` to every chat listed in config.CHATS, gated by the module-level
# `bucket` counter (this looks like a token-bucket rate limiter — TODO confirm).
# NOTE(review): several original lines of this function are not visible in
# this excerpt, including the assignment of `now`, the statements that change
# `bucket`, and at least one branch header. The comments below describe only
# the visible lines.
43 async def send_msg(text):
44 global bucket, bucket_last_fill
45 logger.info(f'Sending message: {text}')
# A refill is only considered while the bucket is below BUCKET_CAPACITY.
47 if bucket < BUCKET_CAPACITY:
# Advance bucket_last_fill one BUCKET_REFRESH at a time for each whole interval
# that has elapsed up to `now`; the loop also ends once the bucket is at capacity.
49 while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now:
50 bucket_last_fill += BUCKET_REFRESH
52 logger.debug(f'Bucket refill: {bucket}')
56 logger.debug(f'Bucket drain: {bucket}')
# Deliver to each configured chat in turn, with a trailing newline appended.
58 for chat in config.CHATS:
59 await bot.send_message(chat, text + '\n')
# Presumably the branch taken when the bucket has nothing left, so the message
# is dropped — confirm against the full file.
62 logger.info('Bucket empty :(')
# Threshold helper with per-key memory: the previous state is read from
# hyst_state[key] and the newly computed state is stored back under the same
# key (a missing key raises KeyError on the read, if hyst_state is a plain
# dict — TODO confirm).
# NOTE(review): the lines that derive new_state from value/low/high, and any
# return statement, are not visible in this excerpt — confirm against the
# full file before relying on this description.
67 def hysteresis(key, value, low, high):
69 old_state = hyst_state[key]
84 hyst_state[key] = new_state
# Handle a single MQTT message, given its topic and its decoded payload string.
# NOTE(review): some original lines are not visible in this excerpt (among
# them the assignment of `now` and a few branch headers/bodies), so the exact
# nesting of the statements below must be confirmed against the full file.
92 async def mqtt_process_msg(topic, val):
93 global last_temp_state, last_temp_warning
94 global last_boiler_err
# Every incoming message is echoed to stdout.
97 print(topic, '->', val)
# Temperature topic: the payload is split on single spaces and both fields
# are parsed as floats into (temp, when).
99 if topic == 'burrow/temp/catarium':
100 temp, when = map(float, val.split(' '))
# `when` is compared with the moment one hour (3600 s) before now.timestamp();
# presumably a staleness check on the reading — the branch body is not shown.
101 if when < now.timestamp() - 3600:
# The temperature is classified through hysteresis() with the bounds 23.2 and
# 23.5; the remembered state is updated when the result differs from it.
104 temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5)
105 if temp_state != last_temp_state:
106 last_temp_state = temp_state
# The conditions choosing between the two notifications are not visible here.
# First text: "The temperature sensor in the study is not working."
108 await send_msg('Teplotní čidlo v pracovně nefunguje.')
# Second text: "Temperature in the study: <temp> °C".
110 await send_msg(f'Teplota v pracovně: {temp} °C')
# Boiler error topic: the first space-separated field is parsed as an int.
# The stored code is updated when it differs, and a "Boiler error: <err>"
# message is sent (presumably only on change — confirm the nesting).
112 if topic == 'burrow/heating/error':
113 err = int(val.split(' ')[0])
114 if err != last_boiler_err:
115 last_boiler_err = err
116 await send_msg(f'Chyba kotle: {err}')
async def mqtt_loop():
    """Connect to the MQTT broker over TLS and hand every message to mqtt_process_msg.

    The TLS context presents the client certificate/key pair and requires a
    server certificate that validates against the configured CA file. After
    connecting, the ``burrow/heating/#`` and ``burrow/temp/#`` topic trees are
    subscribed and each received message is forwarded as (topic, decoded
    payload).
    """
    # NOTE(review): hostname checking is not switched on for this context —
    # confirm that this is intended for the "burrow-mqtt" broker.
    tls = ssl.SSLContext(ssl.PROTOCOL_TLS)
    tls.verify_mode = ssl.CERT_REQUIRED
    tls.load_cert_chain(
        '/etc/burrow-mqtt/client.crt',
        '/etc/burrow-mqtt/client.key',
    )
    tls.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')

    broker = asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=tls)
    async with broker as mqtt, mqtt.unfiltered_messages() as messages:
        # Subscribe only after the message stream is open, so nothing is missed.
        for topic_filter in ("burrow/heating/#", "burrow/temp/#"):
            await mqtt.subscribe(topic_filter)
        async for msg in messages:
            topic = msg.topic
            payload = msg.payload.decode()
            await mqtt_process_msg(topic, payload)
# Supervisor coroutine for the MQTT connection.
# NOTE(review): the lines between the visible ones are not shown in this
# excerpt — presumably a retry loop, a `try:` and the call that runs the
# MQTT loop. Confirm against the full file.
133 async def mqtt_watcher():
136 logger.info("Starting MQTT")
# An asyncio_mqtt.MqttError is caught here and reported through the logger.
138 except asyncio_mqtt.MqttError as error:
139 logger.error(f"MQTT error: {error}")
# Pause for 10 seconds; presumably the back-off before the next attempt.
141 await asyncio.sleep(10)
# Runs the external `fortune` program and forwards its output via send_msg().
# NOTE(review): some original lines are not visible in this excerpt —
# presumably a loop header after the signature and an `else:` before the
# error log. Confirm against the full file.
144 async def fortunes():
# Spawn `fortune` with stdout captured through a pipe and wait for it to exit.
# stderr is not piped, so `err` is None per the asyncio subprocess API.
146 proc = await asyncio.create_subprocess_exec('fortune', stdout=asyncio.subprocess.PIPE)
147 out, err = await proc.communicate()
# Exit status 0: the captured bytes are decoded and sent.
148 if proc.returncode == 0:
149 await send_msg(out.decode())
# Reports the exit status of a failed run (presumably under the missing `else:`).
151 logger.error(f'fortune failed with return code {proc.returncode}')
# Sleep for 24 hours (24*60*60 seconds).
152 await asyncio.sleep(24*60*60)
# Start-up: schedule the three long-running coroutines (MQTT watcher, bot
# polling, fortunes) as tasks on the event loop, then wait on all of them.
# NOTE(review): the enclosing `async def` header is not visible in this
# excerpt; the `await` below implies these lines sit inside a coroutine.
156 loop = asyncio.get_event_loop()
157 t1 = loop.create_task(mqtt_watcher())
158 t2 = loop.create_task(dispatcher.start_polling(timeout=20, relax=0.01, fast=True, allowed_updates=None))
159 t3 = loop.create_task(fortunes())
# asyncio.wait() with default arguments returns once every task has finished.
160 await asyncio.wait((t1, t2, t3))