11import asyncio
22import contextlib
3+ import logging
34import time
45import traceback
56
67from enapter import async_ , mqtt
78
89from .device_protocol import DeviceProtocol
910
11+ LOGGER = logging .getLogger (__name__ )
12+
1013
1114class MQTTAdapter (async_ .Routine ):
1215
1316 def __init__ (
1417 self ,
15- device_channel : mqtt .api .device .Channel ,
18+ hardware_id : str ,
19+ channel_id : str ,
20+ mqtt_api_client : mqtt .api .Client ,
1621 device : DeviceProtocol ,
1722 task_group : asyncio .TaskGroup | None ,
1823 ) -> None :
1924 super ().__init__ (task_group = task_group )
20- self ._device_channel = device_channel
25+ self ._logger = logging .LoggerAdapter (
26+ LOGGER , extra = {"hardware_id" : hardware_id , "channel_id" : channel_id }
27+ )
28+ self ._device_channel = mqtt_api_client .device_channel (hardware_id , channel_id )
2129 self ._device = device
2230
2331 async def _run (self ) -> None :
@@ -33,36 +41,63 @@ async def _stream_properties(self) -> None:
3341 async for properties in stream :
3442 properties = properties .copy ()
3543 timestamp = properties .pop ("timestamp" , int (time .time ()))
36- await self ._device_channel .publish_properties (
37- properties = mqtt .api .device .Properties (
38- timestamp = timestamp , values = properties
39- )
44+ await self ._publish_properties (
45+ mqtt .api .device .Properties (timestamp = timestamp , values = properties )
4046 )
4147
48+ async def _publish_properties (self , properties : mqtt .api .device .Properties ) -> None :
49+ try :
50+ await self ._device_channel .publish_properties (properties = properties )
51+ except Exception as e :
52+ self ._logger .error ("failed to publish properties: %s" , e )
53+
4254 async def _stream_telemetry (self ) -> None :
4355 async with contextlib .aclosing (self ._device .stream_telemetry ()) as stream :
4456 async for telemetry in stream :
4557 telemetry = telemetry .copy ()
4658 timestamp = telemetry .pop ("timestamp" , int (time .time ()))
4759 alerts = telemetry .pop ("alerts" , None )
48- await self ._device_channel . publish_telemetry (
49- telemetry = mqtt .api .device .Telemetry (
60+ await self ._publish_telemetry (
61+ mqtt .api .device .Telemetry (
5062 timestamp = timestamp , alerts = alerts , values = telemetry
5163 )
5264 )
5365
66+ async def _publish_telemetry (self , telemetry : mqtt .api .device .Telemetry ) -> None :
67+ try :
68+ await self ._device_channel .publish_telemetry (telemetry = telemetry )
69+ except Exception as e :
70+ self ._logger .error ("failed to publish telemetry: %s" , e )
71+
5472 async def _stream_logs (self ) -> None :
5573 async with contextlib .aclosing (self ._device .stream_logs ()) as stream :
5674 async for log in stream :
57- await self ._device_channel .publish_log (
58- log = mqtt .api .device .Log (
75+ match log .severity :
76+ case "debug" :
77+ self ._logger .debug (log .message )
78+ case "info" :
79+ self ._logger .info (log .message )
80+ case "warning" :
81+ self ._logger .warning (log .message )
82+ case "error" :
83+ self ._logger .error (log .message )
84+ case _:
85+ raise NotImplementedError (log .severity )
86+ await self ._publish_log (
87+ mqtt .api .device .Log (
5988 timestamp = int (time .time ()),
6089 severity = mqtt .api .device .LogSeverity (log .severity ),
6190 message = log .message ,
6291 persist = log .persist ,
6392 )
6493 )
6594
95+ async def _publish_log (self , log : mqtt .api .device .Log ) -> None :
96+ try :
97+ await self ._device_channel .publish_log (log = log )
98+ except Exception as e :
99+ self ._logger .error ("failed to publish log: %s" , e )
100+
66101 async def _execute_commands (self ) -> None :
67102 async with asyncio .TaskGroup () as tg :
68103 async with self ._device_channel .subscribe_to_command_requests () as requests :
0 commit comments