Skip to content

Commit 939fd4b

Browse files
committed
perf: concurrent HA API calls
perf: only update HA API for states that have changed
1 parent 72f15d1 commit 939fd4b

File tree

5 files changed

+76
-39
lines changed

5 files changed

+76
-39
lines changed

src/home_assistant/api.rs

Lines changed: 54 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,31 @@ use home_assistant_rest::Client;
99
use log::{error, info};
1010
use std::collections::HashMap;
1111
use std::sync::atomic::{AtomicBool, Ordering};
12+
use futures_util::future::try_join_all;
1213

1314
pub struct HaApi {
14-
client: Client,
1515
ha_configuration: HaConfiguration,
1616
}
1717

1818
impl HaApi {
1919
pub fn new(ha_configuration: HaConfiguration) -> anyhow::Result<Self> {
20-
let client = Client::new(&*ha_configuration.url, &*ha_configuration.long_live_token)?;
2120
Ok(Self {
22-
client,
2321
ha_configuration,
2422
})
2523
}
2624

27-
/* friendly_name is needed as API calls wipe the configured name */
28-
async fn update_ha(&self, state: &AtomicBool, ha_entity: &HaEntity) -> anyhow::Result<()> {
29-
let api_status = self.client.get_api_status().await.unwrap();
25+
// friendly_name is needed as API calls wipe the configured name
26+
async fn update_ha(&self, state: &AtomicBool, prev_state: &AtomicBool, ha_entity: &HaEntity, force_update: bool) -> anyhow::Result<()> {
27+
let state_bool = state.load(Ordering::Relaxed);
28+
let prev_state_bool = prev_state.load(Ordering::Relaxed);
29+
30+
// we exit early if nothing has changed, and we are not forcing an update
31+
if state_bool == prev_state_bool && !force_update {
32+
return Ok(());
33+
}
34+
35+
let client = Client::new(&*self.ha_configuration.url, &*self.ha_configuration.long_live_token)?;
36+
let api_status = client.get_api_status().await.unwrap();
3037

3138
if api_status.message != "API running." {
3239
error!("Home Assistant API cannot be reached");
@@ -39,8 +46,6 @@ impl HaApi {
3946
ha_entity.friendly_name.to_string(),
4047
);
4148

42-
let state_bool = state.load(Ordering::Relaxed);
43-
4449
let icon = if state_bool {
4550
&ha_entity.icons.on
4651
} else {
@@ -58,67 +63,81 @@ impl HaApi {
5863

5964
info!("Updating HA entity ({}) to '{}'", &ha_entity.id, &state_str);
6065

61-
let post_states_res = self.client.post_states(params).await;
66+
let post_states_res = client.post_states(params).await;
6267

6368
if post_states_res.is_err() {
6469
error!("{}", post_states_res.unwrap_err());
6570
};
6671

72+
prev_state.store(state_bool, Ordering::Relaxed);
73+
6774
Ok(())
6875
}
6976
}
7077

7178
#[async_trait]
7279
impl Listener for HaApi {
73-
async fn notify_changed(&self, teams_states: &TeamsStates) -> anyhow::Result<()> {
80+
async fn notify_changed(&self, teams_states: &TeamsStates, force_update: bool) -> anyhow::Result<()> {
7481
// Reflection would be nice here... Tried with bevy_reflect but ran into an issue with AtomicBool
75-
self.update_ha(
82+
let mut futures = Vec::new();
83+
84+
futures.push(self.update_ha(
7685
&teams_states.is_in_meeting,
86+
&teams_states.prev_is_in_meeting,
7787
&self.ha_configuration.entities.is_in_meeting,
78-
)
79-
.await?;
88+
force_update,
89+
));
8090

81-
self.update_ha(
91+
futures.push(self.update_ha(
8292
&teams_states.is_video_on,
93+
&teams_states.prev_is_video_on,
8394
&self.ha_configuration.entities.is_video_on,
84-
)
85-
.await?;
86-
87-
self.update_ha(
95+
force_update,
96+
));
97+
98+
futures.push(self.update_ha(
8899
&teams_states.is_muted,
100+
&teams_states.prev_is_muted,
89101
&self.ha_configuration.entities.is_muted,
90-
)
91-
.await?;
102+
force_update,
103+
));
92104

93-
self.update_ha(
105+
futures.push(self.update_ha(
94106
&teams_states.is_hand_raised,
107+
&teams_states.prev_is_hand_raised,
95108
&self.ha_configuration.entities.is_hand_raised,
96-
)
97-
.await?;
109+
force_update,
110+
));
98111

99-
self.update_ha(
112+
futures.push(self.update_ha(
100113
&teams_states.is_recording_on,
114+
&teams_states.prev_is_recording_on,
101115
&self.ha_configuration.entities.is_recording_on,
102-
)
103-
.await?;
116+
force_update,
117+
));
104118

105-
self.update_ha(
119+
futures.push(self.update_ha(
106120
&teams_states.is_background_blurred,
121+
&teams_states.prev_is_background_blurred,
107122
&self.ha_configuration.entities.is_background_blurred,
108-
)
109-
.await?;
123+
force_update,
124+
));
110125

111-
self.update_ha(
126+
futures.push(self.update_ha(
112127
&teams_states.is_sharing,
128+
&teams_states.prev_is_sharing,
113129
&self.ha_configuration.entities.is_sharing,
114-
)
115-
.await?;
130+
force_update,
131+
));
116132

117-
self.update_ha(
133+
futures.push(self.update_ha(
118134
&teams_states.has_unread_messages,
135+
&teams_states.prev_has_unread_messages,
119136
&self.ha_configuration.entities.has_unread_messages,
120-
)
121-
.await?;
137+
force_update,
138+
));
139+
140+
try_join_all(futures).await?;
122141

123142
Ok(())
124143
}

src/mqtt/api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl MqttApi {
3838

3939
#[async_trait]
4040
impl Listener for MqttApi {
41-
async fn notify_changed(&self, teams_states: &TeamsStates) -> anyhow::Result<()> {
41+
async fn notify_changed(&self, teams_states: &TeamsStates, _: bool) -> anyhow::Result<()> {
4242
let muted = &*bool_to_str(teams_states.is_muted.load(Ordering::Relaxed));
4343
let video_on = &*bool_to_str(teams_states.is_video_on.load(Ordering::Relaxed));
4444
let hand_raised = &*bool_to_str(teams_states.is_hand_raised.load(Ordering::Relaxed));

src/teams_ws/api.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,21 @@ impl TeamsAPI {
3434
pub fn new(conf: &TeamsConfiguration) -> Self {
3535
let teams_states = Arc::new(TeamsStates {
3636
is_muted: AtomicBool::new(false),
37+
prev_is_muted: AtomicBool::new(false),
3738
is_video_on: AtomicBool::new(false),
39+
prev_is_video_on: AtomicBool::new(false),
3840
is_hand_raised: AtomicBool::new(false),
41+
prev_is_hand_raised: AtomicBool::new(false),
3942
is_in_meeting: AtomicBool::new(false),
43+
prev_is_in_meeting: AtomicBool::new(false),
4044
is_recording_on: AtomicBool::new(false),
45+
prev_is_recording_on: AtomicBool::new(false),
4146
is_background_blurred: AtomicBool::new(false),
47+
prev_is_background_blurred: AtomicBool::new(false),
4248
is_sharing: AtomicBool::new(false),
49+
prev_is_sharing: AtomicBool::new(false),
4350
has_unread_messages: AtomicBool::new(false),
51+
prev_has_unread_messages: AtomicBool::new(false),
4452
});
4553

4654
let api_token = if !conf.api_token.is_empty() {
@@ -160,12 +168,14 @@ async fn parse_data_and_notify_listener(
160168
)
161169
.await;
162170

163-
if force_update.swap(false, Ordering::Relaxed) || has_changed {
171+
let force_update = force_update.swap(false, Ordering::Relaxed);
172+
173+
if force_update || has_changed {
164174
// Issue!: This will only run once regardless of MAX_RETRIES
165175
// for some reason after a reconnect the notify_changed will get a pass no matter what
166176
const MAX_RETRIES: i32 = 3;
167177
for i in 1..MAX_RETRIES {
168-
let result = listener.lock().unwrap().notify_changed(&teams_states).await;
178+
let result = listener.lock().unwrap().notify_changed(&teams_states, force_update).await;
169179

170180
if result.is_ok() || (i == MAX_RETRIES) {
171181
result?;

src/teams_ws/states.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,19 @@ use std::sync::atomic::AtomicBool;
22

33
pub struct TeamsStates {
44
pub is_muted: AtomicBool,
5+
pub prev_is_muted: AtomicBool,
56
pub is_video_on: AtomicBool,
7+
pub prev_is_video_on: AtomicBool,
68
pub is_hand_raised: AtomicBool,
9+
pub prev_is_hand_raised: AtomicBool,
710
pub is_in_meeting: AtomicBool,
11+
pub prev_is_in_meeting: AtomicBool,
812
pub is_recording_on: AtomicBool,
13+
pub prev_is_recording_on: AtomicBool,
914
pub is_background_blurred: AtomicBool,
15+
pub prev_is_background_blurred: AtomicBool,
1016
pub is_sharing: AtomicBool,
17+
pub prev_is_sharing: AtomicBool,
1118
pub has_unread_messages: AtomicBool,
19+
pub prev_has_unread_messages: AtomicBool,
1220
}

src/traits.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ pub trait StopController {}
66
// todo: convert to Rust built-in once 1.75 is released
77
#[async_trait]
88
pub trait Listener {
9-
async fn notify_changed(&self, teams_states: &TeamsStates) -> anyhow::Result<()>;
9+
async fn notify_changed(&self, teams_states: &TeamsStates, force_update: bool) -> anyhow::Result<()>;
1010
fn reconnect(&mut self);
1111
}

0 commit comments

Comments
 (0)