2 # A simple daemon for sending Telegram notifications about failures in the Burrow
3 # (c) 2022 Martin Mareš <mj@ucw.cz>
5 from aiogram import Bot, Dispatcher, executor, types
8 from configparser import ConfigParser
9 from datetime import datetime, timedelta
# Load the daemon configuration.  The [telegram] section supplies the bot
# API token and a whitespace-separated list of numeric chat IDs.
config = ConfigParser()
config.read('/usr/local/etc/burrow-telegram')
API_TOKEN = config['telegram']['api_token']
# Materialize the chat IDs as a list.  A bare map() object is a one-shot
# iterator: any code that loops over CHATS more than once would see the IDs
# on the first pass only and an empty sequence afterwards.  split() without
# an argument also tolerates repeated or surrounding whitespace, which
# split(' ') would turn into empty strings that int() rejects.
CHATS = [int(chat_id) for chat_id in config['telegram']['chats'].split()]
# Root logger: INFO and above go to stdout, one timestamped line per record.
log_handler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter(
    fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s",
    datefmt='%Y-%m-%d %H:%M:%S',
)
log_handler.setFormatter(formatter)

logger = logging.getLogger()
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)
# Telegram bot client created with the configured API token, and the aiogram
# dispatcher bound to it; message handlers in this file are registered on
# the dispatcher via @dispatcher.message_handler.
bot = Bot(token=API_TOKEN)
dispatcher = Dispatcher(bot)
@dispatcher.message_handler(commands=['start', 'help'])
async def send_welcome(message: types.Message):
    """Reply to the `start` and `help` commands with a short greeting.

    The chat the command came from is written to the log first, then the
    greeting is sent as a reply to the triggering message.
    """
    origin = message.chat
    logger.info(f'Start from {origin}')

    greeting = "Brum!\nI'm BurrowBot!\n"
    await message.reply(greeting)
37 @dispatcher.message_handler()
38 async def echo(message: types.Message):
40 # await message.answer(message.text)
# Bucket state read and updated by send_msg() (declared `global` there).
# NOTE(review): presumably a token-bucket rate limit on outgoing messages;
# BUCKET_CAPACITY is defined outside the lines visible here - confirm.

# Step by which send_msg() advances bucket_last_fill while the bucket is
# below BUCKET_CAPACITY and at least this much time has elapsed.
BUCKET_REFRESH = timedelta(seconds=10)
# Current bucket level; starts at BUCKET_CAPACITY.
bucket = BUCKET_CAPACITY
# Reference time for refills (naive local time from datetime.now()).
bucket_last_fill = datetime.now()
49 async def send_msg(text):
50 global bucket, bucket_last_fill
51 logger.info(f'Sending message: {text}')
53 if bucket < BUCKET_CAPACITY:
55 while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now:
56 bucket_last_fill += BUCKET_REFRESH
58 logger.debug(f'Bucket refill: {bucket}')
62 logger.debug(f'Bucket drain: {bucket}')
65 await bot.send_message(chat, text + '\n')
68 logger.info('Bucket empty :(')
73 def hysteresis(key, value, low, high):
75 old_state = hyst_state[key]
90 hyst_state[key] = new_state
98 async def mqtt_process_msg(topic, val):
99 global last_temp_state, last_temp_warning
100 global last_boiler_err
103 print(topic, '->', val)
105 if topic == 'burrow/temp/catarium':
106 temp, when = map(float, val.split(' '))
107 if when < now.timestamp() - 3600:
110 temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5)
111 if temp_state != last_temp_state:
112 last_temp_state = temp_state
114 await send_msg('Teplotní čidlo v pracovně nefunguje.')
116 await send_msg(f'Teplota v pracovně: {temp} °C')
118 if topic == 'burrow/heating/error':
119 err = int(val.split(' ')[0])
120 if err != last_boiler_err:
121 last_boiler_err = err
122 await send_msg(f'Chyba kotle: {err}')
async def mqtt_loop():
    """Connect to the MQTT broker over TLS and process incoming messages.

    Builds a TLS context that requires a verified peer certificate and
    presents the client certificate, connects to ``burrow-mqtt:8883``,
    subscribes to the heating and temperature topic trees, and hands every
    received message (topic plus payload decoded as text) to
    mqtt_process_msg().  Returns only when the message stream ends; any
    exception raised while connecting or processing propagates to the caller.
    """
    tls = ssl.SSLContext(ssl.PROTOCOL_TLS)
    tls.verify_mode = ssl.CERT_REQUIRED
    tls.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key')
    tls.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')

    client = asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=tls)
    async with client as mqtt, mqtt.unfiltered_messages() as messages:
        # Subscribe only after the message stream is open, as the original
        # ordering does, so nothing arriving in between is missed.
        for topic_filter in ("burrow/heating/#", "burrow/temp/#"):
            await mqtt.subscribe(topic_filter)

        async for msg in messages:
            payload = msg.payload.decode()
            await mqtt_process_msg(msg.topic, payload)
139 async def mqtt_watcher():
142 logger.info("Starting MQTT")
144 except asyncio_mqtt.MqttError as error:
145 logger.error(f"MQTT error: {error}")
147 await asyncio.sleep(10)
150 async def fortunes():
151 await asyncio.sleep(5*60)
153 proc = await asyncio.create_subprocess_exec('fortune', stdout=asyncio.subprocess.PIPE)
154 out, err = await proc.communicate()
155 if proc.returncode == 0:
156 await send_msg(out.decode())
158 logger.error(f'fortune failed with return code {proc.returncode}')
159 await asyncio.sleep(24*60*60)
163 loop = asyncio.get_event_loop()
164 t1 = loop.create_task(mqtt_watcher())
165 t2 = loop.create_task(dispatcher.start_polling(timeout=20, relax=0.01, fast=True, allowed_updates=None))
166 t3 = loop.create_task(fortunes())
167 await asyncio.wait((t1, t2, t3))