22import re
33from datetime import datetime
44from functools import partial
5+ from typing import Optional
56
67import aiomqtt
78from aiocache import SimpleMemoryCache
1718from bridger .influx .interfaces import InfluxReader
1819from bridger .log import logger
1920from bridger .mqtt import PBPacketProcessor
21+ from bridger .utils import should_ignore_pki_message
2022
2123MQTT_TEST_CHANNEL_MESHTASTIC = os .getenv ("MQTT_TEST_CHANNEL" , "+" )
2224MQTT_TEST_CHANNEL_DISCORD = int (os .getenv ("MQTT_TEST_CHANNEL_ID" , 1253788609316913265 ))
@@ -43,7 +45,7 @@ async def on_ready(self):
4345 logger .info (f"TestMsg cog is ready and channel is: { self .discord_channel } " )
4446
4547 @staticmethod
46- def format_node_name (node_id : int , node_info : dict = None ) -> str :
48+ def format_node_name (node_id : int , node_info : Optional [ dict ] = None ) -> str :
4749 """Format a consistent node name based on available info"""
4850 if not node_info :
4951 return f"**{ node_id } **"
@@ -71,14 +73,17 @@ def create_embed(self, service_envelope: ServiceEnvelope):
7173
7274 embed = Embed (color = color )
7375 # Try to get node info for the gateway hex ID
76+ gateway_id = None
77+ node_info = None
78+
7479 try :
7580 gateway_id = int (gateway .strip ("!" ), 16 )
7681 node_info = self .influx_reader .get_node_info (gateway_id )
7782 except (ValueError , TypeError ) as e :
7883 logger .error (f"Failed to parse gateway ID '{ gateway } ': { e } " )
79- node_info = None
8084
81- gateway_name = self .format_node_name (gateway_id if "gateway_id" in locals () else gateway , node_info )
85+ node_id = gateway_id if gateway_id is not None else int (gateway , 16 )
86+ gateway_name = self .format_node_name (node_id , node_info )
8287 embed .description = f"Heard by { gateway_name } - `{ gateway } ` at { formatted_time } "
8388 embed .add_field (name = "SNR" , value = snr , inline = True )
8489 embed .add_field (name = "RSSI" , value = rssi , inline = True )
@@ -112,7 +117,7 @@ async def run_mqtt(self):
112117 logger .info (f"Attempting to connect to MQTT broker at { MQTT_BROKER } :{ MQTT_PORT } " )
113118 async with aiomqtt .Client (
114119 MQTT_BROKER ,
115- MQTT_PORT ,
120+ int ( MQTT_PORT ) ,
116121 username = MQTT_USER ,
117122 password = MQTT_PASS ,
118123 clean_session = True ,
@@ -121,9 +126,16 @@ async def run_mqtt(self):
121126 logger .info (f"Subscribed to { full_topic } " )
122127 await logger .complete ()
123128
124- async for message in client .messages :
129+ async for mqtt_message in client .messages :
130+ # Ignoring PKI messages for now as we cannot decrypt them without storing keys somewhere
131+ if should_ignore_pki_message (str (mqtt_message .topic )):
132+ logger .bind (topic = topic , channel = channel , * mqtt_message .properties ).debug (
133+ f"Ignoring PKI message on topic { mqtt_message .topic } "
134+ ) # noqa: E501
135+ continue
136+
125137 try :
126- service_envelope = ServiceEnvelope .FromString (message .payload )
138+ service_envelope = ServiceEnvelope .FromString (mqtt_message .payload )
127139 except Exception :
128140 logger .exception ("Failed to decode MQTT message" )
129141 continue
0 commit comments