X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;ds=inline;f=telegram%2Fburrow-telegram.py;h=22fa145eda4f97ef4cacb5cb859b5c4891f18d69;hb=ef81a4d74227ac8fb82b0d153ae85b25cfc7b18b;hp=608fd119466ec59e3fe8e0754f6781f3cf2f626c;hpb=c575e82942e42fb343a8fee502bfb91d539272f5;p=home-hw.git diff --git a/telegram/burrow-telegram.py b/telegram/burrow-telegram.py index 608fd11..22fa145 100755 --- a/telegram/burrow-telegram.py +++ b/telegram/burrow-telegram.py @@ -4,10 +4,11 @@ from aiogram import Bot, Dispatcher, executor, types import asyncio -import asyncio_mqtt +import aiomqtt from configparser import ConfigParser from datetime import datetime, timedelta import logging +from logging.handlers import SysLogHandler import signal import ssl import sys @@ -15,10 +16,16 @@ import sys config = ConfigParser() config.read('/usr/local/etc/burrow-telegram') API_TOKEN = config['telegram']['api_token'] -CHATS = map(int, config['telegram']['chats'].split(' ')) - -formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S') -log_handler = logging.StreamHandler(stream=sys.stdout) +CHATS = list(map(int, config['telegram']['chats'].split(' '))) +USE_SYSLOG = True + +if USE_SYSLOG: + formatter = logging.Formatter(fmt="%(message)s") # systemd will handle the rest + log_handler = SysLogHandler('/dev/log', facility=SysLogHandler.LOG_LOCAL1) + log_handler.ident = 'burrow-telegram: ' +else: + formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S') + log_handler = logging.StreamHandler(stream=sys.stdout) log_handler.setFormatter(formatter) logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -37,6 +44,8 @@ async def send_welcome(message: types.Message): @dispatcher.message_handler() async def echo(message: types.Message): print(message) + if message.text.startswith("xyzzy"): + await send_msg("Nothing happens.") # await message.answer(message.text) @@ -107,7 +116,7 @@ async def mqtt_process_msg(topic, val): if when < now.timestamp() - 3600: temp_state = 0 else: - temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5) + temp_state = hysteresis('catarium-temp', temp, 22, 23) if temp_state != last_temp_state: last_temp_state = temp_state if temp_state == 0: @@ -123,17 +132,19 @@ async def mqtt_process_msg(topic, val): async def mqtt_loop(): - sctx = ssl.SSLContext(ssl.PROTOCOL_TLS) + sctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) sctx.verify_mode = ssl.CERT_REQUIRED sctx.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key') sctx.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt') - async with asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=sctx) as mqtt: - async with mqtt.unfiltered_messages() as messages: - await mqtt.subscribe("burrow/heating/#") - await mqtt.subscribe("burrow/temp/#") - async for msg in messages: - await mqtt_process_msg(msg.topic, msg.payload.decode()) + mqtt = aiomqtt.Client(client_id='telegram', hostname="burrow-mqtt", port=8883, tls_context=sctx) + await mqtt.connect() + + async with mqtt.messages() as messages: + await mqtt.subscribe("burrow/heating/#") + await mqtt.subscribe("burrow/temp/#") + async for msg in messages: + await mqtt_process_msg(msg.topic.value, msg.payload.decode()) async def mqtt_watcher(): @@ -141,16 +152,15 @@ async def mqtt_watcher(): try: logger.info("Starting MQTT") await mqtt_loop() - except asyncio_mqtt.MqttError as error: + except aiomqtt.MqttError as error: logger.error(f"MQTT error: {error}") - finally: - await asyncio.sleep(10) + await asyncio.sleep(10) async def fortunes(): await asyncio.sleep(5*60) while True: - proc = await asyncio.create_subprocess_exec('fortune', stdout=asyncio.subprocess.PIPE) + proc = await asyncio.create_subprocess_exec('/usr/games/fortune', stdout=asyncio.subprocess.PIPE) out, err = await proc.communicate() if proc.returncode == 0: await send_msg(out.decode()) @@ -161,10 +171,14 @@ async def fortunes(): async def main(): loop = asyncio.get_event_loop() - t1 = loop.create_task(mqtt_watcher()) - t2 = loop.create_task(dispatcher.start_polling(timeout=20, relax=0.01, fast=True, allowed_updates=None)) - t3 = loop.create_task(fortunes()) - await asyncio.wait((t1, t2, t3)) + coros = [ + loop.create_task(mqtt_watcher()), + loop.create_task(dispatcher.start_polling(timeout=60, relax=0.01, fast=True, allowed_updates=None)), + loop.create_task(fortunes()), + ] + for coro in asyncio.as_completed(coros): + done = await coro + done.result() # The coroutine probably died of an exception, which is raised here. asyncio.run(main())