]> mj.ucw.cz Git - home-hw.git/blobdiff - telegram/burrow-telegram.py
Auto: Meditation mode turned off
[home-hw.git] / telegram / burrow-telegram.py
index 1568397dfac448b86ad55caaba4c48599d0d2941..22fa145eda4f97ef4cacb5cb859b5c4891f18d69 100755 (executable)
@@ -4,10 +4,11 @@
 
 from aiogram import Bot, Dispatcher, executor, types
 import asyncio
-import asyncio_mqtt
+import aiomqtt
 from configparser import ConfigParser
 from datetime import datetime, timedelta
 import logging
+from logging.handlers import SysLogHandler
 import signal
 import ssl
 import sys
@@ -15,10 +16,16 @@ import sys
 config = ConfigParser()
 config.read('/usr/local/etc/burrow-telegram')
 API_TOKEN = config['telegram']['api_token']
-CHATS = map(int, config['telegram']['chats'].split(' '))
-
-formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S')
-log_handler = logging.StreamHandler(stream=sys.stdout)
+CHATS = list(map(int, config['telegram']['chats'].split(' ')))
+USE_SYSLOG = True
+
+if USE_SYSLOG:
+    formatter = logging.Formatter(fmt="%(message)s")        # systemd will handle the rest
+    log_handler = SysLogHandler('/dev/log', facility=SysLogHandler.LOG_LOCAL1)
+    log_handler.ident = 'burrow-telegram: '
+else:
+    formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S')
+    log_handler = logging.StreamHandler(stream=sys.stdout)
 log_handler.setFormatter(formatter)
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
@@ -37,6 +44,8 @@ async def send_welcome(message: types.Message):
 @dispatcher.message_handler()
 async def echo(message: types.Message):
     print(message)
+    if message.text.startswith("xyzzy"):
+        await send_msg("Nothing happens.")
     # await message.answer(message.text)
 
 
@@ -100,14 +109,14 @@ async def mqtt_process_msg(topic, val):
     global last_boiler_err
     now = datetime.now()
 
-    print(topic, '->', val)
+    logger.debug(f'MQTT: {topic} -> {val}')
 
     if topic == 'burrow/temp/catarium':
         temp, when = map(float, val.split(' '))
         if when < now.timestamp() - 3600:
             temp_state = 0
         else:
-            temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5)
+            temp_state = hysteresis('catarium-temp', temp, 22, 23)
         if temp_state != last_temp_state:
             last_temp_state = temp_state
             if temp_state == 0:
@@ -123,17 +132,19 @@ async def mqtt_process_msg(topic, val):
 
 
 async def mqtt_loop():
-    sctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+    sctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     sctx.verify_mode = ssl.CERT_REQUIRED
     sctx.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key')
     sctx.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')
 
-    async with asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=sctx) as mqtt:
-        async with mqtt.unfiltered_messages() as messages:
-            await mqtt.subscribe("burrow/heating/#")
-            await mqtt.subscribe("burrow/temp/#")
-            async for msg in messages:
-                await mqtt_process_msg(msg.topic, msg.payload.decode())
+    mqtt = aiomqtt.Client(client_id='telegram', hostname="burrow-mqtt", port=8883, tls_context=sctx)
+    await mqtt.connect()
+
+    async with mqtt.messages() as messages:
+        await mqtt.subscribe("burrow/heating/#")
+        await mqtt.subscribe("burrow/temp/#")
+        async for msg in messages:
+            await mqtt_process_msg(msg.topic.value, msg.payload.decode())
 
 
 async def mqtt_watcher():
@@ -141,16 +152,15 @@ async def mqtt_watcher():
         try:
             logger.info("Starting MQTT")
             await mqtt_loop()
-        except asyncio_mqtt.MqttError as error:
+        except aiomqtt.MqttError as error:
             logger.error(f"MQTT error: {error}")
-        finally:
-            await asyncio.sleep(10)
+        await asyncio.sleep(10)
 
 
 async def fortunes():
     await asyncio.sleep(5*60)
     while True:
-        proc = await asyncio.create_subprocess_exec('fortune', stdout=asyncio.subprocess.PIPE)
+        proc = await asyncio.create_subprocess_exec('/usr/games/fortune', stdout=asyncio.subprocess.PIPE)
         out, err = await proc.communicate()
         if proc.returncode == 0:
             await send_msg(out.decode())
@@ -161,10 +171,14 @@ async def fortunes():
 
 async def main():
     loop = asyncio.get_event_loop()
-    t1 = loop.create_task(mqtt_watcher())
-    t2 = loop.create_task(dispatcher.start_polling(timeout=20, relax=0.01, fast=True, allowed_updates=None))
-    t3 = loop.create_task(fortunes())
-    await asyncio.wait((t1, t2, t3))
+    coros = [
+        loop.create_task(mqtt_watcher()),
+        loop.create_task(dispatcher.start_polling(timeout=60, relax=0.01, fast=True, allowed_updates=None)),
+        loop.create_task(fortunes()),
+    ]
+    for coro in asyncio.as_completed(coros):
+        done = await coro
+        done.result()       # The coroutine probably died of an exception, which is raised here.
 
 
 asyncio.run(main())