Skip to content

Commit ae833ac

Browse files
authored
Chain event handling (#554)
* Add block height to event * Refactor event handlers * Add block time as send date * Only move subscription offset on chain events * Rename handlers * Review fixes
1 parent 0d88139 commit ae833ac

File tree

12 files changed

+1121
-889
lines changed

12 files changed

+1121
-889
lines changed

crates/bcr-ebill-api/src/service/notification_service/default_service.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ impl DefaultNotificationService {
177177
.send_public_chain_event(
178178
&block_event.data.bill_id,
179179
BlockchainType::Bill,
180+
block_event.data.block.timestamp,
180181
events.bill_keys.clone().try_into()?,
181182
block_event.clone().try_into()?,
182183
previous_event.clone().map(|e| e.payload),
@@ -650,6 +651,7 @@ mod tests {
650651
&self,
651652
id: &str,
652653
blockchain: BlockchainType,
654+
block_time: u64,
653655
keys: BcrKeys,
654656
event: EventEnvelope,
655657
previous_event: Option<nostr::event::Event>,
@@ -1100,7 +1102,7 @@ mod tests {
11001102
.times(2);
11011103

11021104
mock.expect_send_public_chain_event()
1103-
.returning(|_, _, _, _, _, _| Ok(get_test_nostr_event()))
1105+
.returning(|_, _, _, _, _, _, _| Ok(get_test_nostr_event()))
11041106
.times(2);
11051107

11061108
mock.expect_send_private_event()
@@ -1242,7 +1244,7 @@ mod tests {
12421244
.returning(|_, _| Err(Error::Network("Failed to send".to_string())));
12431245

12441246
mock.expect_send_public_chain_event()
1245-
.returning(|_, _, _, _, _, _| Ok(get_test_nostr_event()));
1247+
.returning(|_, _, _, _, _, _, _| Ok(get_test_nostr_event()));
12461248

12471249
mock.expect_send_private_event()
12481250
.withf(move |_, e| {
@@ -1357,15 +1359,15 @@ mod tests {
13571359
let mut mock_event_store: MockNostrChainEventStore = MockNostrChainEventStore::new();
13581360
if new_blocks {
13591361
mock.expect_send_public_chain_event()
1360-
.returning(|_, _, _, _, _, _| Ok(get_test_nostr_event()))
1362+
.returning(|_, _, _, _, _, _, _| Ok(get_test_nostr_event()))
13611363
.once();
13621364
mock_event_store = setup_event_store_expectations(
13631365
chain.get_latest_block().previous_hash.to_owned().as_str(),
13641366
bill.id.as_str(),
13651367
);
13661368
} else {
13671369
mock.expect_send_public_chain_event()
1368-
.returning(|_, _, _, _, _, _| Ok(get_test_nostr_event()))
1370+
.returning(|_, _, _, _, _, _, _| Ok(get_test_nostr_event()))
13691371
.never();
13701372
}
13711373

crates/bcr-ebill-api/src/service/notification_service/mod.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ use bcr_ebill_persistence::nostr::{
1212
};
1313
use bcr_ebill_transport::chain_keys::ChainKeyServiceApi;
1414
use bcr_ebill_transport::handler::{
15-
BillChainEventHandler, BillChainEventProcessor, BillInviteEventHandler, LoggingEventHandler,
16-
NotificationHandlerApi,
15+
BillActionEventHandler, BillChainEventHandler, BillChainEventProcessor, BillInviteEventHandler,
16+
LoggingEventHandler, NotificationHandlerApi,
1717
};
1818
use bcr_ebill_transport::{Error, EventType, Result};
1919
use bcr_ebill_transport::{NotificationServiceApi, PushApi};
@@ -134,7 +134,7 @@ pub async fn create_nostr_consumer(
134134

135135
let processor = Arc::new(BillChainEventProcessor::new(
136136
bill_blockchain_store,
137-
bill_store,
137+
bill_store.clone(),
138138
transport.clone(),
139139
nostr_contact_store,
140140
));
@@ -150,7 +150,7 @@ pub async fn create_nostr_consumer(
150150
Box::new(LoggingEventHandler {
151151
event_types: EventType::all(),
152152
}),
153-
Box::new(BillChainEventHandler::new(
153+
Box::new(BillActionEventHandler::new(
154154
notification_store,
155155
push_service,
156156
processor.clone(),
@@ -160,6 +160,11 @@ pub async fn create_nostr_consumer(
160160
processor.clone(),
161161
chain_event_store.clone(),
162162
)),
163+
Box::new(BillChainEventHandler::new(
164+
processor.clone(),
165+
bill_store.clone(),
166+
chain_event_store.clone(),
167+
)),
163168
];
164169
debug!("initializing nostr consumer for {} clients", clients.len());
165170
let consumer = NostrConsumer::new(

crates/bcr-ebill-api/src/service/notification_service/nostr.rs

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ pub enum SortOrder {
8181
/// We use the latest GiftWrap and PrivateDirectMessage already with this if I
8282
/// understand the nostr-sdk docs and sources correctly.
8383
/// @see https://nips.nostr.com/59 and https://nips.nostr.com/17
84-
#[derive(Clone, Debug)]
84+
#[derive(Clone)]
8585
pub struct NostrClient {
8686
pub keys: BcrKeys,
8787
pub client: Client,
@@ -340,6 +340,7 @@ impl NotificationJsonTransportApi for NostrClient {
340340
&self,
341341
id: &str,
342342
blockchain: BlockchainType,
343+
block_time: u64,
343344
keys: BcrKeys,
344345
event: EventEnvelope,
345346
previous_event: Option<Event>,
@@ -348,7 +349,8 @@ impl NotificationJsonTransportApi for NostrClient {
348349
let event = create_public_chain_event(
349350
id,
350351
event,
351-
blockchain.to_owned(),
352+
block_time,
353+
blockchain,
352354
keys,
353355
previous_event,
354356
root_event,
@@ -506,7 +508,7 @@ impl NostrConsumer {
506508
)
507509
.await
508510
{
509-
let success = match event.kind {
511+
let (success, time) = match event.kind {
510512
Kind::EncryptedDirectMessage | Kind::GiftWrap => {
511513
trace!("Received encrypted direct message: {event:?}");
512514
match handle_direct_message(
@@ -519,9 +521,9 @@ impl NostrConsumer {
519521
{
520522
Err(e) => {
521523
error!("Failed to handle direct message: {e}");
522-
false
524+
(false, 0u64)
523525
}
524-
Ok(_) => true,
526+
Ok(_) => (true, event.created_at.as_u64()),
525527
}
526528
}
527529
Kind::TextNote => {
@@ -538,32 +540,32 @@ impl NostrConsumer {
538540
error!(
539541
"Failed to handle public chain event: {e}"
540542
);
541-
false
543+
(false, 0u64)
544+
}
545+
Ok(v) => {
546+
if v {
547+
(v, event.created_at.as_u64())
548+
} else {
549+
(false, 0u64)
550+
}
542551
}
543-
Ok(_) => true,
544552
}
545553
}
546554
Kind::RelayList => {
547555
// we have not subscribed to relaylist events yet
548556
info!("Received relay list: {event:?}");
549-
true
557+
(true, 0u64)
550558
}
551559
Kind::Metadata => {
552560
// we have not subscribed to metadata events yet
553561
info!("Received metadata: {event:?}");
554-
true
562+
(true, 0u64)
555563
}
556-
_ => true,
564+
_ => (true, 0u64),
557565
};
558566
// store the new event offset
559-
add_offset(
560-
&offset_store,
561-
event.id,
562-
event.created_at,
563-
success,
564-
&client_id,
565-
)
566-
.await;
567+
add_offset(&offset_store, event.id, time, success, &client_id)
568+
.await;
567569
}
568570
}
569571
Ok(false)
@@ -606,13 +608,13 @@ async fn handle_direct_message<T: NostrSigner>(
606608
client_id: &str,
607609
event_handlers: &Arc<Vec<Box<dyn NotificationHandlerApi>>>,
608610
) -> Result<()> {
609-
if let Some((envelope, sender, _, _)) = unwrap_direct_message(event, signer).await {
611+
if let Some((envelope, sender, _, _)) = unwrap_direct_message(event.clone(), signer).await {
610612
let sender_npub = sender.to_bech32();
611613
let sender_node_id = sender.to_hex();
612614
trace!(
613615
"Processing event: {envelope:?} from {sender_npub:?} (hex: {sender_node_id}) on client {client_id}"
614616
);
615-
handle_event(envelope, client_id, event_handlers).await?;
617+
handle_event(envelope, client_id, event_handlers, event).await?;
616618
}
617619
Ok(())
618620
}
@@ -622,18 +624,20 @@ async fn handle_public_event(
622624
node_id: &str,
623625
chain_key_store: &Arc<dyn ChainKeyServiceApi>,
624626
handlers: &Arc<Vec<Box<dyn NotificationHandlerApi>>>,
625-
) -> Result<()> {
626-
if let Some(encrypted_data) = unwrap_public_chain_event(event)? {
627+
) -> Result<bool> {
628+
if let Some(encrypted_data) = unwrap_public_chain_event(event.clone())? {
627629
if let Ok(Some(chain_keys)) = chain_key_store
628630
.get_chain_keys(&encrypted_data.id, encrypted_data.chain_type)
629631
.await
630632
{
631633
let decrypted = decrypt_public_chain_event(&encrypted_data.payload, &chain_keys)?;
632634
trace!("Handling public chain event: {decrypted:?}");
633-
handle_event(decrypted, node_id, handlers).await?
635+
handle_event(decrypted.clone(), node_id, handlers, event.clone()).await?;
634636
}
637+
Ok(true)
638+
} else {
639+
Ok(false)
635640
}
636-
Ok(())
637641
}
638642

639643
async fn valid_sender(
@@ -670,13 +674,13 @@ async fn get_offset(db: &Arc<dyn NostrEventOffsetStoreApi>, node_id: &str) -> Ti
670674
async fn add_offset(
671675
db: &Arc<dyn NostrEventOffsetStoreApi>,
672676
event_id: EventId,
673-
time: Timestamp,
677+
time: u64,
674678
success: bool,
675679
node_id: &str,
676680
) {
677681
db.add_event(NostrEventOffset {
678682
event_id: event_id.to_hex(),
679-
time: time.as_u64(),
683+
time,
680684
success,
681685
node_id: node_id.to_string(),
682686
})
@@ -690,12 +694,16 @@ async fn handle_event(
690694
event: EventEnvelope,
691695
node_id: &str,
692696
handlers: &Arc<Vec<Box<dyn NotificationHandlerApi>>>,
697+
original_event: Box<nostr::Event>,
693698
) -> Result<()> {
694699
let event_type = &event.event_type;
695700
let mut times = 0;
696701
for handler in handlers.iter() {
697702
if handler.handles_event(event_type) {
698-
match handler.handle_event(event.to_owned(), node_id).await {
703+
match handler
704+
.handle_event(event.to_owned(), node_id, original_event.clone())
705+
.await
706+
{
699707
Ok(_) => times += 1,
700708
Err(e) => error!("Nostr event handler failed: {e}"),
701709
}
@@ -736,7 +744,7 @@ mod tests {
736744
pub NotificationHandler {}
737745
#[async_trait::async_trait]
738746
impl NotificationHandlerApi for NotificationHandler {
739-
async fn handle_event(&self, event: EventEnvelope, identity: &str) -> bcr_ebill_transport::Result<()>;
747+
async fn handle_event(&self, event: EventEnvelope, identity: &str, original_event: Box<nostr::Event>) -> bcr_ebill_transport::Result<()>;
740748
fn handles_event(&self, event_type: &EventType) -> bool;
741749
}
742750
}
@@ -796,7 +804,7 @@ mod tests {
796804
let expected_event: Event<TestEventPayload> = event.clone();
797805
handler
798806
.expect_handle_event()
799-
.withf(move |e, i| {
807+
.withf(move |e, i, _| {
800808
let expected = expected_event.clone();
801809
let received: Event<TestEventPayload> =
802810
e.clone().try_into().expect("could not convert event");
@@ -805,7 +813,7 @@ mod tests {
805813
let valid_identity = i == keys2.get_public_key();
806814
valid_type && valid_payload && valid_identity
807815
})
808-
.returning(|_, _| Ok(()));
816+
.returning(|_, _, _| Ok(()));
809817

810818
let mut offset_store = MockNostrEventOffsetStoreApiMock::new();
811819

crates/bcr-ebill-api/src/service/notification_service/test_utils.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,12 @@ impl NotificationHandlerApi for TestEventHandler<TestEventPayload> {
6969
}
7070
}
7171

72-
async fn handle_event(&self, event: EventEnvelope, _: &str) -> bcr_ebill_transport::Result<()> {
72+
async fn handle_event(
73+
&self,
74+
event: EventEnvelope,
75+
_: &str,
76+
_: Box<nostr::Event>,
77+
) -> bcr_ebill_transport::Result<()> {
7378
*self.called.lock().await = true;
7479
let event: Event<TestEventPayload> = event.try_into()?;
7580
*self.received_event.lock().await = Some(event);

crates/bcr-ebill-transport/src/event/bill_blockchain_event.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,5 +59,6 @@ pub struct ChainKeys {
5959
#[derive(Serialize, Deserialize, Debug, Clone)]
6060
pub struct BillBlockEvent {
6161
pub bill_id: String,
62+
pub block_height: usize,
6263
pub block: BillBlock,
6364
}

crates/bcr-ebill-transport/src/event/bill_events.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ impl BillChainEvent {
147147
}
148148
Some(Event::new_chain(BillBlockEvent {
149149
bill_id: self.bill.id.to_owned(),
150+
block_height: self.block_height(),
150151
block: self.latest_block(),
151152
}))
152153
}

0 commit comments

Comments
 (0)