Skip to content

Commit 5c51832

Browse files
committed
Reduce cpu usage and fix network monitor
1 parent ae2084c commit 5c51832

File tree

8 files changed

+68
-28
lines changed

8 files changed

+68
-28
lines changed

Cargo.lock

Lines changed: 17 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ rand = "0.8.5"
3131
ureq = "2.7.1"
3232
futures = "0.3.0"
3333
ashpd = "0.6.0"
34+
async-channel = "2.1.0"
3435
relm4-macros = { version = "0.6.2", features = [], default-features = false }

ntfy-daemon/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ rusqlite = "0.29.0"
2424
rand = "0.8.5"
2525
reqwest = { version = "0.11.18", features = ["stream", "rustls-tls"]}
2626
url = "2.4.0"
27-
ashpd = "0.6.0"
2827
generational-arena = "0.2.9"
2928
tracing = "0.1.37"
3029
thiserror = "1.0.49"

ntfy-daemon/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ pub struct SharedEnv {
1414
db: message_repo::Db,
1515
proxy: Arc<dyn models::NotificationProxy>,
1616
http: reqwest::Client,
17-
network: Arc<ashpd::desktop::network_monitor::NetworkMonitor<'static>>,
17+
network: Arc<dyn models::NetworkMonitorProxy>,
1818
}
1919

2020
#[derive(thiserror::Error, Debug)]

ntfy-daemon/src/models.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::collections::HashMap;
2+
use std::pin::Pin;
23
use std::sync::OnceLock;
34

5+
use futures::stream::Stream;
46
use regex::Regex;
57
use serde::{Deserialize, Serialize};
68

@@ -316,3 +318,7 @@ pub struct Notification {
316318
pub trait NotificationProxy: Sync + Send {
317319
fn send(&self, n: Notification) -> anyhow::Result<()>;
318320
}
321+
322+
pub trait NetworkMonitorProxy: Sync + Send {
323+
fn listen(&self) -> Pin<Box<dyn Stream<Item = ()>>>;
324+
}

ntfy-daemon/src/system_client.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use std::sync::Arc;
55
use std::time::Duration;
66
use std::{collections::HashMap, hash::Hash};
77

8-
use ashpd::desktop::network_monitor::NetworkMonitor;
98
use capnp::capability::Promise;
109
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
1110
use futures::future::join_all;
@@ -343,7 +342,7 @@ impl SystemNotifier {
343342
pub fn new(
344343
dbpath: &str,
345344
notification_proxy: Arc<dyn models::NotificationProxy>,
346-
network: Arc<NetworkMonitor<'static>>,
345+
network: Arc<dyn models::NetworkMonitorProxy>,
347346
) -> Self {
348347
Self {
349348
watching: Rc::new(RefCell::new(HashMap::new())),
@@ -457,12 +456,12 @@ pub fn start(
457456
socket_path: std::path::PathBuf,
458457
dbpath: &str,
459458
notification_proxy: Arc<dyn models::NotificationProxy>,
459+
network_proxy: Arc<dyn models::NetworkMonitorProxy>,
460460
) -> anyhow::Result<()> {
461461
let rt = tokio::runtime::Builder::new_current_thread()
462462
.enable_all()
463463
.build()?;
464464

465-
let network_monitor = rt.block_on(async move { NetworkMonitor::new().await.unwrap() });
466465
let listener = rt.block_on(async move {
467466
let _ = std::fs::remove_file(&socket_path);
468467
UnixListener::bind(&socket_path).unwrap()
@@ -471,8 +470,7 @@ pub fn start(
471470
let dbpath = dbpath.to_owned();
472471
let f = move || {
473472
let local = tokio::task::LocalSet::new();
474-
let mut system_notifier =
475-
SystemNotifier::new(&dbpath, notification_proxy, Arc::new(network_monitor));
473+
let mut system_notifier = SystemNotifier::new(&dbpath, notification_proxy, network_proxy);
476474
local.spawn_local(async move {
477475
system_notifier.watch_subscribed().await.unwrap();
478476
let system_client: system_notifier::Client = capnp_rpc::new_client(system_notifier);

ntfy-daemon/src/topic_listener.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::ops::ControlFlow;
22
use std::sync::Arc;
33
use std::time::Duration;
44

5-
use ashpd::desktop::network_monitor::NetworkMonitor;
65
use futures::prelude::*;
76
use reqwest::header::HeaderValue;
87
use serde::{Deserialize, Serialize};
@@ -141,21 +140,14 @@ impl TopicListener {
141140
}
142141

143142
async fn reload_on_network_change(
144-
monitor: Arc<NetworkMonitor<'static>>,
143+
monitor: Arc<dyn models::NetworkMonitorProxy>,
145144
tx: mpsc::Sender<ControlFlow<()>>,
146145
) -> anyhow::Result<()> {
147-
let mut prev_available = false;
148-
149-
loop {
150-
let _ = monitor.receive_changed().await?;
151-
let available = monitor.is_available().await?;
152-
if available && !prev_available {
153-
if let Err(e) = tx.send(ControlFlow::Continue(())).await {
154-
return Err(e.into());
155-
}
156-
}
157-
prev_available = available;
146+
let mut m = monitor.listen();
147+
while let Some(_) = m.next().await {
148+
tx.send(ControlFlow::Continue(())).await?;
158149
}
150+
Ok(())
159151
}
160152

161153
fn send_current_status(&mut self) -> impl Future<Output = anyhow::Result<()>> {

src/application.rs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
use std::cell::Cell;
12
use std::path::Path;
23
use std::path::PathBuf;
4+
use std::pin::Pin;
5+
use std::rc::Rc;
36

47
use adw::subclass::prelude::*;
58
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
9+
use futures::stream::Stream;
610
use futures::AsyncReadExt;
711
use gettextrs::gettext;
812
use gio::SocketClient;
@@ -242,6 +246,11 @@ impl NotifyApplication {
242246
let dbpath = glib::user_data_dir().join("com.ranfdev.Notify.sqlite");
243247
info!(database_path = %dbpath.display());
244248

249+
// Here I'm sending notifications to the desktop environment and listening for network changes.
250+
// This should have been inside ntfy-daemon, but using portals from another thread causes the error
251+
// `Invalid client serial` and it's broken.
252+
// Until https://github.com/flatpak/xdg-dbus-proxy/issues/46 is solved, I have to handle these things
253+
// in the main thread. Uff.
245254
let (tx, rx) = glib::MainContext::channel(Default::default());
246255
let app = self.clone();
247256
rx.attach(None, move |n: models::Notification| {
@@ -268,17 +277,39 @@ impl NotifyApplication {
268277
glib::ControlFlow::Continue
269278
});
270279

271-
struct Proxy(glib::Sender<models::Notification>);
272-
impl models::NotificationProxy for Proxy {
280+
struct Proxies {
281+
notification: glib::Sender<models::Notification>,
282+
}
283+
impl models::NotificationProxy for Proxies {
273284
fn send(&self, n: models::Notification) -> anyhow::Result<()> {
274-
self.0.send(n)?;
285+
self.notification.send(n)?;
275286
Ok(())
276287
}
277288
}
289+
impl models::NetworkMonitorProxy for Proxies {
290+
fn listen(&self) -> Pin<Box<dyn Stream<Item = ()>>> {
291+
let (tx, rx) = async_channel::bounded(1);
292+
let mut prev_available = Rc::new(Cell::new(false));
293+
294+
gio::NetworkMonitor::default().connect_network_changed(move |_, available| {
295+
dbg!("sent", available);
296+
if available && !prev_available.get() {
297+
if let Err(e) = tx.send_blocking(()) {
298+
warn!(error = %e);
299+
}
300+
}
301+
prev_available.replace(available);
302+
});
303+
304+
Box::pin(rx)
305+
}
306+
}
307+
let proxies = std::sync::Arc::new(Proxies { notification: tx });
278308
ntfy_daemon::system_client::start(
279309
socket_path.to_owned(),
280310
dbpath.to_str().unwrap(),
281-
std::sync::Arc::new(Proxy(tx)),
311+
proxies.clone(),
312+
proxies,
282313
)
283314
.unwrap();
284315
self.imp().hold_guard.set(self.hold()).unwrap();

0 commit comments

Comments
 (0)