Skip to content

Commit 901de69

Browse files
authored
Notification filter (#386)
* Add active filter for notification endpoint * Add identity context to Nostr event handling * Full filters * Merged and docs * Review fixes
1 parent a2d5bc7 commit 901de69

File tree

11 files changed

+244
-67
lines changed

11 files changed

+244
-67
lines changed

src/persistence/db/notification.rs

Lines changed: 67 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use serde_json::Value;
55
use surrealdb::{engine::any::Any, sql::Thing, Surreal};
66

77
use crate::{
8-
persistence::notification::NotificationStoreApi,
8+
persistence::notification::{NotificationFilter, NotificationStoreApi},
99
service::notification_service::{ActionType, Notification, NotificationType},
1010
util::date::{now, DateTimeUtc},
1111
};
@@ -46,14 +46,28 @@ impl NotificationStoreApi for SurrealNotificationStore {
4646
}
4747
}
4848
/// Returns all currently active notifications from the database
49-
async fn list(&self) -> Result<Vec<Notification>> {
50-
let result: Vec<NotificationDb> = self
49+
async fn list(&self, filter: NotificationFilter) -> Result<Vec<Notification>> {
50+
let filters = filter.filters();
51+
let mut query = self
5152
.db
52-
.query("SELECT * FROM type::table($table) WHERE active = true ORDER BY datetime DESC")
53+
.query(format!(
54+
"SELECT * FROM type::table($table) {} ORDER BY datetime DESC LIMIT $limit START $offset",
55+
filters
56+
))
5357
.bind(("table", Self::TABLE))
54-
.await?
55-
.take(0)?;
58+
.bind(("limit", filter.get_limit()))
59+
.bind(("offset", filter.get_offset()));
5660

61+
if let Some(active) = filter.get_active() {
62+
query = query.bind(active.to_owned());
63+
}
64+
if let Some(reference_id) = filter.get_reference_id() {
65+
query = query.bind(reference_id.to_owned());
66+
}
67+
if let Some(notification_type) = filter.get_notification_type() {
68+
query = query.bind(notification_type.to_owned());
69+
}
70+
let result: Vec<NotificationDb> = query.await?.take(0)?;
5771
Ok(result.into_iter().map(|n| n.into()).collect())
5872
}
5973
/// Returns the latest active notification for the given reference and notification type
@@ -62,24 +76,27 @@ impl NotificationStoreApi for SurrealNotificationStore {
6276
reference: &str,
6377
notification_type: NotificationType,
6478
) -> Result<Option<Notification>> {
65-
let result: Vec<NotificationDb> = self.db.query("SELECT * FROM type::table($table) WHERE active = true AND reference_id = $reference_id AND notification_type = $notification_type ORDER BY datetime desc")
66-
.bind(("table", Self::TABLE))
67-
.bind(("reference_id", reference.to_owned()))
68-
.bind(("notification_type", notification_type))
69-
.await?
70-
.take(0)?;
71-
72-
Ok(result.first().map(|n| n.clone().into()))
79+
let result = self
80+
.list(NotificationFilter {
81+
active: Some(true),
82+
reference_id: Some(reference.to_owned()),
83+
notification_type: Some(notification_type.to_string()),
84+
limit: Some(1),
85+
..Default::default()
86+
})
87+
.await?;
88+
Ok(result.first().cloned())
7389
}
7490
/// Returns all notifications for the given reference and notification type that are active
7591
async fn list_by_type(&self, notification_type: NotificationType) -> Result<Vec<Notification>> {
76-
let result: Vec<NotificationDb> = self.db.query("SELECT * FROM type::table($table) WHERE active = true AND notification_type = $notification_type ORDER BY datetime desc")
77-
.bind(("table", Self::TABLE))
78-
.bind(("notification_type", notification_type))
79-
.await?
80-
.take(0)?;
81-
82-
Ok(result.into_iter().map(|n| n.into()).collect())
92+
let result = self
93+
.list(NotificationFilter {
94+
active: Some(true),
95+
notification_type: Some(notification_type.to_string()),
96+
..Default::default()
97+
})
98+
.await?;
99+
Ok(result)
83100
}
84101
/// Marks an active notification as done
85102
async fn mark_as_done(&self, notification_id: &str) -> Result<()> {
@@ -135,6 +152,7 @@ impl NotificationStoreApi for SurrealNotificationStore {
135152
#[derive(Debug, Clone, Serialize, Deserialize)]
136153
struct NotificationDb {
137154
pub id: Thing,
155+
pub node_id: Option<String>,
138156
pub notification_type: NotificationType,
139157
pub reference_id: Option<String>,
140158
pub description: String,
@@ -147,6 +165,7 @@ impl From<NotificationDb> for Notification {
147165
fn from(value: NotificationDb) -> Self {
148166
Self {
149167
id: value.id.id.to_raw(),
168+
node_id: value.node_id,
150169
notification_type: value.notification_type,
151170
reference_id: value.reference_id,
152171
description: value.description,
@@ -165,6 +184,7 @@ impl From<Notification> for NotificationDb {
165184
value.id.to_owned(),
166185
)
167186
.into(),
187+
node_id: value.node_id,
168188
notification_type: value.notification_type,
169189
reference_id: value.reference_id,
170190
description: value.description,
@@ -243,36 +263,45 @@ mod tests {
243263
}
244264

245265
#[tokio::test]
246-
async fn test_inserts_and_queries_notifiction() {
266+
async fn test_inserts_and_queries_notification() {
247267
let store = get_store().await;
248268
let notification = test_notification("bill_id", Some(test_payload()));
249269
let r = store
250270
.add(notification.clone())
251271
.await
252272
.expect("could not create notification");
253273

254-
let all = store.list().await.expect("could not list notifications");
274+
let all = store
275+
.list(NotificationFilter::default())
276+
.await
277+
.expect("could not list notifications");
255278
assert!(!all.is_empty());
256279
assert_eq!(notification.id, r.id);
257280
}
258281

259282
#[tokio::test]
260-
async fn test_deletes_existing_notifiction() {
283+
async fn test_deletes_existing_notification() {
261284
let store = get_store().await;
262285
let notification = test_notification("bill_id", Some(test_payload()));
263286
let r = store
264287
.add(notification.clone())
265288
.await
266289
.expect("could not create notification");
267290

268-
let all = store.list().await.expect("could not list notifications");
291+
let all = store
292+
.list(NotificationFilter::default())
293+
.await
294+
.expect("could not list notifications");
269295
assert!(!all.is_empty());
270296

271297
store
272298
.delete(&r.id)
273299
.await
274300
.expect("could not delete notification");
275-
let all = store.list().await.expect("could not list notifications");
301+
let all = store
302+
.list(NotificationFilter::default())
303+
.await
304+
.expect("could not list notifications");
276305
assert!(all.is_empty());
277306
}
278307

@@ -285,15 +314,23 @@ mod tests {
285314
.await
286315
.expect("could not create notification");
287316

288-
let all = store.list().await.expect("could not list notifications");
317+
let mut filter = NotificationFilter::default();
318+
filter.active = Some(true);
319+
let all = store
320+
.list(filter.clone())
321+
.await
322+
.expect("could not list notifications");
289323
assert!(!all.is_empty());
290324

291325
store
292326
.mark_as_done(&r.id)
293327
.await
294328
.expect("could not mark notification as done");
295329

296-
let all = store.list().await.expect("could not list notifications");
330+
let all = store
331+
.list(filter)
332+
.await
333+
.expect("could not list notifications");
297334
assert!(all.is_empty());
298335
}
299336

@@ -369,7 +406,7 @@ mod tests {
369406
}
370407

371408
fn test_notification(bill_id: &str, payload: Option<Value>) -> Notification {
372-
Notification::new_bill_notification(bill_id, "test_notification", payload)
409+
Notification::new_bill_notification(bill_id, "node_id", "test_notification", payload)
373410
}
374411

375412
fn test_payload() -> Value {
@@ -379,6 +416,7 @@ mod tests {
379416
fn test_general_notification() -> Notification {
380417
Notification {
381418
id: Uuid::new_v4().to_string(),
419+
node_id: Some("node_id".to_string()),
382420
notification_type: NotificationType::General,
383421
reference_id: Some("general".to_string()),
384422
description: "general desc".to_string(),

src/persistence/notification.rs

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub trait NotificationStoreApi: Send + Sync {
1111
/// Stores a new notification into the database
1212
async fn add(&self, notification: Notification) -> Result<Notification>;
1313
/// Returns all currently active notifications from the database
14-
async fn list(&self) -> Result<Vec<Notification>>;
14+
async fn list(&self, filter: NotificationFilter) -> Result<Vec<Notification>>;
1515
/// Returns the latest active notification for the given reference and notification type
1616
async fn get_latest_by_reference(
1717
&self,
@@ -41,3 +41,88 @@ pub trait NotificationStoreApi: Send + Sync {
4141
action_type: ActionType,
4242
) -> Result<bool>;
4343
}
44+
45+
#[derive(Default, Clone, PartialEq, Debug)]
46+
pub struct NotificationFilter {
47+
pub active: Option<bool>,
48+
pub reference_id: Option<String>,
49+
pub notification_type: Option<String>,
50+
pub limit: Option<i64>,
51+
pub offset: Option<i64>,
52+
}
53+
54+
impl NotificationFilter {
55+
pub fn filters(&self) -> String {
56+
let mut parts = vec![];
57+
if self.active.is_some() {
58+
parts.push("active = $active");
59+
}
60+
if self.reference_id.is_some() {
61+
parts.push("reference_id = $reference_id");
62+
}
63+
if self.notification_type.is_some() {
64+
parts.push("notification_type = $notification_type");
65+
}
66+
67+
let filters = parts.join(" AND ");
68+
if filters.is_empty() {
69+
filters
70+
} else {
71+
format!("WHERE {}", filters)
72+
}
73+
}
74+
75+
pub fn get_limit(&self) -> i64 {
76+
self.limit.unwrap_or(200)
77+
}
78+
79+
pub fn get_offset(&self) -> i64 {
80+
self.offset.unwrap_or(0)
81+
}
82+
83+
pub fn get_active(&self) -> Option<(String, bool)> {
84+
self.active.map(|active| ("active".to_string(), active))
85+
}
86+
87+
pub fn get_reference_id(&self) -> Option<(String, String)> {
88+
self.reference_id
89+
.as_ref()
90+
.map(|reference_id| ("reference_id".to_string(), reference_id.to_string()))
91+
}
92+
93+
pub fn get_notification_type(&self) -> Option<(String, String)> {
94+
self.notification_type.as_ref().map(|notification_type| {
95+
(
96+
"notification_type".to_string(),
97+
notification_type.to_string(),
98+
)
99+
})
100+
}
101+
}
102+
103+
#[cfg(test)]
104+
mod tests {
105+
#[test]
106+
fn test_query_filters() {
107+
let empty = super::NotificationFilter::default();
108+
assert_eq!(empty.filters(), "");
109+
110+
let active = super::NotificationFilter {
111+
active: Some(true),
112+
..Default::default()
113+
};
114+
assert_eq!(active.filters(), "WHERE active = $active");
115+
116+
let all = super::NotificationFilter {
117+
active: Some(true),
118+
reference_id: Some("123".to_string()),
119+
notification_type: Some("Bill".to_string()),
120+
..Default::default()
121+
};
122+
123+
assert_eq!(
124+
all.filters(),
125+
"WHERE active = $active AND reference_id = $reference_id AND notification_type = $notification_type"
126+
);
127+
}
128+
}

src/service/bill_service.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ use async_trait::async_trait;
4242
use borsh::to_vec;
4343
use borsh_derive::{BorshDeserialize, BorshSerialize};
4444
use log::info;
45+
#[cfg(test)]
46+
use mockall::automock;
4547
use rocket::http::ContentType;
4648
use rocket::Response;
4749
use rocket::{http::Status, response::Responder};
@@ -264,6 +266,7 @@ impl<'r, 'o: 'r> Responder<'r, 'o> for Error {
264266
}
265267
}
266268

269+
#[cfg_attr(test, automock)]
267270
#[async_trait]
268271
pub trait BillServiceApi: Send + Sync {
269272
/// Get bill balances

src/service/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ pub async fn create_service_context(
240240
let notification_service =
241241
create_notification_service(nostr_client.clone(), db.notification_store.clone()).await?;
242242

243-
let bill_service = BillService::new(
243+
let bill_service = Arc::new(BillService::new(
244244
client.clone(),
245245
db.bill_store,
246246
db.bill_blockchain_store.clone(),
@@ -252,7 +252,7 @@ pub async fn create_service_context(
252252
db.company_chain_store.clone(),
253253
db.contact_store.clone(),
254254
db.company_store.clone(),
255-
);
255+
));
256256
let identity_service = IdentityService::new(
257257
db.identity_store.clone(),
258258
db.file_upload_store.clone(),
@@ -281,7 +281,7 @@ pub async fn create_service_context(
281281
.await?;
282282

283283
let search_service = SearchService::new(
284-
Arc::new(bill_service.clone()),
284+
bill_service.clone(),
285285
contact_service.clone(),
286286
Arc::new(company_service.clone()),
287287
);
@@ -298,7 +298,7 @@ pub async fn create_service_context(
298298
dht_client: client,
299299
contact_service,
300300
search_service: Arc::new(search_service),
301-
bill_service: Arc::new(bill_service),
301+
bill_service,
302302
identity_service: Arc::new(identity_service),
303303
company_service: Arc::new(company_service),
304304
file_upload_service: Arc::new(file_upload_service),

src/service/notification_service/bill_action_event_handler.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,13 @@ impl NotificationHandlerApi for BillActionEventHandler {
6262
true
6363
}
6464

65-
async fn handle_event(&self, event: EventEnvelope) -> Result<()> {
65+
async fn handle_event(&self, event: EventEnvelope, node_id: &str) -> Result<()> {
6666
let event: Option<Event<BillActionEventPayload>> = event.try_into().ok();
6767
if let Some(event) = event {
6868
// create notification
6969
let notification = Notification::new_bill_notification(
7070
&event.data.bill_id,
71+
node_id,
7172
&self.event_description(&event.event_type),
7273
Some(serde_json::to_value(&event.data)?),
7374
);

0 commit comments

Comments
 (0)