From: Martin Mares Date: Mon, 14 Feb 2022 21:47:07 +0000 (+0100) Subject: Telegram: Parts of installation machinery X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=45470bc30725b91df88376a70fc3cf5e68b1ed54;p=home-hw.git Telegram: Parts of installation machinery --- diff --git a/telegram/Makefile b/telegram/Makefile new file mode 100644 index 0000000..2b01e2b --- /dev/null +++ b/telegram/Makefile @@ -0,0 +1,10 @@ +VENV=/usr/local/lib/burrow-venv + +all: + +install: + [ -d $(VENV) ] || su -c "python3 -m venv $(VENV)" + su -c ". $(VENV)/bin/activate && pip install -r requirements.txt" + su -c "install -m 755 burrow-telegraf.py $(VENV)/bin/burrow-telegraf" + +.PHONY: all install diff --git a/telegram/burrow-telegraf.py b/telegram/burrow-telegraf.py new file mode 100755 index 0000000..8ff3797 --- /dev/null +++ b/telegram/burrow-telegraf.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python + +from aiogram import Bot, Dispatcher, executor, types +import asyncio +import asyncio_mqtt +from datetime import datetime, timedelta +import logging +import signal +import ssl +import sys + +import config + +formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S') +log_handler = logging.StreamHandler(stream=sys.stdout) +log_handler.setFormatter(formatter) +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) +logger.addHandler(log_handler) + +bot = Bot(token=config.API_TOKEN) +dispatcher = Dispatcher(bot) + + +@dispatcher.message_handler(commands=['start', 'help']) +async def send_welcome(message: types.Message): + logger.info(f'Start from {message.chat}') + await message.reply("Brum!\nI'm BurrowBot!\n") + + +@dispatcher.message_handler() +async def echo(message: types.Message): + print(message) + # await message.answer(message.text) + + +BUCKET_CAPACITY = 5 +BUCKET_REFRESH = timedelta(seconds=10) +bucket = BUCKET_CAPACITY +bucket_last_fill = datetime.now() + + +async def send_msg(text): + global bucket, bucket_last_fill + logger.info(f'Sending message: {text}') + + if bucket < BUCKET_CAPACITY: + now = datetime.now() + while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now: + bucket_last_fill += BUCKET_REFRESH + bucket += 1 + logger.debug(f'Bucket refill: {bucket}') + + if bucket > 0: + bucket -= 1 + logger.debug(f'Bucket drain: {bucket}') + + for chat in config.CHATS: + await bot.send_message(chat, text + '\n') + + else: + logger.info('Bucket empty :(') + + +hyst_state = {} + +def hysteresis(key, value, low, high): + if key in hyst_state: + old_state = hyst_state[key] + else: + old_state = 0 + if value is None: + new_state = 0 + elif old_state <= 0: + if value >= high: + new_state = 1 + else: + new_state = -1 + else: + if value <= low: + new_state = -1 + else: + new_state = 1 + hyst_state[key] = new_state + return new_state + + +last_temp_state = 1 +last_boiler_err = 0 + + +async def mqtt_process_msg(topic, val): + global last_temp_state, last_temp_warning + global last_boiler_err + now = datetime.now() + + print(topic, '->', val) + + if topic == 'burrow/temp/catarium': + temp, when = map(float, val.split(' ')) + if when < now.timestamp() - 3600: + temp_state = 0 + else: + temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5) + if temp_state != last_temp_state: + last_temp_state = temp_state + if temp_state == 0: + await send_msg('Teplotní čidlo v pracovně nefunguje.') + else: + await send_msg(f'Teplota v pracovně: {temp} °C') + + if topic == 'burrow/heating/error': + err = int(val.split(' ')[0]) + if err != last_boiler_err: + last_boiler_err = err + await send_msg(f'Chyba kotle: {err}') + + +async def mqtt_loop(): + sctx = ssl.SSLContext(ssl.PROTOCOL_TLS) + sctx.verify_mode = ssl.CERT_REQUIRED + sctx.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key') + sctx.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt') + + async with asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=sctx) as mqtt: + async with mqtt.unfiltered_messages() as messages: + await mqtt.subscribe("burrow/heating/#") + await mqtt.subscribe("burrow/temp/#") + async for msg in messages: + await mqtt_process_msg(msg.topic, msg.payload.decode()) + + +async def mqtt_watcher(): + while True: + try: + logger.info("Starting MQTT") + await mqtt_loop() + except asyncio_mqtt.MqttError as error: + logger.error(f"MQTT error: {error}") + finally: + await asyncio.sleep(10) + + +async def fortunes(): + while True: + proc = await asyncio.create_subprocess_exec('fortune', stdout=asyncio.subprocess.PIPE) + out, err = await proc.communicate() + if proc.returncode == 0: + await send_msg(out.decode()) + else: + logger.error(f'fortune failed with return code {proc.returncode}') + await asyncio.sleep(24*60*60) + + +async def main(): + loop = asyncio.get_event_loop() + t1 = loop.create_task(mqtt_watcher()) + t2 = loop.create_task(dispatcher.start_polling(timeout=20, relax=0.01, fast=True, allowed_updates=None)) + t3 = loop.create_task(fortunes()) + await asyncio.wait((t1, t2, t3)) + + +asyncio.run(main()) diff --git a/telegram/test.py b/telegram/test.py deleted file mode 100755 index 8ff3797..0000000 --- a/telegram/test.py +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env python - -from aiogram import Bot, Dispatcher, executor, types -import asyncio -import asyncio_mqtt -from datetime import datetime, timedelta -import logging -import signal -import ssl -import sys - -import config - -formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S') -log_handler = logging.StreamHandler(stream=sys.stdout) -log_handler.setFormatter(formatter) -logger = logging.getLogger() -logger.setLevel(logging.DEBUG) -logger.addHandler(log_handler) - -bot = Bot(token=config.API_TOKEN) -dispatcher = Dispatcher(bot) - - -@dispatcher.message_handler(commands=['start', 'help']) -async def send_welcome(message: types.Message): - logger.info(f'Start from {message.chat}') - await message.reply("Brum!\nI'm BurrowBot!\n") - - -@dispatcher.message_handler() -async def echo(message: types.Message): - print(message) - # await message.answer(message.text) - - -BUCKET_CAPACITY = 5 -BUCKET_REFRESH = timedelta(seconds=10) -bucket = BUCKET_CAPACITY -bucket_last_fill = datetime.now() - - -async def send_msg(text): - global bucket, bucket_last_fill - logger.info(f'Sending message: {text}') - - if bucket < BUCKET_CAPACITY: - now = datetime.now() - while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now: - bucket_last_fill += BUCKET_REFRESH - bucket += 1 - logger.debug(f'Bucket refill: {bucket}') - - if bucket > 0: - bucket -= 1 - logger.debug(f'Bucket drain: {bucket}') - - for chat in config.CHATS: - await bot.send_message(chat, text + '\n') - - else: - logger.info('Bucket empty :(') - - -hyst_state = {} - -def hysteresis(key, value, low, high): - if key in hyst_state: - old_state = hyst_state[key] - else: - old_state = 0 - if value is None: - new_state = 0 - elif old_state <= 0: - if value >= high: - new_state = 1 - else: - new_state = -1 - else: - if value <= low: - new_state = -1 - else: - new_state = 1 - hyst_state[key] = new_state - return new_state - - -last_temp_state = 1 -last_boiler_err = 0 - - -async def mqtt_process_msg(topic, val): - global last_temp_state, last_temp_warning - global last_boiler_err - now = datetime.now() - - print(topic, '->', val) - - if topic == 'burrow/temp/catarium': - temp, when = map(float, val.split(' ')) - if when < now.timestamp() - 3600: - temp_state = 0 - else: - temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5) - if temp_state != last_temp_state: - last_temp_state = temp_state - if temp_state == 0: - await send_msg('Teplotní čidlo v pracovně nefunguje.') - else: - await send_msg(f'Teplota v pracovně: {temp} °C') - - if topic == 'burrow/heating/error': - err = int(val.split(' ')[0]) - if err != last_boiler_err: - last_boiler_err = err - await send_msg(f'Chyba kotle: {err}') - - -async def mqtt_loop(): - sctx = ssl.SSLContext(ssl.PROTOCOL_TLS) - sctx.verify_mode = ssl.CERT_REQUIRED - sctx.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key') - sctx.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt') - - async with asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=sctx) as mqtt: - async with mqtt.unfiltered_messages() as messages: - await mqtt.subscribe("burrow/heating/#") - await mqtt.subscribe("burrow/temp/#") - async for msg in messages: - await mqtt_process_msg(msg.topic, msg.payload.decode()) - - -async def mqtt_watcher(): - while True: - try: - logger.info("Starting MQTT") - await mqtt_loop() - except asyncio_mqtt.MqttError as error: - logger.error(f"MQTT error: {error}") - finally: - await asyncio.sleep(10) - - -async def fortunes(): - while True: - proc = await asyncio.create_subprocess_exec('fortune', stdout=asyncio.subprocess.PIPE) - out, err = await proc.communicate() - if proc.returncode == 0: - await send_msg(out.decode()) - else: - logger.error(f'fortune failed with return code {proc.returncode}') - await asyncio.sleep(24*60*60) - - -async def main(): - loop = asyncio.get_event_loop() - t1 = loop.create_task(mqtt_watcher()) - t2 = loop.create_task(dispatcher.start_polling(timeout=20, relax=0.01, fast=True, allowed_updates=None)) - t3 = loop.create_task(fortunes()) - await asyncio.wait((t1, t2, t3)) - - -asyncio.run(main())