|
34 | 34 | topic_prefix: &str, |
35 | 35 | discovery_prefix: Option<&str>, |
36 | 36 | discovery_listen_topic: Option<&str>, |
| 37 | + resync_interval: u64, |
37 | 38 | controller: T, |
38 | 39 | ) -> Arc<DeviceSyncer<T>> { |
39 | | - info!(slog_scope::logger(), "opening_client"; "host" => options.broker_address().0, "port" => options.broker_address().1); |
| 40 | + info!(slog_scope::logger(), "opening_client"; "host" => options.broker_address().0, "port" => options.broker_address().1, "client_id" => &options.client_id()); |
40 | 41 | options.set_clean_session(true); |
41 | 42 | let ev = EventLoop::new(options, 100).await; |
42 | 43 | let (repoll_sender, repoll_rx) = bounded(10); |
|
52 | 53 | let ptr_clone = ptr.clone(); |
53 | 54 | trace!(slog_scope::logger(), "start_thread"); |
54 | 55 | tokio::task::spawn(async move { Self::run_mqtt(ptr, ev).await }); |
| 56 | + |
55 | 57 | let ptr_2 = ptr_clone.clone(); |
56 | | - tokio::task::spawn(async move { Self::run_poller(ptr_2, repoll_rx).await }); |
| 58 | + tokio::task::spawn( |
| 59 | + async move { Self::run_poller(ptr_2, resync_interval, repoll_rx).await }, |
| 60 | + ); |
| 61 | + |
57 | 62 | if ptr_clone.discovery_prefix.is_some() { |
58 | 63 | let ptr_3 = ptr_clone.clone(); |
59 | 64 | tokio::task::spawn(async move { Self::broadcast_discovery(ptr_3).await }); |
@@ -343,10 +348,11 @@ where |
343 | 348 | Self::report_async_result("poll_all", Self::poll_all_(this).await) |
344 | 349 | } |
345 | 350 |
|
346 | | - async fn run_poller(this: Arc<Self>, rx: Receiver<DeviceId>) -> () { |
| 351 | + async fn run_poller(this: Arc<Self>, resync_interval: u64, rx: Receiver<DeviceId>) -> () { |
347 | 352 | let that = this.clone(); |
| 353 | + info!(slog_scope::logger(), "poller_starting"; "resync_interval" => resync_interval); |
348 | 354 | tokio::task::spawn(async move { |
349 | | - let mut timer = tokio::time::interval(Duration::from_secs(10)); |
| 355 | + let mut timer = tokio::time::interval(Duration::from_millis(resync_interval)); |
350 | 356 | loop { |
351 | 357 | timer.tick().await; |
352 | 358 | let _ = that.repoll.send(0).await; |
@@ -387,6 +393,7 @@ where |
387 | 393 | device.id |
388 | 394 | ); |
389 | 395 | let config = v.discovery_info.to_string(); |
| 396 | + info!(slog_scope::logger(), "discovered_device"; "id" => id, "name" => &device.name); |
390 | 397 | debug!(slog_scope::logger(), "broadcast_discovery_result"; "id" => id, "topic" => &topic, "config" => &config); |
391 | 398 | this.sender |
392 | 399 | .send(Request::Publish(Publish::new( |
|
0 commit comments