]> mj.ucw.cz Git - home-hw.git/commitdiff
Testing Telegram bot
authorMartin Mares <mj@ucw.cz>
Sat, 12 Feb 2022 20:44:14 +0000 (21:44 +0100)
committerMartin Mares <mj@ucw.cz>
Sat, 12 Feb 2022 20:44:14 +0000 (21:44 +0100)
telegram/requirements.txt [new file with mode: 0644]
telegram/test.py [new file with mode: 0755]

diff --git a/telegram/requirements.txt b/telegram/requirements.txt
new file mode 100644 (file)
index 0000000..ce90c2b
--- /dev/null
@@ -0,0 +1,2 @@
+aiogram
+asyncio-mqtt
diff --git a/telegram/test.py b/telegram/test.py
new file mode 100755 (executable)
index 0000000..fb940a4
--- /dev/null
@@ -0,0 +1,137 @@
+#!/usr/bin/env python
+
+from aiogram import Bot, Dispatcher, executor, types
+import asyncio
+import asyncio_mqtt
+from datetime import datetime, timedelta
+import logging
+import ssl
+
+import config
+
+logging.basicConfig(level=logging.INFO)
+
+bot = Bot(token=config.API_TOKEN)
+dp = Dispatcher(bot)
+
+
+@dp.message_handler(commands=['start', 'help'])
+async def send_welcome(message: types.Message):
+    logging.info(f'Start from {message.chat}')
+    await message.reply("Brum!\nI'm BurrowBot!\n.")
+
+
+@dp.message_handler()
+async def echo(message: types.Message):
+    print(message)
+    # await message.answer(message.text)
+
+
+BUCKET_CAPACITY = 5
+BUCKET_REFRESH = timedelta(seconds=10)
+bucket = BUCKET_CAPACITY
+bucket_last_fill = datetime.now()
+
+
+async def send_msg(text):
+    global bucket, bucket_last_fill
+    logging.info(f'>>> Sending message: {text}')
+
+    if bucket < BUCKET_CAPACITY:
+        now = datetime.now()
+        while bucket < BUCKET_CAPACITY and bucket_last_fill + BUCKET_REFRESH <= now:
+            bucket_last_fill += BUCKET_REFRESH
+            bucket += 1
+            logging.info(f'Bucket refill: {bucket}')
+
+    if bucket > 0:
+        bucket -= 1
+        logging.info(f'Bucket drain: {bucket}')
+
+        for chat in config.CHATS:
+            await bot.send_message(chat, text + '\n')
+
+    else:
+        logging.info('Bucket empty :(')
+
+
+hyst_state = {}
+
+def hysteresis(key, value, low, high):
+    if key in hyst_state:
+        old_state = hyst_state[key]
+    else:
+        old_state = 0
+    if value is None:
+        new_state = 0
+    elif old_state <= 0:
+        if value >= high:
+            new_state = 1
+        else:
+            new_state = -1
+    else:
+        if value <= low:
+            new_state = -1
+        else:
+            new_state = 1
+    hyst_state[key] = new_state
+    return new_state
+
+
+last_temp_state = 0
+last_boiler_err = 0
+
+
+async def mqtt_process_msg(topic, val):
+    global last_temp_state, last_temp_warning
+    global last_boiler_err
+
+    # print(topic, '->', val)
+
+    if topic == 'burrow/temp/catarium':
+        temp = float(val.split(' ')[0])
+        temp_state = hysteresis('catarium-temp', temp, 22, 23)
+        if temp_state != last_temp_state:
+            last_temp_state = temp_state
+            await send_msg(f'Teplota v pracovně: {temp} °C')
+
+    if topic == 'burrow/heating/error':
+        err = int(val.split(' ')[0])
+        if err != last_boiler_err:
+            last_boiler_err = err
+            await send_msg(f'Chyba kotle: {err}')
+
+
+async def mqtt_loop():
+    sctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+    sctx.verify_mode = ssl.CERT_REQUIRED
+    sctx.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key')
+    sctx.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')
+
+    async with asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=sctx) as mqtt:
+        async with mqtt.unfiltered_messages() as messages:
+            await mqtt.subscribe("burrow/heating/#")
+            await mqtt.subscribe("burrow/temp/#")
+            async for msg in messages:
+                await mqtt_process_msg(msg.topic, msg.payload.decode())
+
+
+async def mqtt_watcher():
+    while True:
+        try:
+            logging.info(">>> Starting MQTT")
+            await mqtt_loop()
+        except asyncio_mqtt.MqttError as error:
+            logging.error(f"MQTT error: {error}")
+        finally:
+            await asyncio.sleep(30)
+
+
+loop = asyncio.get_event_loop()
+loop.create_task(mqtt_watcher())
+
+executor.start_polling(dp, skip_updates=True)
+
+## loop = asyncio.get_event_loop()
+## loop.create_task(self.dispatcher.start_polling(reset_webhook=reset_webhook, timeout=20, relax=0.01, fast=True, allowed_updates=None))
+## loop.run_forever()