Skip to content

Commit 99ab769

Browse files
committed
added lossy option to events_observers
1 parent 11912b6 commit 99ab769

File tree

6 files changed

+72
-25
lines changed

6 files changed

+72
-25
lines changed

stackslib/src/config/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,7 @@ impl Config {
915915
endpoint: observer.endpoint,
916916
events_keys,
917917
timeout_ms: observer.timeout_ms.unwrap_or(1_000),
918+
lossy: observer.lossy.unwrap_or(false),
918919
});
919920
}
920921
observers
@@ -928,6 +929,7 @@ impl Config {
928929
endpoint: val,
929930
events_keys: vec![EventKeyType::AnyEvent],
930931
timeout_ms: 1_000,
932+
lossy: false,
931933
});
932934
};
933935

@@ -2824,13 +2826,15 @@ pub struct EventObserverConfigFile {
28242826
pub endpoint: String,
28252827
pub events_keys: Vec<String>,
28262828
pub timeout_ms: Option<u64>,
2829+
pub lossy: Option<bool>,
28272830
}
28282831

28292832
#[derive(Clone, Default, Debug, Hash, PartialEq, Eq, PartialOrd)]
28302833
pub struct EventObserverConfig {
28312834
pub endpoint: String,
28322835
pub events_keys: Vec<EventKeyType>,
28332836
pub timeout_ms: u64,
2837+
pub lossy: bool,
28342838
}
28352839

28362840
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd)]

testnet/stacks-node/src/event_dispatcher.rs

Lines changed: 60 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ struct EventObserver {
9090
endpoint: String,
9191
/// Timeout for sending events to this observer
9292
timeout: Duration,
93+
/// Lossy observers do not retry on error
94+
lossy: bool,
9395
}
9496

9597
struct ReceiptPayloadInfo<'a> {
@@ -439,7 +441,7 @@ impl EventObserver {
439441

440442
for (id, url, payload, timeout_ms) in pending_payloads {
441443
let timeout = Duration::from_millis(timeout_ms);
442-
Self::send_payload_directly(&payload, &url, timeout);
444+
Self::send_payload_directly(&payload, &url, timeout, false);
443445

444446
#[cfg(test)]
445447
if TEST_EVENT_OBSERVER_SKIP_RETRY.get() {
@@ -456,7 +458,12 @@ impl EventObserver {
456458
}
457459
}
458460

459-
fn send_payload_directly(payload: &serde_json::Value, full_url: &str, timeout: Duration) {
461+
fn send_payload_directly(
462+
payload: &serde_json::Value,
463+
full_url: &str,
464+
timeout: Duration,
465+
lossy: bool,
466+
) {
460467
debug!(
461468
"Event dispatcher: Sending payload"; "url" => %full_url, "payload" => ?payload
462469
);
@@ -506,6 +513,11 @@ impl EventObserver {
506513
}
507514
}
508515

516+
if lossy {
517+
warn!("Observer is configured in lossy mode: skipping retry of payload");
518+
return;
519+
}
520+
509521
#[cfg(test)]
510522
if TEST_EVENT_OBSERVER_SKIP_RETRY.get() {
511523
warn!("Fault injection: skipping retry of payload");
@@ -522,7 +534,7 @@ impl EventObserver {
522534
}
523535
}
524536

525-
fn new(working_dir: Option<PathBuf>, endpoint: String, timeout: Duration) -> Self {
537+
fn new(working_dir: Option<PathBuf>, endpoint: String, timeout: Duration, lossy: bool) -> Self {
526538
let db_path = if let Some(mut db_path) = working_dir {
527539
db_path.push("event_observers.sqlite");
528540

@@ -541,6 +553,7 @@ impl EventObserver {
541553
db_path,
542554
endpoint,
543555
timeout,
556+
lossy,
544557
}
545558
}
546559

@@ -555,18 +568,23 @@ impl EventObserver {
555568
};
556569
let full_url = format!("http://{url_str}");
557570

558-
if let Some(db_path) = &self.db_path {
559-
let conn =
560-
Connection::open(db_path).expect("Failed to open database for event observer");
571+
// if the observer is in lossy mode quickly send the payload without checking for the db
572+
if self.lossy {
573+
Self::send_payload_directly(payload, &full_url, self.timeout, true);
574+
} else {
575+
if let Some(db_path) = &self.db_path {
576+
let conn =
577+
Connection::open(db_path).expect("Failed to open database for event observer");
561578

562-
// Insert the new payload into the database
563-
Self::insert_payload_with_retry(&conn, &full_url, payload, self.timeout);
579+
// Insert the new payload into the database
580+
Self::insert_payload_with_retry(&conn, &full_url, payload, self.timeout);
564581

565-
// Process all pending payloads
566-
Self::process_pending_payloads(&conn);
567-
} else {
568-
// No database, just send the payload
569-
Self::send_payload_directly(payload, &full_url, self.timeout);
582+
// Process all pending payloads
583+
Self::process_pending_payloads(&conn);
584+
} else {
585+
// No database, just send the payload
586+
Self::send_payload_directly(payload, &full_url, self.timeout, false);
587+
}
570588
}
571589
}
572590

@@ -1666,6 +1684,7 @@ impl EventDispatcher {
16661684
Some(working_dir),
16671685
conf.endpoint.clone(),
16681686
Duration::from_millis(conf.timeout_ms),
1687+
conf.lossy,
16691688
);
16701689

16711690
let observer_index = self.registered_observers.len() as u16;
@@ -1770,7 +1789,8 @@ mod test {
17701789

17711790
#[test]
17721791
fn build_block_processed_event() {
1773-
let observer = EventObserver::new(None, "nowhere".to_string(), Duration::from_secs(3));
1792+
let observer =
1793+
EventObserver::new(None, "nowhere".to_string(), Duration::from_secs(3), false);
17741794

17751795
let filtered_events = vec![];
17761796
let block = StacksBlock::genesis_block();
@@ -1830,7 +1850,8 @@ mod test {
18301850

18311851
#[test]
18321852
fn test_block_processed_event_nakamoto() {
1833-
let observer = EventObserver::new(None, "nowhere".to_string(), Duration::from_secs(3));
1853+
let observer =
1854+
EventObserver::new(None, "nowhere".to_string(), Duration::from_secs(3), false);
18341855

18351856
let filtered_events = vec![];
18361857
let mut block_header = NakamotoBlockHeader::empty();
@@ -2087,7 +2108,8 @@ mod test {
20872108
let endpoint = "http://example.com".to_string();
20882109
let timeout = Duration::from_secs(5);
20892110

2090-
let observer = EventObserver::new(Some(working_dir.clone()), endpoint.clone(), timeout);
2111+
let observer =
2112+
EventObserver::new(Some(working_dir.clone()), endpoint.clone(), timeout, false);
20912113

20922114
// Verify fields
20932115
assert_eq!(observer.endpoint, endpoint);
@@ -2104,7 +2126,7 @@ mod test {
21042126
let endpoint = "http://example.com".to_string();
21052127
let timeout = Duration::from_secs(5);
21062128

2107-
let observer = EventObserver::new(None, endpoint.clone(), timeout);
2129+
let observer = EventObserver::new(None, endpoint.clone(), timeout, false);
21082130

21092131
// Verify fields
21102132
assert_eq!(observer.endpoint, endpoint);
@@ -2133,7 +2155,7 @@ mod test {
21332155
let endpoint = server.url().strip_prefix("http://").unwrap().to_string();
21342156
let timeout = Duration::from_secs(5);
21352157

2136-
let observer = EventObserver::new(Some(working_dir), endpoint, timeout);
2158+
let observer = EventObserver::new(Some(working_dir), endpoint, timeout, false);
21372159

21382160
TEST_EVENT_OBSERVER_SKIP_RETRY.set(false);
21392161

@@ -2170,7 +2192,7 @@ mod test {
21702192

21712193
let endpoint = server.url().strip_prefix("http://").unwrap().to_string();
21722194

2173-
let observer = EventObserver::new(None, endpoint, timeout);
2195+
let observer = EventObserver::new(None, endpoint, timeout, false);
21742196

21752197
// Call send_payload
21762198
observer.send_payload(&payload, "/test");
@@ -2201,8 +2223,12 @@ mod test {
22012223
tx.send(()).unwrap();
22022224
});
22032225

2204-
let observer =
2205-
EventObserver::new(None, format!("127.0.0.1:{port}"), Duration::from_secs(3));
2226+
let observer = EventObserver::new(
2227+
None,
2228+
format!("127.0.0.1:{port}"),
2229+
Duration::from_secs(3),
2230+
false,
2231+
);
22062232

22072233
let payload = json!({"key": "value"});
22082234

@@ -2250,8 +2276,12 @@ mod test {
22502276
}
22512277
});
22522278

2253-
let observer =
2254-
EventObserver::new(None, format!("127.0.0.1:{port}"), Duration::from_secs(3));
2279+
let observer = EventObserver::new(
2280+
None,
2281+
format!("127.0.0.1:{port}"),
2282+
Duration::from_secs(3),
2283+
false,
2284+
);
22552285

22562286
let payload = json!({"key": "value"});
22572287

@@ -2298,7 +2328,7 @@ mod test {
22982328
}
22992329
});
23002330

2301-
let observer = EventObserver::new(None, format!("127.0.0.1:{port}"), timeout);
2331+
let observer = EventObserver::new(None, format!("127.0.0.1:{port}"), timeout, false);
23022332

23032333
let payload = json!({"key": "value"});
23042334

@@ -2391,7 +2421,12 @@ mod test {
23912421
}
23922422
});
23932423

2394-
let observer = EventObserver::new(Some(working_dir), format!("127.0.0.1:{port}"), timeout);
2424+
let observer = EventObserver::new(
2425+
Some(working_dir),
2426+
format!("127.0.0.1:{port}"),
2427+
timeout,
2428+
false,
2429+
);
23952430

23962431
let payload = json!({"key": "value"});
23972432
let payload2 = json!({"key": "value2"});

testnet/stacks-node/src/tests/epoch_22.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,7 @@ fn pox_2_unlock_all() {
644644
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
645645
events_keys: vec![EventKeyType::AnyEvent],
646646
timeout_ms: 1000,
647+
lossy: false,
647648
});
648649
conf.initial_balances.append(&mut initial_balances);
649650

testnet/stacks-node/src/tests/neon_integrations.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,7 @@ pub mod test_observer {
657657
endpoint: format!("localhost:{EVENT_OBSERVER_PORT}"),
658658
events_keys: event_keys.to_vec(),
659659
timeout_ms: 1000,
660+
lossy: false,
660661
});
661662
}
662663

@@ -7490,6 +7491,7 @@ fn atlas_integration_test() {
74907491
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
74917492
events_keys: vec![EventKeyType::AnyEvent],
74927493
timeout_ms: 1000,
7494+
lossy: false,
74937495
});
74947496

74957497
conf_follower_node.node.always_use_affirmation_maps = false;
@@ -8023,6 +8025,7 @@ fn antientropy_integration_test() {
80238025
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
80248026
events_keys: vec![EventKeyType::AnyEvent],
80258027
timeout_ms: 1000,
8028+
lossy: false,
80268029
});
80278030

80288031
conf_follower_node.node.mine_microblocks = true;

testnet/stacks-node/src/tests/signer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,7 @@ fn setup_stx_btc_node<G: FnMut(&mut NeonConfig)>(
845845
EventKeyType::BurnchainBlocks,
846846
],
847847
timeout_ms: 1000,
848+
lossy: false,
848849
});
849850
}
850851

@@ -860,6 +861,7 @@ fn setup_stx_btc_node<G: FnMut(&mut NeonConfig)>(
860861
EventKeyType::BurnchainBlocks,
861862
],
862863
timeout_ms: 1000,
864+
lossy: false,
863865
});
864866

865867
// The signers need some initial balances in order to pay for epoch 2.5 transaction votes

testnet/stacks-node/src/tests/signer/v0.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4238,6 +4238,7 @@ fn signer_set_rollover() {
42384238
EventKeyType::BurnchainBlocks,
42394239
],
42404240
timeout_ms: 1000,
4241+
lossy: false,
42414242
});
42424243
}
42434244
naka_conf.node.rpc_bind = rpc_bind.clone();
@@ -11114,6 +11115,7 @@ fn injected_signatures_are_ignored_across_boundaries() {
1111411115
EventKeyType::BurnchainBlocks,
1111511116
],
1111611117
timeout_ms: 1000,
11118+
lossy: false,
1111711119
});
1111811120
naka_conf.node.rpc_bind = rpc_bind.clone();
1111911121
},

0 commit comments

Comments
 (0)