from aiogram import Bot, Dispatcher, executor, types
import asyncio
-import asyncio_mqtt
+import aiomqtt
from configparser import ConfigParser
from datetime import datetime, timedelta
import logging
+from logging.handlers import SysLogHandler
import signal
import ssl
import sys
config = ConfigParser()
config.read('/usr/local/etc/burrow-telegram')
API_TOKEN = config['telegram']['api_token']
-CHATS = map(int, config['telegram']['chats'].split(' '))
-
-formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S')
-log_handler = logging.StreamHandler(stream=sys.stdout)
+CHATS = list(map(int, config['telegram']['chats'].split(' ')))
+USE_SYSLOG = True
+
+if USE_SYSLOG:
+ formatter = logging.Formatter(fmt="%(message)s") # systemd will handle the rest
+ log_handler = SysLogHandler('/dev/log', facility=SysLogHandler.LOG_LOCAL1)
+ log_handler.ident = 'burrow-telegram: '
+else:
+ formatter = logging.Formatter(fmt="%(asctime)s %(name)s.%(levelname)s: %(message)s", datefmt='%Y-%m-%d %H:%M:%S')
+ log_handler = logging.StreamHandler(stream=sys.stdout)
log_handler.setFormatter(formatter)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
@dispatcher.message_handler()
async def echo(message: types.Message):
print(message)
+ if message.text.startswith("xyzzy"):
+ await send_msg("Nothing happens.")
# await message.answer(message.text)
global last_boiler_err
now = datetime.now()
- print(topic, '->', val)
+ logger.debug(f'MQTT: {topic} -> {val}')
if topic == 'burrow/temp/catarium':
temp, when = map(float, val.split(' '))
if when < now.timestamp() - 3600:
temp_state = 0
else:
- temp_state = hysteresis('catarium-temp', temp, 23.2, 23.5)
+ temp_state = hysteresis('catarium-temp', temp, 22, 23)
if temp_state != last_temp_state:
last_temp_state = temp_state
if temp_state == 0:
async def mqtt_loop():
- sctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ sctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
sctx.verify_mode = ssl.CERT_REQUIRED
sctx.load_cert_chain('/etc/burrow-mqtt/client.crt', '/etc/burrow-mqtt/client.key')
sctx.load_verify_locations(cafile='/etc/burrow-mqtt/ca.crt')
- async with asyncio_mqtt.Client(hostname="burrow-mqtt", port=8883, tls_context=sctx) as mqtt:
- async with mqtt.unfiltered_messages() as messages:
- await mqtt.subscribe("burrow/heating/#")
- await mqtt.subscribe("burrow/temp/#")
- async for msg in messages:
- await mqtt_process_msg(msg.topic, msg.payload.decode())
+ mqtt = aiomqtt.Client(client_id='telegram', hostname="burrow-mqtt", port=8883, tls_context=sctx)
+ await mqtt.connect()
+
+ async with mqtt.messages() as messages:
+ await mqtt.subscribe("burrow/heating/#")
+ await mqtt.subscribe("burrow/temp/#")
+ async for msg in messages:
+ await mqtt_process_msg(msg.topic.value, msg.payload.decode())
async def mqtt_watcher():
try:
logger.info("Starting MQTT")
await mqtt_loop()
- except asyncio_mqtt.MqttError as error:
+ except aiomqtt.MqttError as error:
logger.error(f"MQTT error: {error}")
- finally:
- await asyncio.sleep(10)
+ await asyncio.sleep(10)
async def fortunes():
await asyncio.sleep(5*60)
while True:
- proc = await asyncio.create_subprocess_exec('fortune', stdout=asyncio.subprocess.PIPE)
+ proc = await asyncio.create_subprocess_exec('/usr/games/fortune', stdout=asyncio.subprocess.PIPE)
out, err = await proc.communicate()
if proc.returncode == 0:
await send_msg(out.decode())
async def main():
loop = asyncio.get_event_loop()
- t1 = loop.create_task(mqtt_watcher())
- t2 = loop.create_task(dispatcher.start_polling(timeout=20, relax=0.01, fast=True, allowed_updates=None))
- t3 = loop.create_task(fortunes())
- await asyncio.wait((t1, t2, t3))
+ coros = [
+ loop.create_task(mqtt_watcher()),
+ loop.create_task(dispatcher.start_polling(timeout=60, relax=0.01, fast=True, allowed_updates=None)),
+ loop.create_task(fortunes()),
+ ]
+ for coro in asyncio.as_completed(coros):
+ done = await coro
+ done.result() # The coroutine probably died of an exception, which is raised here.
asyncio.run(main())