|
| 1 | +import os |
| 2 | +import re |
| 3 | +from datetime import datetime |
| 4 | +from functools import partial |
| 5 | + |
| 6 | +import aiomqtt |
| 7 | +from aiocache import SimpleMemoryCache |
| 8 | +from discord import Embed, Message |
| 9 | +from discord.ext import commands |
| 10 | +from influxdb_client import InfluxDBClient |
| 11 | +from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope |
| 12 | +from meshtastic.protobuf.portnums_pb2 import TEXT_MESSAGE_APP |
| 13 | + |
| 14 | +from bridger.config import MQTT_BROKER, MQTT_PASS, MQTT_PORT, MQTT_TOPIC, MQTT_USER |
| 15 | +from bridger.dataclasses import TextMessagePoint |
| 16 | +from bridger.influx.interfaces import InfluxReader |
| 17 | +from bridger.log import logger |
| 18 | +from bridger.mqtt import PBPacketProcessor |
| 19 | + |
| 20 | +MQTT_TEST_CHANNEL_MESHTASTIC = os.getenv("MQTT_TEST_CHANNEL", "LongFast") |
| 21 | +MQTT_TEST_CHANNEL_DISCORD = int(os.getenv("MQTT_TEST_CHANNEL_ID", 1253788609316913265)) |
| 22 | +TEST_MESSAGE_MATCHERS = [ |
| 23 | + re.compile(r"^.*$", flags=re.IGNORECASE) if os.getenv("TEST_MESSAGE_MATCH_ALL", "false").lower() == "true" else None, |
| 24 | + re.compile(r"^\!\b.+$", flags=re.IGNORECASE), |
| 25 | + re.compile(r"^test\s+.+$", flags=re.IGNORECASE), |
| 26 | +] |
| 27 | + |
| 28 | + |
| 29 | +class TestMsg(commands.GroupCog, name="testmsg"): |
| 30 | + queue = SimpleMemoryCache() |
| 31 | + |
| 32 | + def __init__(self, bot: commands.Bot, discord_channel_id: int, influx_reader: InfluxReader): |
| 33 | + self.bot = bot |
| 34 | + self.discord_channel_id = discord_channel_id |
| 35 | + self.discord_channel = None |
| 36 | + self.influx_reader = influx_reader |
| 37 | + |
| 38 | + @commands.Cog.listener(name="on_ready") |
| 39 | + async def on_ready(self): |
| 40 | + self.discord_channel = self.bot.get_channel(self.discord_channel_id) |
| 41 | + logger.info(f"TestMsg cog is ready and channel is: {self.discord_channel}") |
| 42 | + |
| 43 | + @staticmethod |
| 44 | + def create_embed(service_envelope: ServiceEnvelope): |
| 45 | + packet = service_envelope.packet |
| 46 | + gateway = service_envelope.gateway_id |
| 47 | + color = int(gateway[-6:], 16) |
| 48 | + snr = packet.rx_snr |
| 49 | + rssi = packet.rx_rssi |
| 50 | + hop_count = None |
| 51 | + formatted_time = datetime.fromtimestamp(packet.rx_time).strftime("%H:%M:%S") |
| 52 | + |
| 53 | + if packet.hop_start > 0: |
| 54 | + hop_count = packet.hop_start - packet.hop_limit |
| 55 | + |
| 56 | + embed = Embed(color=color) |
| 57 | + # embed.set_author(name=gateway) |
| 58 | + embed.description = f"Heard by **{gateway}** at {formatted_time}" |
| 59 | + embed.add_field(name="SNR", value=snr, inline=True) |
| 60 | + embed.add_field(name="RSSI", value=rssi, inline=True) |
| 61 | + |
| 62 | + if hop_count == 0: |
| 63 | + embed.add_field(name="Hops", value="Direct", inline=True) |
| 64 | + elif hop_count is not None: |
| 65 | + embed.add_field(name="Hops", value=hop_count, inline=True) |
| 66 | + |
| 67 | + return embed |
| 68 | + |
| 69 | + async def update_message_embeds(self, message: Message, envelope: ServiceEnvelope): |
| 70 | + if len(message.embeds) >= 10: |
| 71 | + message_id = message.id |
| 72 | + logger.warning(f"Embed limit reached for message ID {message_id}, skipping update") |
| 73 | + return |
| 74 | + message.embeds.append(self.create_embed(envelope)) |
| 75 | + await message.edit(embeds=message.embeds) |
| 76 | + |
| 77 | + async def run_mqtt(self): |
| 78 | + topic = MQTT_TOPIC.removesuffix("/#") |
| 79 | + channel = MQTT_TEST_CHANNEL_MESHTASTIC |
| 80 | + full_topic = f"{topic}/{channel}/#" |
| 81 | + |
| 82 | + async with aiomqtt.Client( |
| 83 | + MQTT_BROKER, |
| 84 | + MQTT_PORT, |
| 85 | + username=MQTT_USER, |
| 86 | + password=MQTT_PASS, |
| 87 | + clean_session=True, |
| 88 | + ) as client: |
| 89 | + await client.subscribe(full_topic) |
| 90 | + logger.info(f"Subscribed to {full_topic}") |
| 91 | + await logger.complete() |
| 92 | + |
| 93 | + async for message in client.messages: |
| 94 | + try: |
| 95 | + service_envelope = ServiceEnvelope.FromString(message.payload) |
| 96 | + except Exception: |
| 97 | + logger.exception("Failed to decode MQTT message") |
| 98 | + continue |
| 99 | + |
| 100 | + processor = PBPacketProcessor(service_envelope=service_envelope, strip_text=False) |
| 101 | + |
| 102 | + if processor.portnum == TEXT_MESSAGE_APP: |
| 103 | + data: TextMessagePoint = processor.data |
| 104 | + if not data or not data.text: |
| 105 | + continue |
| 106 | + |
| 107 | + if not any(pattern.match(data.text) for pattern in TEST_MESSAGE_MATCHERS if pattern): |
| 108 | + continue |
| 109 | + |
| 110 | + logger.debug(f"Test message matched: {data.text}") |
| 111 | + |
| 112 | + packet = service_envelope.packet |
| 113 | + packet_id = packet.id |
| 114 | + source_node_id = getattr(packet, "from") |
| 115 | + node_info = self.influx_reader.get_node_info(source_node_id) |
| 116 | + |
| 117 | + short = node_info.get("short_name") if node_info else None |
| 118 | + long = node_info.get("long_name") if node_info else None |
| 119 | + name = f"**{short}** ({long})" if short and long else f"**{source_node_id}**" |
| 120 | + message_id = await self.queue.get(packet_id) |
| 121 | + |
| 122 | + extra = { |
| 123 | + "packet_id": packet_id, |
| 124 | + "source_node_id": source_node_id, |
| 125 | + "text": data.text, |
| 126 | + "gateway": service_envelope.gateway_id, |
| 127 | + "short_name": short, |
| 128 | + "long_name": long, |
| 129 | + "name": name, |
| 130 | + "node_info": node_info, |
| 131 | + } |
| 132 | + |
| 133 | + logger.bind(**extra).debug(f"Message ID {message_id} for packet ID {packet_id} from {name}") |
| 134 | + |
| 135 | + if message_id: |
| 136 | + try: |
| 137 | + message = await self.discord_channel.fetch_message(message_id) |
| 138 | + await self.update_message_embeds(message, service_envelope) |
| 139 | + except Exception: |
| 140 | + logger.exception("Failed to fetch or edit Discord message") |
| 141 | + else: |
| 142 | + content = f"Test message from {name} <t:{packet.rx_time}:R>\n> {data.text}" |
| 143 | + embeds = [self.create_embed(service_envelope)] |
| 144 | + try: |
| 145 | + message: Message = await self.discord_channel.send(content, embeds=embeds) |
| 146 | + await self.queue.set(packet_id, message.id, ttl=3600) |
| 147 | + except Exception: |
| 148 | + logger.exception("Failed to send Discord message") |
| 149 | + |
| 150 | + |
| 151 | +def restart_mqtt_on_exception(task, bot: commands.Bot): |
| 152 | + try: |
| 153 | + task.result() |
| 154 | + except Exception: |
| 155 | + logger.exception("MQTT task failed. Restarting...") |
| 156 | + new_task = bot.loop.create_task(bot.cogs["testmsg"].run_mqtt()) |
| 157 | + new_task.add_done_callback(partial(restart_mqtt_on_exception, bot=bot)) |
| 158 | + |
| 159 | + |
| 160 | +async def setup(bot: commands.Bot): |
| 161 | + influx_client = InfluxDBClient.from_env_properties() |
| 162 | + influx_reader = InfluxReader(influx_client=influx_client) |
| 163 | + await bot.add_cog(TestMsg(bot, MQTT_TEST_CHANNEL_DISCORD, influx_reader)) |
| 164 | + run_mqtt_task = bot.loop.create_task(bot.cogs["testmsg"].run_mqtt()) |
| 165 | + run_mqtt_task.add_done_callback(partial(restart_mqtt_on_exception, bot=bot)) |
0 commit comments