Skip to content

Commit be6ce20

Browse files
committed
Refactor: remove ntfy_proxy, use SharedEnv
1 parent 1a65c29 commit be6ce20

File tree

5 files changed

+150
-254
lines changed

5 files changed

+150
-254
lines changed

ntfy-daemon/src/lib.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,22 @@
11
pub mod message_repo;
22
pub mod models;
3-
pub mod ntfy_proxy;
43
pub mod retry;
54
pub mod system_client;
5+
pub mod topic_listener;
66
pub mod ntfy_capnp {
77
include!(concat!(env!("OUT_DIR"), "/src/ntfy_capnp.rs"));
88
}
99

10+
use std::sync::Arc;
11+
12+
#[derive(Clone)]
13+
pub struct SharedEnv {
14+
db: message_repo::Db,
15+
proxy: Arc<dyn models::NotificationProxy>,
16+
http: reqwest::Client,
17+
network: Arc<ashpd::desktop::network_monitor::NetworkMonitor<'static>>,
18+
}
19+
1020
#[derive(thiserror::Error, Debug)]
1121
pub enum Error {
1222
#[error("topic {0} must not be empty and must contain only alphanumeric characters and _ (underscore)")]
@@ -22,3 +32,9 @@ pub enum Error {
2232
#[error("subscription not found while {0}")]
2333
SubscriptionNotFound(String),
2434
}
35+
36+
impl From<Error> for capnp::Error {
37+
fn from(value: Error) -> Self {
38+
capnp::Error::failed(format!("{:?}", value))
39+
}
40+
}

ntfy-daemon/src/ntfy.capnp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,6 @@ interface OutputChannel {
1414
done @2 ();
1515
}
1616

17-
interface NtfyProxy {
18-
getServer @0 () -> (server: Text);
19-
watch @1 (topic: Text, watcher: OutputChannel, since: UInt64) -> (handle: WatchHandle);
20-
publish @2 (message: Text);
21-
}
22-
2317
struct SubscriptionInfo {
2418
server @0 :Text;
2519
topic @1 :Text;

ntfy-daemon/src/system_client.rs

Lines changed: 85 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,50 @@
1-
use std::cell::OnceCell;
21
use std::cell::{Cell, RefCell};
2+
use std::ops::ControlFlow;
33
use std::rc::{Rc, Weak};
44
use std::sync::Arc;
55
use std::time::Duration;
66
use std::{collections::HashMap, hash::Hash};
77

8+
use ashpd::desktop::network_monitor::NetworkMonitor;
89
use capnp::capability::Promise;
910
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
1011
use futures::future::join_all;
1112
use futures::prelude::*;
1213
use generational_arena::Arena;
1314
use tokio::net::UnixListener;
15+
use tokio::sync::mpsc;
1416
use tracing::{error, info, warn};
1517

1618
use crate::models::Message;
1719
use crate::Error;
20+
use crate::SharedEnv;
1821
use crate::{
1922
message_repo::Db,
2023
models::{self, MinMessage},
21-
ntfy_capnp::ntfy_proxy,
2224
ntfy_capnp::{output_channel, subscription, system_notifier, watch_handle, Status},
23-
ntfy_proxy::NtfyProxyImpl,
25+
topic_listener::{build_client, TopicListener},
2426
};
2527

2628
const MESSAGE_THROTTLE: Duration = Duration::from_millis(150);
2729

28-
impl From<Error> for capnp::Error {
29-
fn from(value: Error) -> Self {
30-
capnp::Error::failed(format!("{:?}", value))
31-
}
32-
}
33-
3430
pub struct NotifyForwarder {
3531
model: Rc<RefCell<models::Subscription>>,
36-
db: Db,
32+
env: SharedEnv,
3733
watching: Weak<RefCell<Arena<output_channel::Client>>>,
3834
status: Rc<Cell<Status>>,
39-
notification_proxy: Arc<dyn models::NotificationProxy>,
4035
}
4136
impl NotifyForwarder {
4237
pub fn new(
4338
model: Rc<RefCell<models::Subscription>>,
44-
db: Db,
39+
env: SharedEnv,
4540
watching: Weak<RefCell<Arena<output_channel::Client>>>,
4641
status: Rc<Cell<Status>>,
47-
notification_proxy: Arc<dyn models::NotificationProxy>,
4842
) -> Self {
4943
Self {
5044
model,
51-
db,
45+
env,
5246
watching,
5347
status,
54-
notification_proxy,
5548
}
5649
}
5750
}
@@ -73,7 +66,7 @@ impl output_channel::Server for NotifyForwarder {
7366
let min_message: MinMessage = pry!(serde_json::from_str(&message)
7467
.map_err(|e| Error::InvalidMinMessage(message.to_string(), e)));
7568
let model = self.model.borrow();
76-
match self.db.insert_message(&model.server, message) {
69+
match self.env.db.insert_message(&model.server, message) {
7770
Err(Error::DuplicateMessage) => {
7871
warn!(min_message = ?min_message, "Received duplicate message");
7972
true
@@ -92,7 +85,7 @@ impl output_channel::Server for NotifyForwarder {
9285
if !{ self.model.borrow().muted } {
9386
let msg: Message = pry!(serde_json::from_str(&message)
9487
.map_err(|e| Error::InvalidMessage(message.to_string(), e)));
95-
let np = self.notification_proxy.clone();
88+
let np = self.env.proxy.clone();
9689
tokio::task::spawn_local(async move {
9790
let title = msg.display_title();
9891
let title = title.as_ref().map(|x| x.as_str()).unwrap_or(&msg.topic);
@@ -172,40 +165,64 @@ impl Drop for WatcherImpl {
172165

173166
pub struct SubscriptionImpl {
174167
model: Rc<RefCell<models::Subscription>>,
175-
db: Db,
176-
server: ntfy_proxy::Client,
177-
server_watch_handle: OnceCell<watch_handle::Client>,
168+
env: SharedEnv,
178169
watchers: Rc<RefCell<Arena<output_channel::Client>>>,
179170
status: Rc<Cell<Status>>,
180-
notification_proxy: Arc<dyn models::NotificationProxy>,
171+
topic_listener: mpsc::Sender<ControlFlow<()>>,
172+
}
173+
174+
impl Drop for SubscriptionImpl {
175+
fn drop(&mut self) {
176+
let t = self.topic_listener.clone();
177+
tokio::task::spawn_local(async move {
178+
t.send(ControlFlow::Break(())).await.unwrap();
179+
});
180+
}
181181
}
182182

183183
impl SubscriptionImpl {
184-
fn new(
185-
model: models::Subscription,
186-
server: ntfy_proxy::Client,
187-
db: Db,
188-
notification_proxy: Arc<dyn models::NotificationProxy>,
189-
) -> Self {
184+
fn new(model: models::Subscription, env: SharedEnv) -> Self {
185+
let status = Rc::new(Cell::new(Status::Down));
186+
let watchers = Default::default();
187+
let rc_model = Rc::new(RefCell::new(model.clone()));
188+
let output_channel = NotifyForwarder::new(
189+
rc_model.clone(),
190+
env.clone(),
191+
Rc::downgrade(&watchers),
192+
status.clone(),
193+
);
194+
let topic_listener = TopicListener::new(
195+
env.clone(),
196+
model.server.clone(),
197+
model.topic.clone(),
198+
model.read_until,
199+
capnp_rpc::new_client(output_channel),
200+
);
190201
Self {
191-
model: Rc::new(RefCell::new(model)),
192-
server,
193-
db,
194-
watchers: Default::default(),
195-
server_watch_handle: Default::default(),
196-
status: Rc::new(Cell::new(Status::Down)),
197-
notification_proxy,
202+
model: rc_model,
203+
env,
204+
watchers,
205+
status,
206+
topic_listener,
198207
}
199208
}
200209

201-
fn output_channel(&self) -> NotifyForwarder {
202-
NotifyForwarder::new(
203-
self.model.clone(),
204-
self.db.clone(),
205-
Rc::downgrade(&self.watchers),
206-
self.status.clone(),
207-
self.notification_proxy.clone(),
208-
)
210+
fn _publish<'a>(&'a mut self, msg: &'a str) -> impl Future<Output = Result<(), capnp::Error>> {
211+
let msg = msg.to_owned();
212+
let req = self.env.http.post(&self.model.borrow().server).body(msg);
213+
214+
async move {
215+
info!("sending message");
216+
let res = req.send().await;
217+
match res {
218+
Err(e) => Err(capnp::Error::failed(e.to_string())),
219+
Ok(res) => {
220+
res.error_for_status()
221+
.map_err(|e| capnp::Error::failed(e.to_string()))?;
222+
Ok(())
223+
}
224+
}
225+
}
209226
}
210227
}
211228

@@ -222,6 +239,7 @@ impl subscription::Server for SubscriptionImpl {
222239
let msgs = {
223240
let model = self.model.borrow();
224241
pry!(self
242+
.env
225243
.db
226244
.list_messages(&model.server, &model.topic, since)
227245
.map_err(Error::Db))
@@ -250,18 +268,17 @@ impl subscription::Server for SubscriptionImpl {
250268
Ok(())
251269
})
252270
}
271+
253272
fn publish(
254273
&mut self,
255274
params: subscription::PublishParams,
256275
_results: subscription::PublishResults,
257276
) -> capnp::capability::Promise<(), capnp::Error> {
258277
let msg = pry!(pry!(params.get()).get_message());
259-
260-
let mut req = self.server.publish_request();
261-
req.get().set_message(msg);
278+
let fut = self._publish(msg);
262279

263280
Promise::from_future(async move {
264-
req.send().promise.await?;
281+
fut.await?;
265282
Ok(())
266283
})
267284
}
@@ -289,7 +306,7 @@ impl subscription::Server for SubscriptionImpl {
289306
model.display_name = pry!(info.get_display_name()).to_string();
290307
model.muted = info.get_muted();
291308
model.read_until = info.get_read_until();
292-
pry!(self.db.update_subscription(model.clone()));
309+
pry!(self.env.db.update_subscription(model.clone()));
293310
Promise::ok(())
294311
}
295312
fn clear_notifications(
@@ -298,7 +315,7 @@ impl subscription::Server for SubscriptionImpl {
298315
_results: subscription::ClearNotificationsResults,
299316
) -> capnp::capability::Promise<(), capnp::Error> {
300317
let model = self.model.borrow_mut();
301-
pry!(self.db.delete_messages(&model.server, &model.topic));
318+
pry!(self.env.db.delete_messages(&model.server, &model.topic));
302319
Promise::ok(())
303320
}
304321

@@ -310,6 +327,7 @@ impl subscription::Server for SubscriptionImpl {
310327
let value = pry!(params.get()).get_value();
311328
let mut model = self.model.borrow_mut();
312329
pry!(self
330+
.env
313331
.db
314332
.update_read_until(&model.server, &model.topic, value));
315333
model.read_until = value;
@@ -323,53 +341,33 @@ pub struct WatchKey {
323341
topic: String,
324342
}
325343
pub struct SystemNotifier {
326-
servers: HashMap<String, ntfy_proxy::Client>,
327344
watching: Rc<RefCell<HashMap<WatchKey, subscription::Client>>>,
328-
db: Db,
329-
notification_proxy: Arc<dyn models::NotificationProxy>,
345+
env: SharedEnv,
330346
}
331347

332348
impl SystemNotifier {
333-
pub fn new(dbpath: &str, notification_proxy: Arc<dyn models::NotificationProxy>) -> Self {
349+
pub fn new(
350+
dbpath: &str,
351+
notification_proxy: Arc<dyn models::NotificationProxy>,
352+
network: Arc<NetworkMonitor<'static>>,
353+
) -> Self {
334354
Self {
335-
servers: HashMap::new(),
336355
watching: Rc::new(RefCell::new(HashMap::new())),
337-
db: Db::connect(dbpath).unwrap(),
338-
notification_proxy,
356+
env: SharedEnv {
357+
db: Db::connect(dbpath).unwrap(),
358+
proxy: notification_proxy,
359+
http: build_client().unwrap(),
360+
network,
361+
},
339362
}
340363
}
341364
fn watch(&mut self, sub: models::Subscription) -> Promise<subscription::Client, capnp::Error> {
342-
let ntfy = self
343-
.servers
344-
.entry(sub.server.to_owned())
345-
.or_insert_with(|| capnp_rpc::new_client(NtfyProxyImpl::new(sub.server.to_owned())));
346-
347-
let subscription = SubscriptionImpl::new(
348-
sub.clone(),
349-
ntfy.clone(),
350-
self.db.clone(),
351-
self.notification_proxy.clone(),
352-
);
353-
354-
let mut req = ntfy.watch_request();
355-
req.get().set_topic(&sub.topic);
356-
req.get()
357-
.set_watcher(capnp_rpc::new_client(subscription.output_channel()));
358-
let res = req.send();
359-
let handle = res.pipeline.get_handle();
360-
subscription
361-
.server_watch_handle
362-
.set(handle)
363-
.map_err(|_| "already set")
364-
.unwrap();
365+
let subscription = SubscriptionImpl::new(sub.clone(), self.env.clone());
365366

366367
let watching = self.watching.clone();
367368
let subc: subscription::Client = capnp_rpc::new_client(subscription);
368369

369370
Promise::from_future(async move {
370-
res.promise
371-
.await
372-
.map_err(|e| capnp::Error::failed(e.to_string()))?;
373371
watching.borrow_mut().insert(
374372
WatchKey {
375373
server: sub.server.to_owned(),
@@ -381,7 +379,7 @@ impl SystemNotifier {
381379
})
382380
}
383381
pub fn watch_subscribed(&mut self) -> Promise<(), capnp::Error> {
384-
let f: Vec<_> = pry!(self.db.list_subscriptions())
382+
let f: Vec<_> = pry!(self.env.db.list_subscriptions())
385383
.into_iter()
386384
.map(|m| self.watch(m.clone()))
387385
.collect();
@@ -418,7 +416,7 @@ impl system_notifier::Server for SystemNotifier {
418416
);
419417
let sub: Promise<subscription::Client, capnp::Error> = self.watch(subscription.clone());
420418

421-
let mut db = self.db.clone();
419+
let mut db = self.env.db.clone();
422420
Promise::from_future(async move {
423421
results.get().set_subscription(sub.await?);
424422

@@ -441,6 +439,7 @@ impl system_notifier::Server for SystemNotifier {
441439
topic: topic.to_string(),
442440
});
443441
pry!(self
442+
.env
444443
.db
445444
.remove_subscription(&server, &topic)
446445
.map_err(|e| capnp::Error::failed(e.to_string())));
@@ -475,6 +474,7 @@ pub fn start(
475474
.enable_all()
476475
.build()?;
477476

477+
let network_monitor = rt.block_on(async move { NetworkMonitor::new().await.unwrap() });
478478
let listener = rt.block_on(async move {
479479
let _ = std::fs::remove_file(&socket_path);
480480
UnixListener::bind(&socket_path).unwrap()
@@ -483,7 +483,8 @@ pub fn start(
483483
let dbpath = dbpath.to_owned();
484484
let f = move || {
485485
let local = tokio::task::LocalSet::new();
486-
let mut system_notifier = SystemNotifier::new(&dbpath, notification_proxy);
486+
let mut system_notifier =
487+
SystemNotifier::new(&dbpath, notification_proxy, Arc::new(network_monitor));
487488
local.spawn_local(async move {
488489
system_notifier.watch_subscribed().await.unwrap();
489490
let system_client: system_notifier::Client = capnp_rpc::new_client(system_notifier);

0 commit comments

Comments
 (0)