Skip to content

Commit e60b61a

Browse files
committed
refresh on network connection changes
1 parent 5002772 commit e60b61a

File tree

2 files changed

+92
-75
lines changed

2 files changed

+92
-75
lines changed

ntfy-daemon/src/actor_utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
macro_rules! send_command {
22
($self:expr, $command:expr) => {{
33
let (resp_tx, rx) = oneshot::channel();
4+
use anyhow::Context;
45
$self
56
.command_tx
67
.send($command(resp_tx))
78
.await
8-
.map_err(|_| anyhow::anyhow!("Actor mailbox error"))?;
9-
rx.await
10-
.map_err(|_| anyhow::anyhow!("Actor response error"))?
9+
.context("Actor mailbox error")?;
10+
rx.await.context("Actor response error")?
1111
}};
1212
}
1313

ntfy-daemon/src/ntfy.rs

Lines changed: 89 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use crate::models::NullNetworkMonitor;
33
use crate::models::NullNotifier;
44
use anyhow::{anyhow, Context};
55
use futures::future::join_all;
6+
use futures::StreamExt;
67
use std::{collections::HashMap, future::Future, sync::Arc};
8+
use tokio::select;
79
use tokio::{
810
sync::{broadcast, mpsc, oneshot, RwLock},
911
task::{spawn_local, LocalSet},
@@ -135,85 +137,89 @@ impl NtfyActor {
135137
}
136138

137139
pub async fn run(&mut self) {
138-
while let Some(msg) = self.command_rx.recv().await {
139-
match msg {
140-
NtfyCommand::Subscribe {
141-
server,
142-
topic,
143-
resp_tx,
144-
} => {
145-
let result = self.handle_subscribe(server, topic).await;
146-
let _ = resp_tx.send(result);
147-
}
140+
let mut network_change_stream = self.env.network_monitor.listen();
141+
loop {
142+
select! {
143+
Some(_) = network_change_stream.next() => {
144+
let _ = self.refresh_all().await;
145+
},
146+
Some(command) = self.command_rx.recv() => self.handle_command(command).await,
147+
};
148+
}
149+
}
148150

149-
NtfyCommand::Unsubscribe {
150-
server,
151-
topic,
152-
resp_tx,
153-
} => {
154-
let result = self.handle_unsubscribe(server, topic).await;
155-
let _ = resp_tx.send(result);
156-
}
151+
async fn handle_command(&mut self, command: NtfyCommand) {
152+
match command {
153+
NtfyCommand::Subscribe {
154+
server,
155+
topic,
156+
resp_tx,
157+
} => {
158+
let result = self.handle_subscribe(server, topic).await;
159+
let _ = resp_tx.send(result);
160+
}
157161

158-
NtfyCommand::RefreshAll { resp_tx } => {
159-
let mut res = Ok(());
160-
for sub in self.listener_handles.read().await.values() {
161-
res = sub.restart().await;
162-
if res.is_err() {
163-
break;
164-
}
165-
}
166-
let _ = resp_tx.send(res);
167-
}
162+
NtfyCommand::Unsubscribe {
163+
server,
164+
topic,
165+
resp_tx,
166+
} => {
167+
let result = self.handle_unsubscribe(server, topic).await;
168+
let _ = resp_tx.send(result);
169+
}
168170

169-
NtfyCommand::ListSubscriptions { resp_tx } => {
170-
let subs = self
171-
.listener_handles
172-
.read()
173-
.await
174-
.values()
175-
.cloned()
176-
.collect();
177-
let _ = resp_tx.send(Ok(subs));
178-
}
171+
NtfyCommand::RefreshAll { resp_tx } => {
172+
let res = self.refresh_all().await;
173+
let _ = resp_tx.send(res);
174+
}
179175

180-
NtfyCommand::ListAccounts { resp_tx } => {
181-
let accounts = self
182-
.env
183-
.credentials
184-
.list_all()
185-
.into_iter()
186-
.map(|(server, credential)| Account {
187-
server,
188-
username: credential.username,
189-
})
190-
.collect();
191-
let _ = resp_tx.send(Ok(accounts));
192-
}
176+
NtfyCommand::ListSubscriptions { resp_tx } => {
177+
let subs = self
178+
.listener_handles
179+
.read()
180+
.await
181+
.values()
182+
.cloned()
183+
.collect();
184+
let _ = resp_tx.send(Ok(subs));
185+
}
193186

194-
NtfyCommand::WatchSubscribed { resp_tx } => {
195-
let result = self.handle_watch_subscribed().await;
196-
let _ = resp_tx.send(result);
197-
}
187+
NtfyCommand::ListAccounts { resp_tx } => {
188+
let accounts = self
189+
.env
190+
.credentials
191+
.list_all()
192+
.into_iter()
193+
.map(|(server, credential)| Account {
194+
server,
195+
username: credential.username,
196+
})
197+
.collect();
198+
let _ = resp_tx.send(Ok(accounts));
199+
}
198200

199-
NtfyCommand::AddAccount {
200-
server,
201-
username,
202-
password,
203-
resp_tx,
204-
} => {
205-
let result = self
206-
.env
207-
.credentials
208-
.insert(&server, &username, &password)
209-
.await;
210-
let _ = resp_tx.send(result);
211-
}
201+
NtfyCommand::WatchSubscribed { resp_tx } => {
202+
let result = self.handle_watch_subscribed().await;
203+
let _ = resp_tx.send(result);
204+
}
212205

213-
NtfyCommand::RemoveAccount { server, resp_tx } => {
214-
let result = self.env.credentials.delete(&server).await;
215-
let _ = resp_tx.send(result);
216-
}
206+
NtfyCommand::AddAccount {
207+
server,
208+
username,
209+
password,
210+
resp_tx,
211+
} => {
212+
let result = self
213+
.env
214+
.credentials
215+
.insert(&server, &username, &password)
216+
.await;
217+
let _ = resp_tx.send(result);
218+
}
219+
220+
NtfyCommand::RemoveAccount { server, resp_tx } => {
221+
let result = self.env.credentials.delete(&server).await;
222+
let _ = resp_tx.send(result);
217223
}
218224
}
219225
}
@@ -261,6 +267,17 @@ impl NtfyActor {
261267
Ok(sub)
262268
}
263269
}
270+
271+
async fn refresh_all(&self) -> anyhow::Result<()> {
272+
let mut res = Ok(());
273+
for sub in self.listener_handles.read().await.values() {
274+
res = sub.restart().await;
275+
if res.is_err() {
276+
break;
277+
}
278+
}
279+
res
280+
}
264281
}
265282

266283
impl NtfyHandle {

0 commit comments

Comments
 (0)