Skip to content

Commit 213276a

Browse files
authored
Notification filter with node_ids (#489)
* Notification filter with node_ids * Review fixes
1 parent 20247ab commit 213276a

File tree

9 files changed

+84
-28
lines changed

9 files changed

+84
-28
lines changed

crates/bcr-ebill-persistence/src/db/notification.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ impl NotificationStoreApi for SurrealNotificationStore {
8484
if let Some(notification_type) = filter.get_notification_type() {
8585
query = query.bind(notification_type.to_owned());
8686
}
87+
if let Some(node_ids) = filter.get_node_ids() {
88+
query = query.bind(node_ids.to_owned());
89+
}
8790
let result: Vec<NotificationDb> = query.await?.take(0)?;
8891
Ok(result.into_iter().map(|n| n.into()).collect())
8992
}

crates/bcr-ebill-persistence/src/notification.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub struct NotificationFilter {
5252
pub active: Option<bool>,
5353
pub reference_id: Option<String>,
5454
pub notification_type: Option<String>,
55+
pub node_ids: Vec<String>,
5556
pub limit: Option<i64>,
5657
pub offset: Option<i64>,
5758
}
@@ -69,6 +70,10 @@ impl NotificationFilter {
6970
parts.push("notification_type = $notification_type");
7071
}
7172

73+
if !self.node_ids.is_empty() {
74+
parts.push("node_id IN $node_ids");
75+
}
76+
7277
let filters = parts.join(" AND ");
7378
if filters.is_empty() {
7479
filters
@@ -103,6 +108,14 @@ impl NotificationFilter {
103108
)
104109
})
105110
}
111+
112+
pub fn get_node_ids(&self) -> Option<(String, Vec<String>)> {
113+
if !self.node_ids.is_empty() {
114+
Some(("node_ids".to_string(), self.node_ids.clone()))
115+
} else {
116+
None
117+
}
118+
}
106119
}
107120

108121
#[cfg(test)]
@@ -118,16 +131,32 @@ mod tests {
118131
};
119132
assert_eq!(active.filters(), "WHERE active = $active");
120133

134+
let node_ids = super::NotificationFilter {
135+
node_ids: vec!["123".to_string(), "456".to_string()],
136+
..Default::default()
137+
};
138+
139+
assert_eq!(node_ids.filters(), "WHERE node_id IN $node_ids");
140+
141+
assert_eq!(
142+
node_ids.get_node_ids(),
143+
Some((
144+
"node_ids".to_string(),
145+
vec!["123".to_string(), "456".to_string()]
146+
))
147+
);
148+
121149
let all = super::NotificationFilter {
122150
active: Some(true),
123151
reference_id: Some("123".to_string()),
124152
notification_type: Some("Bill".to_string()),
153+
node_ids: vec!["123".to_string()],
125154
..Default::default()
126155
};
127156

128157
assert_eq!(
129158
all.filters(),
130-
"WHERE active = $active AND reference_id = $reference_id AND notification_type = $notification_type"
159+
"WHERE active = $active AND reference_id = $reference_id AND notification_type = $notification_type AND node_id IN $node_ids"
131160
);
132161
}
133162
}

crates/bcr-ebill-transport/src/handler/bill_chain_event_handler.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@ use bcr_ebill_core::notification::{Notification, NotificationType};
1616
use bcr_ebill_persistence::NotificationStoreApi;
1717
use bcr_ebill_persistence::bill::BillChainStoreApi;
1818
use bcr_ebill_persistence::bill::BillStoreApi;
19-
use log::debug;
20-
use log::error;
21-
use log::info;
22-
use log::warn;
19+
use log::{debug, error, info, trace, warn};
2320
use std::sync::Arc;
2421

2522
#[derive(Clone)]
@@ -50,7 +47,7 @@ impl BillChainEventHandler {
5047
event: &BillChainEventPayload,
5148
node_id: &str,
5249
) -> Result<()> {
53-
debug!("creating notification {event:?} for {node_id}");
50+
trace!("creating notification {event:?} for {node_id}");
5451
// no action no notification required
5552
if event.action_type.is_none() {
5653
return Ok(());
@@ -95,7 +92,7 @@ impl BillChainEventHandler {
9592
// send push notification to connected clients
9693
match serde_json::to_value(notification) {
9794
Ok(notification) => {
98-
debug!("sending notification {notification:?} for {node_id}");
95+
trace!("sending notification {notification:?} for {node_id}");
9996
self.push_service.send(notification).await;
10097
}
10198
Err(e) => {
@@ -384,7 +381,7 @@ impl NotificationHandlerApi for BillChainEventHandler {
384381
}
385382

386383
async fn handle_event(&self, event: EventEnvelope, node_id: &str) -> Result<()> {
387-
debug!("incoming bill chain event {event:?} for {node_id}");
384+
debug!("incoming bill chain event for {node_id}");
388385
if let Ok(decoded) = Event::<BillChainEventPayload>::try_from(event.clone()) {
389386
if !decoded.data.blocks.is_empty() {
390387
if let Err(e) = self

crates/bcr-ebill-transport/src/handler/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::Result;
22
use async_trait::async_trait;
33
use bcr_ebill_core::ServiceTraitBounds;
4-
use log::info;
4+
use log::trace;
55
#[cfg(test)]
66
use mockall::automock;
77

@@ -47,7 +47,7 @@ impl NotificationHandlerApi for LoggingEventHandler {
4747
}
4848

4949
async fn handle_event(&self, event: EventEnvelope, identity: &str) -> Result<()> {
50-
info!("Received event: {event:?} for identity: {identity}");
50+
trace!("Received event: {event:?} for identity: {identity}");
5151
Ok(())
5252
}
5353
}

crates/bcr-ebill-transport/src/push_notification.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use async_trait::async_trait;
2-
use log::error;
2+
use log::trace;
33
use std::sync::Arc;
44

55
use async_broadcast::{InactiveReceiver, Receiver, Sender};
@@ -23,7 +23,9 @@ pub struct PushService {
2323

2424
impl PushService {
2525
pub fn new() -> Self {
26-
let (tx, rx) = async_broadcast::broadcast::<Value>(5);
26+
let (mut tx, rx) = async_broadcast::broadcast::<Value>(5);
27+
tx.set_overflow(true);
28+
tx.set_await_active(false);
2729
let inactive = rx.deactivate();
2830
Self {
2931
sender: Arc::new(tx),
@@ -44,7 +46,7 @@ impl PushApi for PushService {
4446
match self.sender.broadcast(value).await {
4547
Ok(_) => {}
4648
Err(err) => {
47-
error!("Error sending push message: {}", err);
49+
trace!("Error sending push message: {}", err);
4850
}
4951
}
5052
}

crates/bcr-ebill-wasm/main.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ async function start() {
133133
console.log("status: ", status);
134134

135135
// Notifications
136-
let notifications = await notificationApi.list();
136+
let filter = current_identity ? { node_ids: [current_identity.node_id] } : null;
137+
let notifications = await notificationApi.list(filter);
137138
console.log("notifications: ", notifications);
138139
return { companyApi, generalApi, identityApi, billApi, contactApi, notificationApi };
139140
}

crates/bcr-ebill-wasm/src/api/notification.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::Result;
22
use crate::{
33
context::get_ctx,
4-
data::{IntoWeb, notification::NotificationWeb},
4+
data::{FromWeb, IntoWeb, NotificationFilters, notification::NotificationWeb},
55
};
66
use bcr_ebill_api::NotificationFilter;
77
use log::{error, info};
@@ -40,21 +40,17 @@ impl Notification {
4040
#[wasm_bindgen(unchecked_return_type = "NotificationWeb[]")]
4141
pub async fn list(
4242
&self,
43-
active: Option<bool>,
44-
reference_id: Option<String>,
45-
notification_type: Option<String>,
46-
limit: Option<i64>,
47-
offset: Option<i64>,
43+
#[wasm_bindgen(unchecked_param_type = "NotificationFilters")] filters: JsValue,
4844
) -> Result<JsValue> {
45+
let filter = NotificationFilter::from_web(
46+
serde_wasm_bindgen::from_value::<NotificationFilters>(filters)
47+
.ok()
48+
.unwrap_or_default(),
49+
);
50+
4951
let notifications = get_ctx()
5052
.notification_service
51-
.get_client_notifications(NotificationFilter {
52-
active,
53-
reference_id,
54-
notification_type,
55-
limit,
56-
offset,
57-
})
53+
.get_client_notifications(filter)
5854
.await?;
5955

6056
let web: Vec<NotificationWeb> = notifications.into_iter().map(|n| n.into_web()).collect();

crates/bcr-ebill-wasm/src/data/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use async_trait::async_trait;
22
use bcr_ebill_api::{
3+
NotificationFilter,
34
data::{
45
File, GeneralSearchFilterItemType, GeneralSearchResult, OptionalPostalAddress,
56
PostalAddress, UploadFileResult,
@@ -188,6 +189,30 @@ impl IntoWeb<PostalAddressWeb> for PostalAddress {
188189
}
189190
}
190191

192+
#[derive(Tsify, Debug, Clone, Serialize, Deserialize, Default)]
193+
#[tsify(into_wasm_abi, from_wasm_abi)]
194+
pub struct NotificationFilters {
195+
pub active: Option<bool>,
196+
pub reference_id: Option<String>,
197+
pub notification_type: Option<String>,
198+
pub node_ids: Option<Vec<String>>,
199+
pub limit: Option<i64>,
200+
pub offset: Option<i64>,
201+
}
202+
203+
impl FromWeb<NotificationFilters> for NotificationFilter {
204+
fn from_web(value: NotificationFilters) -> Self {
205+
Self {
206+
active: value.active,
207+
reference_id: value.reference_id,
208+
notification_type: value.notification_type,
209+
node_ids: value.node_ids.unwrap_or_default(),
210+
limit: value.limit,
211+
offset: value.offset,
212+
}
213+
}
214+
}
215+
191216
#[derive(Tsify, Debug, Clone, Serialize, Deserialize)]
192217
#[tsify(into_wasm_abi, from_wasm_abi)]
193218
pub struct FileWeb {

crates/bcr-ebill-web/src/handlers/notifications.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,18 @@ use serde_json::Value;
1919
("active" = Option<bool>, Query, description = "Returns only active notifications when true, inactive when false and all when left out"),
2020
("reference_id" = Option<String>, Query, description = "The id of the entity to filter by (eg. a bill id)"),
2121
("notification_type" = Option<String>, Query, description = "The type of notifications to return (eg. Bill)"),
22+
("node_ids" = Option<Vec<String>>, Query, description = "The node_ids of the identity for which to query notifications"),
2223
("limit" = Option<i64>, Query, description = "The max number of notifications to return"),
2324
("offset" = Option<i64>, Query, description = "The number of notifications to skip at the start of the result")
2425
)
2526
)]
26-
#[get("/notifications?<active>&<reference_id>&<notification_type>&<limit>&<offset>")]
27+
#[get("/notifications?<active>&<reference_id>&<notification_type>&<limit>&<offset>&<node_ids>")]
2728
pub async fn list_notifications(
2829
state: &State<ServiceContext>,
2930
active: Option<bool>,
3031
reference_id: Option<String>,
3132
notification_type: Option<String>,
33+
node_ids: Option<Vec<String>>,
3234
limit: Option<i64>,
3335
offset: Option<i64>,
3436
) -> Result<Json<Vec<NotificationWeb>>> {
@@ -38,6 +40,7 @@ pub async fn list_notifications(
3840
active,
3941
reference_id,
4042
notification_type,
43+
node_ids: node_ids.unwrap_or_default(),
4144
limit,
4245
offset,
4346
})

0 commit comments

Comments
 (0)