@@ -10,7 +10,7 @@ use rumqttc::{AsyncClient, ConnectionError, Event, EventLoop, MqttOptions, Packe
1010use std:: { fmt:: Display , time:: Duration } ;
1111use tokio:: {
1212 sync:: { mpsc, watch} ,
13- time:: timeout,
13+ time:: { sleep , timeout} ,
1414} ;
1515
1616const TOPIC_AVAILABILITY : & str = "opensleep/availability" ;
@@ -33,6 +33,7 @@ pub struct MqttManager {
3333 pub client : AsyncClient ,
3434 eventloop : EventLoop ,
3535 device_label : String ,
36+ reconnect_attempts : u32 ,
3637}
3738
3839impl MqttManager {
@@ -66,18 +67,105 @@ impl MqttManager {
6667 client,
6768 eventloop,
6869 device_label,
70+ reconnect_attempts : 0 ,
6971 }
7072 }
7173
72- pub async fn wait_for_conn ( & mut self ) {
74+ pub async fn wait_for_conn ( & mut self ) -> Result < ( ) , ( ) > {
7375 loop {
7476 let evt = self . eventloop . poll ( ) . await ;
75- if self . handle_event ( evt) . await {
77+ match self . handle_event ( evt) . await {
78+ Ok ( true ) => return Ok ( ( ) ) ,
79+ // keep waiting for connection
80+ Ok ( false ) => { }
81+ // fatal error
82+ Err ( _) => return Err ( ( ) ) ,
83+ }
84+ }
85+ }
86+
87+ pub async fn run ( mut self ) {
88+ loop {
89+ let evt = self . eventloop . poll ( ) . await ;
90+ if self . handle_event ( evt) . await . is_err ( ) {
91+ // only errors on fatal errors, so `run` should
92+ // quit, shutting down all of opensleep
7693 return ;
7794 }
7895 }
7996 }
8097
98+ /// returns Ok(true) on ConnAck, Err(()) for fatal errors
99+ async fn handle_event ( & mut self , msg : Result < Event , ConnectionError > ) -> Result < bool , ( ) > {
100+ match msg {
101+ Ok ( Event :: Incoming ( Packet :: ConnAck ( _) ) ) => {
102+ log:: info!( "MQTT broker connected" ) ;
103+ self . spawn_new_conn_task ( ) . await ;
104+ return Ok ( true ) ;
105+ }
106+ Ok ( Event :: Incoming ( Packet :: Disconnect ) ) => {
107+ log:: warn!( "MQTT broker disconnected" ) ;
108+ }
109+ Ok ( Event :: Incoming ( Packet :: Publish ( publ) ) ) => {
110+ self . handle_action ( publ) . await ;
111+ }
112+ Ok ( _) => { }
113+
114+ // connection errors
115+ Err ( ConnectionError :: Io ( e) ) => {
116+ self . reconnect_attempts += 1 ;
117+ let backoff = self . calc_backoff ( ) ;
118+ log:: error!( "I/O error: {e}. Reconnecting in {backoff:?}..." ) ;
119+ sleep ( backoff) . await ;
120+ }
121+ Err ( ConnectionError :: ConnectionRefused ( code) ) => {
122+ self . reconnect_attempts += 1 ;
123+ let backoff = self . calc_backoff ( ) ;
124+ log:: error!( "Connection refused ({code:?}). Reconnecting in {backoff:?}..." ) ;
125+ sleep ( backoff) . await ;
126+ }
127+ Err ( ConnectionError :: NetworkTimeout ) => {
128+ self . reconnect_attempts += 1 ;
129+ let backoff = self . calc_backoff ( ) ;
130+ log:: error!( "Network timeout. Reconnecting in {backoff:?}..." ) ;
131+ sleep ( backoff) . await ;
132+ }
133+ Err ( ConnectionError :: Tls ( e) ) => {
134+ self . reconnect_attempts += 1 ;
135+ let backoff = self . calc_backoff ( ) ;
136+ log:: error!( "TLS error: {e}. Reconnecting in {backoff:?}..." ) ;
137+ sleep ( backoff) . await ;
138+ }
139+
140+ // state errors
141+ Err ( ConnectionError :: MqttState ( e) ) => {
142+ log:: error!( "State error: {e}" ) ;
143+ sleep ( Duration :: from_millis ( 100 ) ) . await ;
144+ }
145+ Err ( ConnectionError :: FlushTimeout ) => {
146+ log:: error!( "Flush timeout" ) ;
147+ sleep ( Duration :: from_millis ( 100 ) ) . await ;
148+ }
149+
150+ // fatal errors
151+ Err ( ConnectionError :: RequestsDone ) => {
152+ log:: info!( "Requests channel closed" ) ;
153+ return Err ( ( ) ) ;
154+ }
155+
156+ // other
157+ Err ( ConnectionError :: NotConnAck ( packet) ) => {
158+ log:: error!( "Expected ConnAck, got: {packet:?}" ) ;
159+ }
160+ }
161+ Ok ( false )
162+ }
163+
164+ fn calc_backoff ( & self ) -> Duration {
165+ let secs = ( 2u64 . pow ( self . reconnect_attempts . saturating_sub ( 1 ) ) ) . min ( 60 ) ;
166+ Duration :: from_secs ( secs)
167+ }
168+
81169 /// this must be in its own task because publishing
82170 /// topics requires someone polling the event loop
83171 async fn spawn_new_conn_task ( & mut self ) {
@@ -103,35 +191,6 @@ impl MqttManager {
103191 } ) ;
104192 }
105193
106- /// returns true if connected
107- async fn handle_event ( & mut self , msg : Result < Event , ConnectionError > ) -> bool {
108- match msg {
109- Ok ( Event :: Incoming ( Packet :: ConnAck ( _) ) ) => {
110- log:: info!( "MQTT broker connected" ) ;
111- self . spawn_new_conn_task ( ) . await ;
112- return true ;
113- }
114- Ok ( Event :: Incoming ( Packet :: Disconnect ) ) => {
115- log:: warn!( "MQTT broker disconnected" ) ;
116- }
117- Ok ( Event :: Incoming ( Packet :: Publish ( publ) ) ) => {
118- self . handle_action ( publ) . await ;
119- }
120- Ok ( _) => { }
121- Err ( e) => {
122- log:: error!( "MQTT event loop error: {e}" ) ;
123- }
124- }
125- false
126- }
127-
128- pub async fn run ( mut self ) {
129- loop {
130- let evt = self . eventloop . poll ( ) . await ;
131- self . handle_event ( evt) . await ;
132- }
133- }
134-
135194 /// handles a published action
136195 /// MUST exit quickly without calling any MQTT commands (unless in another task)
137196 async fn handle_action ( & mut self , publ : Publish ) {
0 commit comments