@@ -13,61 +13,67 @@ use crate::{
13
13
sequence_id:: SequenceId ,
14
14
} ;
15
15
16
+ const MQTT_PORT : u16 = 8883 ;
17
+ const MAX_PACKET_SIZE : usize = 1024 * 1024 ;
18
+
16
19
/// The Bambu MQTT client.
17
20
#[ derive( Clone ) ]
18
21
pub struct Client {
19
- /// The MQTT host.
20
- pub host : String ,
22
+ /// The IP address of the MQTT host.
23
+ pub ip : String ,
21
24
/// The access code.
22
25
pub access_code : String ,
23
26
/// The serial number.
24
27
pub serial : String ,
25
28
26
- client : rumqttc:: AsyncClient ,
29
+ topic_device_request : String ,
30
+ topic_device_report : String ,
31
+
32
+ client : Arc < rumqttc:: AsyncClient > ,
27
33
event_loop : Arc < Mutex < rumqttc:: EventLoop > > ,
28
34
29
35
responses : Arc < DashMap < SequenceId , Message > > ,
30
-
31
- topic_device_request : String ,
32
- topic_device_report : String ,
33
36
}
34
37
35
- const MAX_PACKET_SIZE : usize = 1024 * 1024 ;
36
-
37
38
impl Client {
38
39
/// Creates a new Bambu printer MQTT client.
39
40
pub fn new < S : Into < String > + Clone > ( ip : S , access_code : S , serial : S ) -> Result < Self > {
40
41
let access_code = access_code. into ( ) ;
42
+ let ip = ip. into ( ) ;
41
43
let serial = serial. into ( ) ;
42
- let host = format ! ( "mqtts://{}:8883" , ip. clone( ) . into( ) ) ;
43
44
45
+ let opts = Self :: get_config ( & ip, & access_code) ?;
46
+ let ( client, event_loop) = rumqttc:: AsyncClient :: new ( opts, 25 ) ;
47
+
48
+ Ok ( Self {
49
+ ip,
50
+ access_code,
51
+ topic_device_request : format ! ( "device/{}/request" , & serial) ,
52
+ topic_device_report : format ! ( "device/{}/report" , & serial) ,
53
+ serial,
54
+ client : Arc :: new ( client) ,
55
+ event_loop : Arc :: new ( Mutex :: new ( event_loop) ) ,
56
+ responses : Arc :: new ( DashMap :: new ( ) ) ,
57
+ } )
58
+ }
59
+
60
+ fn get_config ( ip : & str , access_code : & str ) -> Result < rumqttc:: MqttOptions > {
44
61
let client_id = format ! ( "bambu-api-{}" , nanoid:: nanoid!( 8 ) ) ;
45
62
46
63
let ssl_config = rustls:: ClientConfig :: builder ( )
47
64
. dangerous ( )
48
65
. with_custom_certificate_verifier ( Arc :: new ( crate :: no_auth:: NoAuth :: new ( ) ) )
49
66
. with_no_client_auth ( ) ;
50
67
51
- let mut opts = rumqttc:: MqttOptions :: new ( client_id, ip, 8883 ) ;
68
+ let mut opts = rumqttc:: MqttOptions :: new ( client_id, ip, MQTT_PORT ) ;
52
69
opts. set_max_packet_size ( MAX_PACKET_SIZE , MAX_PACKET_SIZE ) ;
53
70
opts. set_keep_alive ( Duration :: from_secs ( 5 ) ) ;
54
- opts. set_credentials ( "bblp" , & access_code) ;
71
+ opts. set_credentials ( "bblp" , access_code) ;
55
72
opts. set_transport ( rumqttc:: Transport :: Tls ( rumqttc:: TlsConfiguration :: Rustls ( Arc :: new (
56
73
ssl_config,
57
74
) ) ) ) ;
58
75
59
- let ( client, event_loop) = rumqttc:: AsyncClient :: new ( opts, 25 ) ;
60
-
61
- Ok ( Self {
62
- host,
63
- access_code,
64
- topic_device_request : format ! ( "device/{}/request" , & serial) ,
65
- topic_device_report : format ! ( "device/{}/report" , & serial) ,
66
- serial,
67
- client,
68
- event_loop : Arc :: new ( Mutex :: new ( event_loop) ) ,
69
- responses : Arc :: new ( DashMap :: new ( ) ) ,
70
- } )
76
+ Ok ( opts)
71
77
}
72
78
73
79
/// Polls for a message from the MQTT event loop.
@@ -81,7 +87,27 @@ impl Client {
81
87
///
82
88
/// Returns an error if there was a problem polling for a message or parsing the event.
83
89
async fn poll ( & mut self ) -> Result < ( ) > {
84
- let msg_opt = self . event_loop . lock ( ) . await . poll ( ) . await ?;
90
+ let mut ep = self . event_loop . lock ( ) . await ;
91
+ let msg_opt = match ep. poll ( ) . await {
92
+ Ok ( msg_opt) => msg_opt,
93
+ Err ( err) => {
94
+ if let rumqttc:: ConnectionError :: MqttState ( rumqttc:: StateError :: Io ( err) ) = err {
95
+ tracing:: error!( "Error polling for message: {:?}" , err) ;
96
+ tracing:: warn!( "Reconnecting..." ) ;
97
+ // We are in a bad state and should reconnect.
98
+ let opts = Self :: get_config ( & self . ip , & self . access_code ) ?;
99
+ let ( client, event_loop) = rumqttc:: AsyncClient :: new ( opts, 25 ) ;
100
+ drop ( ep) ;
101
+ self . client = Arc :: new ( client) ;
102
+ self . event_loop = Arc :: new ( Mutex :: new ( event_loop) ) ;
103
+ tracing:: warn!( "Reconnected." ) ;
104
+ return Ok ( ( ) ) ;
105
+ }
106
+
107
+ tracing:: error!( "Error polling for message: {:?}" , err) ;
108
+ return Ok ( ( ) ) ;
109
+ }
110
+ } ;
85
111
86
112
let message = parse_message ( & msg_opt) ;
87
113
@@ -163,14 +189,16 @@ impl Client {
163
189
if let Some ( response) = self . responses . get ( sequence_id) {
164
190
return Ok ( response. value ( ) . clone ( ) ) ;
165
191
}
192
+ // This sleep is important since it frees up the thread.
193
+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
166
194
}
167
195
168
196
anyhow:: bail!( "Timeout waiting for response to command: {:?}" , command)
169
197
}
170
198
171
199
/// Upload a file.
172
200
pub async fn upload_file ( & self , path : & std:: path:: Path ) -> Result < ( ) > {
173
- let host_url = url:: Url :: parse ( & self . host ) ?;
201
+ let host_url = url:: Url :: parse ( & format ! ( "mqtts://{}:{}" , self . ip , MQTT_PORT ) ) ?;
174
202
let host = host_url
175
203
. host_str ( )
176
204
. ok_or ( anyhow:: anyhow!( "not a valid hostname" ) ) ?
0 commit comments