Skip to content

Commit beb5a44

Browse files
committed
fix: ensure MQTT is reconnected upon connection failure, fixes #19
1 parent 14bbf29 commit beb5a44

File tree

5 files changed

+60
-20
lines changed

5 files changed

+60
-20
lines changed

src/home_assistant/api.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,50 +73,54 @@ impl Listener for HaApi {
7373
&teams_states.is_muted,
7474
&self.ha_configuration.entities.is_muted,
7575
)
76-
.await?;
76+
.await?;
7777

7878
self.update_ha(
7979
&teams_states.is_video_on,
8080
&self.ha_configuration.entities.is_video_on,
8181
)
82-
.await?;
82+
.await?;
8383

8484
self.update_ha(
8585
&teams_states.is_hand_raised,
8686
&self.ha_configuration.entities.is_hand_raised,
8787
)
88-
.await?;
88+
.await?;
8989

9090
self.update_ha(
9191
&teams_states.is_in_meeting,
9292
&self.ha_configuration.entities.is_in_meeting,
9393
)
94-
.await?;
94+
.await?;
9595

9696
self.update_ha(
9797
&teams_states.is_recording_on,
9898
&self.ha_configuration.entities.is_recording_on,
9999
)
100-
.await?;
100+
.await?;
101101

102102
self.update_ha(
103103
&teams_states.is_background_blurred,
104104
&self.ha_configuration.entities.is_background_blurred,
105105
)
106-
.await?;
106+
.await?;
107107

108108
self.update_ha(
109109
&teams_states.is_sharing,
110110
&self.ha_configuration.entities.is_sharing,
111111
)
112-
.await?;
112+
.await?;
113113

114114
self.update_ha(
115115
&teams_states.has_unread_messages,
116116
&self.ha_configuration.entities.has_unread_messages,
117117
)
118-
.await?;
118+
.await?;
119119

120120
Ok(())
121121
}
122+
123+
fn reconnect(&mut self) {
124+
// considered not needed for now, as I believe the API will reconnect upon failure (not tested)
125+
}
122126
}

src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#![windows_subsystem = "windows"]
2+
23
mod configuration;
34
mod home_assistant;
45
mod logging;
@@ -10,7 +11,7 @@ mod utils;
1011

1112
use std::process::exit;
1213
use std::sync::atomic::{AtomicBool, Ordering};
13-
use std::sync::Arc;
14+
use std::sync::{Arc, Mutex};
1415
use std::time;
1516

1617
use crate::configuration::get_configuration;
@@ -72,7 +73,7 @@ async fn run_apis(
7273
};
7374

7475
teams_api
75-
.start_listening(Arc::new(listener), is_running.clone(), toggle_mute.clone())
76+
.start_listening(Arc::new(Mutex::new(listener)), is_running.clone(), toggle_mute.clone())
7677
.await?;
7778

7879
Ok(())

src/mqtt/api.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,20 @@ impl Listener for MqttApi {
7676

7777
Ok(())
7878
}
79+
80+
fn reconnect(&mut self) {
81+
let mut mqtt_options = MqttOptions::new(
82+
"teams-status",
83+
self.mqtt_configuration.url(),
84+
self.mqtt_configuration.port,
85+
);
86+
87+
mqtt_options.set_credentials(&self.mqtt_configuration.username, &self.mqtt_configuration.password);
88+
mqtt_options.set_keep_alive(Duration::from_secs(5));
89+
let (client, mut event_loop) = AsyncClient::new(mqtt_options, 10);
90+
91+
self.client = client;
92+
// mqttc requires this to work
93+
task::spawn(async move { while let Ok(_) = event_loop.poll().await {} });
94+
}
7995
}

src/teams_ws/api.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ use futures_util::{future, pin_mut, SinkExt, StreamExt};
88
use json::JsonValue;
99
use log::{error, info};
1010
use std::sync::atomic::{AtomicBool, Ordering};
11-
use std::sync::Arc;
11+
use std::sync::{Arc, Mutex};
1212
use std::time;
13+
use std::time::Duration;
1314
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
1415

1516
const JSON_MEETING_UPDATE: &str = "meetingUpdate";
@@ -23,6 +24,7 @@ const JSON_IS_BACKGROUND_BLURRED: &str = "isBackgroundBlurred";
2324
const JSON_IS_SHARING: &str = "isSharing";
2425
const JSON_HAS_UNREAD_MESSAGES: &str = "hasUnreadMessages";
2526
const JSON_TOKEN_REFRESH: &str = "tokenRefresh";
27+
2628
pub struct TeamsAPI {
2729
pub teams_states: Arc<TeamsStates>,
2830
pub url: String,
@@ -56,7 +58,7 @@ impl TeamsAPI {
5658

5759
pub async fn start_listening(
5860
&self,
59-
listener: Arc<Box<dyn Listener>>,
61+
listener: Arc<Mutex<Box<dyn Listener>>>,
6062
is_running: Arc<AtomicBool>,
6163
toggle_mute: Arc<AtomicBool>,
6264
) -> anyhow::Result<()> {
@@ -72,16 +74,17 @@ impl TeamsAPI {
7274
let data = &message.unwrap().into_data();
7375
let json = String::from_utf8_lossy(data);
7476
info!("{}", json);
75-
let parse_result = parse_data(
77+
78+
let parse_result = parse_data_and_notify_listener(
7679
&json,
7780
listener.clone(),
7881
self.teams_states.clone(),
7982
force_update.clone(),
8083
)
81-
.await;
84+
.await;
8285

8386
if parse_result.is_err() {
84-
error!("{}", parse_result.unwrap_err())
87+
error!("Unable to parse or notify listener, abandoning: {}", parse_result.unwrap_err());
8588
}
8689
}
8790
})
@@ -127,9 +130,9 @@ async fn update_value(
127130
teams_state_value.swap(new_value, Ordering::Relaxed) != new_value
128131
}
129132

130-
async fn parse_data(
133+
async fn parse_data_and_notify_listener(
131134
json: &str,
132-
listener: Arc<Box<dyn Listener>>,
135+
listener: Arc<Mutex<Box<dyn Listener>>>,
133136
teams_states: Arc<TeamsStates>,
134137
force_update: Arc<AtomicBool>,
135138
) -> anyhow::Result<()> {
@@ -148,17 +151,32 @@ async fn parse_data(
148151
&answer,
149152
JSON_IS_BACKGROUND_BLURRED,
150153
)
151-
.await;
154+
.await;
152155
has_changed |= update_value(&teams_states.is_sharing, &answer, JSON_IS_SHARING).await;
153156
has_changed |= update_value(
154157
&teams_states.has_unread_messages,
155158
&answer,
156159
JSON_HAS_UNREAD_MESSAGES,
157160
)
158-
.await;
161+
.await;
159162

160163
if force_update.swap(false, Ordering::Relaxed) || has_changed {
161-
listener.notify_changed(&teams_states).await?;
164+
// Issue!: This will only run once regardless of MAX_RETRIES
165+
// for some reason after a reconnect the notify_changed will get a pass no matter what
166+
const MAX_RETRIES: i32 = 3;
167+
for i in 1..MAX_RETRIES {
168+
let result = listener.lock().unwrap().notify_changed(&teams_states).await;
169+
170+
if result.is_ok() || (i == MAX_RETRIES) {
171+
result?
172+
}
173+
// we will try to reconnect if the connection failed
174+
else if i < MAX_RETRIES {
175+
error!("{}: Reconnecting and retrying...", result.unwrap_err());
176+
tokio::time::sleep(Duration::from_secs(1)).await;
177+
listener.lock().unwrap().reconnect();
178+
}
179+
}
162180
}
163181
} else if answer.has_key(JSON_TOKEN_REFRESH) && !answer[JSON_TOKEN_REFRESH].is_empty() {
164182
change_teams_configuration(

src/traits.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ pub trait StopController {}
77
#[async_trait]
88
pub trait Listener {
99
async fn notify_changed(&self, teams_states: &TeamsStates) -> anyhow::Result<()>;
10+
fn reconnect(&mut self);
1011
}

0 commit comments

Comments
 (0)