@@ -4,20 +4,61 @@ use crate::converter::device_to_discovery_payload;
44use crate :: utils:: ResultExtensions ;
55use async_channel:: { bounded, Receiver , Sender } ;
66use rumqttc:: { Event , EventLoop , Incoming , Publish , Request , Subscribe } ;
7+ use serde:: { Serialize , Serializer } ;
78use serde_json:: value:: Value :: Object ;
89use simple_error:: { bail, simple_error} ;
9- use slog:: { debug, error, info, trace, warn} ;
10+ use slog:: { crit , debug, error, info, trace, warn} ;
1011use slog_scope;
11- use std:: collections:: HashMap ;
12+ use std:: collections:: { HashMap , VecDeque } ;
1213use std:: error:: Error ;
14+ use std:: ops:: Deref ;
1315use std:: sync:: Arc ;
16+ use tokio:: sync:: Mutex ;
1417use tokio:: time:: Duration ;
1518
19+ #[ derive( Clone , Debug , Eq , PartialEq ) ]
20+ pub struct MaybeJsonString {
21+ pub byte_contents : Vec < u8 > ,
22+ }
23+
24+ #[ derive( Clone , Debug , Eq , PartialEq , Serialize ) ]
25+ pub enum LoggedMessage {
26+ OutgoingMessage ( String , MaybeJsonString ) ,
27+ IncomingMessage ( String , MaybeJsonString ) ,
28+ Connected ,
29+ Disconnected ,
30+ }
31+
32+ impl MaybeJsonString {
33+ pub fn new < P : Clone + Into < Vec < u8 > > > ( bytes : & P ) -> MaybeJsonString {
34+ MaybeJsonString {
35+ byte_contents : bytes. clone ( ) . into ( ) ,
36+ }
37+ }
38+ }
39+
40+ impl Serialize for MaybeJsonString {
41+ fn serialize < S > ( & self , serializer : S ) -> Result < <S as Serializer >:: Ok , <S as Serializer >:: Error >
42+ where
43+ S : Serializer ,
44+ {
45+ let str = match std:: str:: from_utf8 ( & self . byte_contents ) {
46+ Ok ( v) => v,
47+ Err ( _) => return serializer. serialize_bytes ( & self . byte_contents ) ,
48+ } ;
49+ match serde_json:: from_str ( str) {
50+ Ok ( Object ( m) ) => m. serialize ( serializer) ,
51+ _ => serializer. serialize_str ( str) ,
52+ }
53+ }
54+ }
55+
1656pub struct DeviceSyncer {
1757 config : Config ,
1858 controller : Arc < dyn DeviceController > ,
1959 sender : Sender < Request > ,
2060 repoll : Sender < DeviceId > ,
61+ pub last_n_messages : Mutex < VecDeque < LoggedMessage > > ,
2162}
2263
2364impl < ' a > DeviceSyncer {
@@ -32,6 +73,7 @@ impl<'a> DeviceSyncer {
3273 controller,
3374 sender : ev. handle ( ) ,
3475 repoll : repoll_sender,
76+ last_n_messages : Mutex :: new ( VecDeque :: with_capacity ( 10 ) ) ,
3577 } ;
3678 let this = Arc :: new ( syncer) ;
3779 trace ! ( slog_scope:: logger( ) , "start_thread" ) ;
@@ -48,14 +90,16 @@ impl<'a> DeviceSyncer {
4890 . await
4991 }
5092 } ) ;
93+ this
94+ }
5195
52- if this. config . discovery_topic_prefix . is_some ( ) {
96+ async fn start_broadcast_discovery_broadcast ( self : Arc < Self > ) {
97+ if self . config . discovery_topic_prefix . is_some ( ) {
5398 tokio:: task:: spawn ( {
54- let this = this . clone ( ) ;
99+ let this = self . clone ( ) ;
55100 async move { this. broadcast_discovery ( ) . await }
56101 } ) ;
57102 }
58- this
59103 }
60104
61105 async fn do_subscribe ( & self ) -> Result < ( ) , Box < dyn Error > > {
@@ -212,6 +256,14 @@ impl<'a> DeviceSyncer {
212256 Ok ( ( ) )
213257 }
214258
259+ async fn log_message ( self : Arc < Self > , message : LoggedMessage ) {
260+ let mut msgs = self . last_n_messages . lock ( ) . await ;
261+ if msgs. len ( ) == 10 {
262+ msgs. pop_front ( ) ;
263+ } ;
264+ msgs. push_back ( message)
265+ }
266+
215267 async fn loop_once ( self : Arc < Self > , ev : & mut EventLoop ) -> Result < ( ) , Box < dyn Error > > {
216268 let message = match ev. poll ( ) . await ? {
217269 Event :: Incoming ( i) => i,
@@ -223,10 +275,18 @@ impl<'a> DeviceSyncer {
223275 return match message {
224276 Incoming :: Connect ( _) => Ok ( ( ) ) ,
225277 Incoming :: ConnAck ( _) => {
226- self . do_subscribe ( ) . await ?;
278+ self . clone ( ) . log_message ( LoggedMessage :: Connected ) . await ;
279+ self . clone ( ) . do_subscribe ( ) . await ?;
280+ self . start_broadcast_discovery_broadcast ( ) . await ;
227281 Ok ( ( ) )
228282 }
229283 Incoming :: Publish ( message) => {
284+ self . clone ( )
285+ . log_message ( LoggedMessage :: IncomingMessage (
286+ message. topic . clone ( ) ,
287+ MaybeJsonString :: new ( & message. payload . deref ( ) ) ,
288+ ) )
289+ . await ;
230290 let this = self . clone ( ) ;
231291 tokio:: task:: spawn ( async move {
232292 this. process_one ( message)
@@ -249,7 +309,10 @@ impl<'a> DeviceSyncer {
249309 Incoming :: UnsubAck ( _) => bail ! ( "Unexpected unsuback!" ) ,
250310 Incoming :: PingReq => Ok ( ( ) ) ,
251311 Incoming :: PingResp => Ok ( ( ) ) ,
252- Incoming :: Disconnect => Ok ( ( ) ) ,
312+ Incoming :: Disconnect => {
313+ self . clone ( ) . log_message ( LoggedMessage :: Disconnected ) . await ;
314+ Ok ( ( ) )
315+ }
253316 } ;
254317 }
255318
@@ -271,7 +334,7 @@ impl<'a> DeviceSyncer {
271334 }
272335 }
273336
274- async fn poll_device_ ( & self , device_id : DeviceId ) -> Result < ( ) , Box < dyn Error > > {
337+ async fn poll_device_ ( self : Arc < Self > , device_id : DeviceId ) -> Result < ( ) , Box < dyn Error > > {
275338 let device_info = { self . controller . describe ( device_id) . await ? } ;
276339 let attributes = device_info
277340 . attributes
@@ -287,17 +350,24 @@ impl<'a> DeviceSyncer {
287350 let payload = serde_json:: Value :: Object ( attributes) . to_string ( ) ;
288351 trace ! ( slog_scope:: logger( ) , "poll_device_status" ; "device_id" => device_id, "payload" => & payload) ;
289352
290- let mut publish = Publish :: new (
291- self . config
292- . to_topic_string ( & TopicType :: StatusTopic ( device_id) )
293- . unwrap ( ) ,
294- rumqttc :: QoS :: AtLeastOnce ,
295- payload ,
296- ) ;
353+ let topic = self
354+ . config
355+ . to_topic_string ( & TopicType :: StatusTopic ( device_id) )
356+ . unwrap ( ) ;
357+ let logged_message =
358+ LoggedMessage :: OutgoingMessage ( topic . clone ( ) , MaybeJsonString :: new ( & payload ) ) ;
359+ let mut publish = Publish :: new ( topic , rumqttc :: QoS :: AtLeastOnce , payload ) ;
297360 publish. retain = true ;
298- self . sender . try_send ( Request :: Publish ( publish) ) ?;
299-
300- Ok ( ( ) )
361+ match self . sender . try_send ( Request :: Publish ( publish) ) {
362+ Ok ( _) => {
363+ self . log_message ( logged_message) . await ;
364+ Ok ( ( ) )
365+ }
366+ Err ( e) => {
367+ crit ! ( slog_scope:: logger( ) , "sending_failed_crashing_to_maybe_reconnect" ; "error" => ?e) ;
368+ panic ! ( e)
369+ }
370+ }
301371 }
302372
303373 async fn poll_device ( self : Arc < Self > , device_id : DeviceId ) -> ( ) {
@@ -362,13 +432,16 @@ impl<'a> DeviceSyncer {
362432 let config = v. discovery_info . to_string ( ) ;
363433 info ! ( slog_scope:: logger( ) , "discovered_device" ; "id" => id, "name" => & device. name) ;
364434 debug ! ( slog_scope:: logger( ) , "broadcast_discovery_result" ; "id" => id, "topic" => & topic, "config" => & config) ;
435+ let log_message =
436+ LoggedMessage :: OutgoingMessage ( topic. clone ( ) , MaybeJsonString :: new ( & config) ) ;
365437 self . sender
366438 . send ( Request :: Publish ( Publish :: new (
367439 topic,
368440 rumqttc:: QoS :: AtLeastOnce ,
369441 config,
370442 ) ) )
371443 . await ?;
444+ self . log_message ( log_message) . await ;
372445 Ok ( ( ) )
373446 }
374447 None => {
0 commit comments