+++ /dev/null
-#!/usr/bin/env python
-
-from aiogram import Bot, Dispatcher, executor, types
-import asyncio
-import asyncio_mqtt
-from datetime import datetime, timedelta
-import logging
-import signal
-import ssl
-import sys
-
-import config
-
-formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S')
-log_handler = logging.StreamHandler(stream=sys.stdout)
-log_handler.setFormatter(formatter)
-logger = logging.getLogger()
-logger.setLevel(logging.DEBUG)
-logger.addHandler(log_handler)
-
-bot = Bot(token=config.API_TOKEN)
-dispatcher = Dispatcher(bot)
-
-
-@dispatcher.message_handler(commands=['start', 'help'])
-async def send_welcome(message: types.Message):
- logger.info(f'Start from {message.chat}')
- await message.reply("Brum!\nI'm BurrowBot!\n")
-
-
-@dispatcher.message_handler()
-async def echo(message: types.Message):
- print(message)
- # await message.answer(message.text)
-
-
-BUCKET_CAPACITY = 5
-BUCKET_REFRESH = timedelta(seconds=10)
-bucket = BUCKET_CAPACITY
-bucket_last_fill = datetime.now()
-
-
-async def send_msg(text):
- global bucket, bucket_last_fill
- logger.info(f'Sending message: {text}')
-
- if bucket < BUCKET_CAPACITY:
- now = datetime.now()
- while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now:
- bucket_last_fill += BUCKET_REFRESH
- bucket += 1
- logger.debug(f'Bucket refill: {bucket}')
-
- if bucket > 0:
- bucket -= 1
- logger.debug(f'Bucket drain: {bucket}')
-
- for chat in config.CHATS:
- await bot.send_message(chat, text + '\n')
-
- else:
- logger.info('Bucket empty :(')
-
-
-hyst_state = {}
-
-def hysteresis(key, value, low, high):
- if key in hyst_state:
- old_state = hyst_state[key]
- else:
- old_state = 0
- if value is None:
- new_state = 0
- elif old_state <= 0:
- if value >= high:
- new_state = 1
- else:
- new_state = -1
- else:
- if value <= low:
- new_state = -1
- else:
- new_state = 1
- hyst_state[key] = new_state
- return new_state
-
-
-last_temp_state = 1
-last_boiler_err = 0
-
-
-async def mqtt_process_msg(topic, val):
- global last_temp_state, last_temp_warning
- global last_boiler_err
- now = datetime.now()
-
- print(topic, '->', val)
-
- if topic == 'burrow/temp/catarium':
- temp, when = map(float, val.split(' '))
- if when < now.timestamp() - 3600:
- temp_state = 0
- else:
- temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5)
- if temp_state != last_temp_state:
- last_temp_state = temp_state
- if temp_state == 0:
- await send_msg('Teplotní čidlo v pracovně nefunguje.')
- else:
- await send_msg(f'Teplota v pracovně: {temp} °C')
-
- if topic == 'burrow/heating/error':
- err = int(val.split(' ')[0])
- if err != last_boiler_err:
- last_boiler_err = err
- await send_msg(f'Chyba kotle: {err}')
-
-
-async def mqtt_loop():
- sctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
- sctx.verify_mode = ssl.CERT_REQUIRED
- sctx.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key')
- sctx.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')
-
- async with asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=sctx) as mqtt:
- async with mqtt.unfiltered_messages() as messages:
- await mqtt.subscribe("burrow/heating/#")
- await mqtt.subscribe("burrow/temp/#")
- async for msg in messages:
- await mqtt_process_msg(msg.topic, msg.payload.decode())
-
-
-async def mqtt_watcher():
- while True:
- try:
- logger.info("Starting MQTT")
- await mqtt_loop()
- except asyncio_mqtt.MqttError as error:
- logger.error(f"MQTT error: {error}")
- finally:
- await asyncio.sleep(10)
-
-
-async def fortunes():
- while True:
- proc = await asyncio.create_subprocess_exec('fortune', stdout=asyncio.subprocess.PIPE)
- out, err = await proc.communicate()
- if proc.returncode == 0:
- await send_msg(out.decode())
- else:
- logger.error(f'fortune failed with return code {proc.returncode}')
- await asyncio.sleep(24*60*60)
-
-
-async def main():
- loop = asyncio.get_event_loop()
- t1 = loop.create_task(mqtt_watcher())
- t2 = loop.create_task(dispatcher.start_polling(timeout=20, relax=0.01, fast=True, allowed_updates=None))
- t3 = loop.create_task(fortunes())
- await asyncio.wait((t1, t2, t3))
-
-
-asyncio.run(main())
--- /dev/null
+#!/usr/bin/env python
+# A simple daemon for sending Telegram notifications about failures in the Burrow
+# (c) 2022 Martin Mareš <mj@ucw.cz>
+
+from aiogram import Bot, Dispatcher, executor, types
+import asyncio
+import asyncio_mqtt
+from configparser import ConfigParser
+from datetime import datetime, timedelta
+import logging
+import signal
+import ssl
+import sys
+
+config = ConfigParser()
+config.read('/usr/local/etc/burrow-telegram')
+API_TOKEN = config['telegram']['api_token']
+CHATS = map(int, config['telegram']['chats'].split(' '))
+
+formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S')
+log_handler = logging.StreamHandler(stream=sys.stdout)
+log_handler.setFormatter(formatter)
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+logger.addHandler(log_handler)
+
+bot = Bot(token=API_TOKEN)
+dispatcher = Dispatcher(bot)
+
+
+@dispatcher.message_handler(commands=['start', 'help'])
+async def send_welcome(message: types.Message):
+ logger.info(f'Start from {message.chat}')
+ await message.reply("Brum!\nI'm BurrowBot!\n")
+
+
+@dispatcher.message_handler()
+async def echo(message: types.Message):
+ print(message)
+ # await message.answer(message.text)
+
+
+BUCKET_CAPACITY = 5
+BUCKET_REFRESH = timedelta(seconds=10)
+bucket = BUCKET_CAPACITY
+bucket_last_fill = datetime.now()
+
+
+async def send_msg(text):
+ global bucket, bucket_last_fill
+ logger.info(f'Sending message: {text}')
+
+ if bucket < BUCKET_CAPACITY:
+ now = datetime.now()
+ while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now:
+ bucket_last_fill += BUCKET_REFRESH
+ bucket += 1
+ logger.debug(f'Bucket refill: {bucket}')
+
+ if bucket > 0:
+ bucket -= 1
+ logger.debug(f'Bucket drain: {bucket}')
+
+ for chat in CHATS:
+ await bot.send_message(chat, text + '\n')
+
+ else:
+ logger.info('Bucket empty :(')
+
+
+hyst_state = {}
+
+def hysteresis(key, value, low, high):
+ if key in hyst_state:
+ old_state = hyst_state[key]
+ else:
+ old_state = 0
+ if value is None:
+ new_state = 0
+ elif old_state <= 0:
+ if value >= high:
+ new_state = 1
+ else:
+ new_state = -1
+ else:
+ if value <= low:
+ new_state = -1
+ else:
+ new_state = 1
+ hyst_state[key] = new_state
+ return new_state
+
+
+last_temp_state = 1
+last_boiler_err = 0
+
+
+async def mqtt_process_msg(topic, val):
+ global last_temp_state, last_temp_warning
+ global last_boiler_err
+ now = datetime.now()
+
+ print(topic, '->', val)
+
+ if topic == 'burrow/temp/catarium':
+ temp, when = map(float, val.split(' '))
+ if when < now.timestamp() - 3600:
+ temp_state = 0
+ else:
+ temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5)
+ if temp_state != last_temp_state:
+ last_temp_state = temp_state
+ if temp_state == 0:
+ await send_msg('Teplotní čidlo v pracovně nefunguje.')
+ else:
+ await send_msg(f'Teplota v pracovně: {temp} °C')
+
+ if topic == 'burrow/heating/error':
+ err = int(val.split(' ')[0])
+ if err != last_boiler_err:
+ last_boiler_err = err
+ await send_msg(f'Chyba kotle: {err}')
+
+
+async def mqtt_loop():
+ sctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ sctx.verify_mode = ssl.CERT_REQUIRED
+ sctx.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key')
+ sctx.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')
+
+ async with asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=sctx) as mqtt:
+ async with mqtt.unfiltered_messages() as messages:
+ await mqtt.subscribe("burrow/heating/#")
+ await mqtt.subscribe("burrow/temp/#")
+ async for msg in messages:
+ await mqtt_process_msg(msg.topic, msg.payload.decode())
+
+
+async def mqtt_watcher():
+ while True:
+ try:
+ logger.info("Starting MQTT")
+ await mqtt_loop()
+ except asyncio_mqtt.MqttError as error:
+ logger.error(f"MQTT error: {error}")
+ finally:
+ await asyncio.sleep(10)
+
+
+async def fortunes():
+ await asyncio.sleep(5*60)
+ while True:
+ proc = await asyncio.create_subprocess_exec('fortune', stdout=asyncio.subprocess.PIPE)
+ out, err = await proc.communicate()
+ if proc.returncode == 0:
+ await send_msg(out.decode())
+ else:
+ logger.error(f'fortune failed with return code {proc.returncode}')
+ await asyncio.sleep(24*60*60)
+
+
+async def main():
+ loop = asyncio.get_event_loop()
+ t1 = loop.create_task(mqtt_watcher())
+ t2 = loop.create_task(dispatcher.start_polling(timeout=20, relax=0.01, fast=True, allowed_updates=None))
+ t3 = loop.create_task(fortunes())
+ await asyncio.wait((t1, t2, t3))
+
+
+asyncio.run(main())