Skip to content

Commit 807c4d4

Browse files
committed
feat: make the timeout for event observers configurable
1 parent 3261d0d commit 807c4d4

File tree

14 files changed

+165
-6
lines changed

14 files changed

+165
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE
1717
- `get-tenure-info?` added
1818
- `get-block-info?` removed
1919
- Added `/v3/signer/{signer_pubkey}/{reward_cycle}` endpoint
20+
- Added optional `timeout_ms` to `events_observer` configuration
2021

2122
## [2.5.0.0.7]
2223

testnet/stacks-node/src/config.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,11 +1128,10 @@ impl Config {
11281128
.map(|e| EventKeyType::from_string(e).unwrap())
11291129
.collect();
11301130

1131-
let endpoint = format!("{}", observer.endpoint);
1132-
11331131
observers.insert(EventObserverConfig {
1134-
endpoint,
1132+
endpoint: observer.endpoint,
11351133
events_keys,
1134+
timeout_ms: observer.timeout_ms.unwrap_or(1_000),
11361135
});
11371136
}
11381137
observers
@@ -1146,6 +1145,7 @@ impl Config {
11461145
events_observers.insert(EventObserverConfig {
11471146
endpoint: val,
11481147
events_keys: vec![EventKeyType::AnyEvent],
1148+
timeout_ms: 1_000,
11491149
});
11501150
()
11511151
}
@@ -2921,12 +2921,14 @@ impl AtlasConfigFile {
29212921
pub struct EventObserverConfigFile {
29222922
pub endpoint: String,
29232923
pub events_keys: Vec<String>,
2924+
pub timeout_ms: Option<u64>,
29242925
}
29252926

29262927
#[derive(Clone, Default, Debug, Hash, PartialEq, Eq, PartialOrd)]
29272928
pub struct EventObserverConfig {
29282929
pub endpoint: String,
29292930
pub events_keys: Vec<EventKeyType>,
2931+
pub timeout_ms: u64,
29302932
}
29312933

29322934
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd)]

testnet/stacks-node/src/event_dispatcher.rs

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ use super::config::{EventKeyType, EventObserverConfig};
6969
#[derive(Debug, Clone)]
7070
struct EventObserver {
7171
endpoint: String,
72+
timeout: Duration,
7273
}
7374

7475
struct ReceiptPayloadInfo<'a> {
@@ -335,8 +336,7 @@ impl EventObserver {
335336
.parse()
336337
.unwrap_or(PeerHost::DNS(host.to_string(), port));
337338

338-
let backoff = Duration::from_millis(1000); // 1 second
339-
339+
let mut backoff = Duration::from_millis(100);
340340
loop {
341341
let mut request = StacksHttpRequest::new_for_peer(
342342
peerhost.clone(),
@@ -347,7 +347,7 @@ impl EventObserver {
347347
.unwrap_or_else(|_| panic!("FATAL: failed to encode infallible data as HTTP request"));
348348
request.add_header("Connection".into(), "close".into());
349349

350-
match send_http_request(host, port, request, backoff) {
350+
match send_http_request(host, port, request, self.timeout) {
351351
Ok(response) => {
352352
if response.preamble().status_code == 200 {
353353
debug!(
@@ -368,6 +368,7 @@ impl EventObserver {
368368
}
369369
}
370370
sleep(backoff);
371+
backoff *= 2;
371372
}
372373
}
373374

@@ -1406,6 +1407,7 @@ impl EventDispatcher {
14061407
info!("Registering event observer at: {}", conf.endpoint);
14071408
let event_observer = EventObserver {
14081409
endpoint: conf.endpoint.clone(),
1410+
timeout: Duration::from_millis(conf.timeout_ms),
14091411
};
14101412

14111413
let observer_index = self.registered_observers.len() as u16;
@@ -1498,6 +1500,7 @@ mod test {
14981500
fn build_block_processed_event() {
14991501
let observer = EventObserver {
15001502
endpoint: "nowhere".to_string(),
1503+
timeout: Duration::from_secs(3),
15011504
};
15021505

15031506
let filtered_events = vec![];
@@ -1558,6 +1561,7 @@ mod test {
15581561
fn test_block_processed_event_nakamoto() {
15591562
let observer = EventObserver {
15601563
endpoint: "nowhere".to_string(),
1564+
timeout: Duration::from_secs(3),
15611565
};
15621566

15631567
let filtered_events = vec![];
@@ -1699,6 +1703,7 @@ mod test {
16991703

17001704
let observer = EventObserver {
17011705
endpoint: format!("127.0.0.1:{}", port),
1706+
timeout: Duration::from_secs(3),
17021707
};
17031708

17041709
let payload = json!({"key": "value"});
@@ -1749,6 +1754,7 @@ mod test {
17491754

17501755
let observer = EventObserver {
17511756
endpoint: format!("127.0.0.1:{}", port),
1757+
timeout: Duration::from_secs(3),
17521758
};
17531759

17541760
let payload = json!({"key": "value"});
@@ -1759,4 +1765,69 @@ mod test {
17591765
rx.recv_timeout(Duration::from_secs(5))
17601766
.expect("Server did not receive request in time");
17611767
}
1768+
1769+
#[test]
1770+
fn test_send_payload_timeout() {
1771+
let port = get_random_port();
1772+
let timeout = Duration::from_secs(3);
1773+
1774+
// Set up a channel to notify when the server has processed the request
1775+
let (tx, rx) = channel();
1776+
1777+
// Start a mock server in a separate thread
1778+
let server = Server::http(format!("127.0.0.1:{}", port)).unwrap();
1779+
thread::spawn(move || {
1780+
let mut attempt = 0;
1781+
let mut _request_holder = None;
1782+
while let Ok(request) = server.recv() {
1783+
attempt += 1;
1784+
if attempt == 1 {
1785+
debug!("Mock server received request attempt 1");
1786+
// Do not reply, forcing the sender to timeout and retry,
1787+
// but don't drop the request or it will receive a 500 error,
1788+
_request_holder = Some(request);
1789+
} else {
1790+
debug!("Mock server received request attempt 2");
1791+
// Simulate a successful response on the second attempt
1792+
let response = Response::from_string("HTTP/1.1 200 OK");
1793+
request.respond(response).unwrap();
1794+
1795+
// Notify the test that the request was processed successfully
1796+
tx.send(()).unwrap();
1797+
break;
1798+
}
1799+
}
1800+
});
1801+
1802+
let observer = EventObserver {
1803+
endpoint: format!("127.0.0.1:{}", port),
1804+
timeout,
1805+
};
1806+
1807+
let payload = json!({"key": "value"});
1808+
1809+
// Record the time before sending the payload
1810+
let start_time = Instant::now();
1811+
1812+
// Call the function being tested
1813+
observer.send_payload(&payload, "/test");
1814+
1815+
// Record the time after the function returns
1816+
let elapsed_time = start_time.elapsed();
1817+
1818+
println!("Elapsed time: {:?}", elapsed_time);
1819+
assert!(
1820+
elapsed_time >= timeout,
1821+
"Expected a timeout, but the function returned too quickly"
1822+
);
1823+
1824+
assert!(
1825+
elapsed_time < timeout + Duration::from_secs(1),
1826+
"Expected a timeout, but the function took too long"
1827+
);
1828+
1829+
// Wait for the server to process the request
1830+
rx.recv_timeout(Duration::from_secs(5))
1831+
.expect("Server did not receive request in time");
1832+
}
17621833
}

testnet/stacks-node/src/tests/epoch_205.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ fn test_exact_block_costs() {
112112
conf.events_observers.insert(EventObserverConfig {
113113
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
114114
events_keys: vec![EventKeyType::AnyEvent, EventKeyType::MinedBlocks],
115+
timeout_ms: 1000,
115116
});
116117

117118
let mut btcd_controller = BitcoinCoreController::new(conf.clone());
@@ -338,6 +339,7 @@ fn test_dynamic_db_method_costs() {
338339
conf.events_observers.insert(EventObserverConfig {
339340
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
340341
events_keys: vec![EventKeyType::AnyEvent],
342+
timeout_ms: 1000,
341343
});
342344

343345
let mut btcd_controller = BitcoinCoreController::new(conf.clone());
@@ -775,6 +777,7 @@ fn test_cost_limit_switch_version205() {
775777
conf.events_observers.insert(EventObserverConfig {
776778
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
777779
events_keys: vec![EventKeyType::AnyEvent],
780+
timeout_ms: 1000,
778781
});
779782

780783
let mut btcd_controller = BitcoinCoreController::new(conf.clone());
@@ -1032,6 +1035,7 @@ fn bigger_microblock_streams_in_2_05() {
10321035
conf.events_observers.insert(EventObserverConfig {
10331036
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
10341037
events_keys: vec![EventKeyType::AnyEvent],
1038+
timeout_ms: 1000,
10351039
});
10361040

10371041
let mut btcd_controller = BitcoinCoreController::new(conf.clone());

testnet/stacks-node/src/tests/epoch_21.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ fn advance_to_2_1(
7575
conf.events_observers.insert(EventObserverConfig {
7676
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
7777
events_keys: vec![EventKeyType::AnyEvent],
78+
timeout_ms: 1000,
7879
});
7980

8081
let mut epochs = core::STACKS_EPOCHS_REGTEST.to_vec();
@@ -579,6 +580,7 @@ fn transition_fixes_bitcoin_rigidity() {
579580
conf.events_observers.insert(EventObserverConfig {
580581
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
581582
events_keys: vec![EventKeyType::AnyEvent],
583+
timeout_ms: 1000,
582584
});
583585

584586
let mut epochs = core::STACKS_EPOCHS_REGTEST.to_vec();
@@ -1476,6 +1478,7 @@ fn transition_removes_pox_sunset() {
14761478
conf.events_observers.insert(EventObserverConfig {
14771479
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
14781480
events_keys: vec![EventKeyType::AnyEvent],
1481+
timeout_ms: 1000,
14791482
});
14801483

14811484
conf.initial_balances.push(InitialBalance {
@@ -1791,6 +1794,7 @@ fn transition_empty_blocks() {
17911794
conf.events_observers.insert(EventObserverConfig {
17921795
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
17931796
events_keys: vec![EventKeyType::AnyEvent],
1797+
timeout_ms: 1000,
17941798
});
17951799

17961800
let keychain = Keychain::default(conf.node.seed.clone());
@@ -4740,6 +4744,7 @@ fn trait_invocation_cross_epoch() {
47404744
conf.events_observers.insert(EventObserverConfig {
47414745
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
47424746
events_keys: vec![EventKeyType::AnyEvent],
4747+
timeout_ms: 1000,
47434748
});
47444749
let mut epochs = core::STACKS_EPOCHS_REGTEST.to_vec();
47454750
epochs[1].end_height = epoch_2_05;
@@ -4986,6 +4991,7 @@ fn test_v1_unlock_height_with_current_stackers() {
49864991
conf.events_observers.insert(EventObserverConfig {
49874992
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
49884993
events_keys: vec![EventKeyType::AnyEvent],
4994+
timeout_ms: 1000,
49894995
});
49904996
conf.initial_balances.append(&mut initial_balances);
49914997

@@ -5251,6 +5257,7 @@ fn test_v1_unlock_height_with_delay_and_current_stackers() {
52515257
conf.events_observers.insert(EventObserverConfig {
52525258
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
52535259
events_keys: vec![EventKeyType::AnyEvent],
5260+
timeout_ms: 1000,
52545261
});
52555262
conf.initial_balances.append(&mut initial_balances);
52565263

testnet/stacks-node/src/tests/epoch_22.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ fn disable_pox() {
138138
conf.events_observers.insert(EventObserverConfig {
139139
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
140140
events_keys: vec![EventKeyType::AnyEvent],
141+
timeout_ms: 1000,
141142
});
142143
conf.initial_balances.append(&mut initial_balances);
143144

@@ -671,6 +672,7 @@ fn pox_2_unlock_all() {
671672
conf.events_observers.insert(EventObserverConfig {
672673
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
673674
events_keys: vec![EventKeyType::AnyEvent],
675+
timeout_ms: 1000,
674676
});
675677
conf.initial_balances.append(&mut initial_balances);
676678

testnet/stacks-node/src/tests/epoch_23.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ fn trait_invocation_behavior() {
104104
conf.events_observers.insert(EventObserverConfig {
105105
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
106106
events_keys: vec![EventKeyType::AnyEvent],
107+
timeout_ms: 1000,
107108
});
108109
conf.initial_balances.append(&mut initial_balances);
109110

testnet/stacks-node/src/tests/epoch_24.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ fn fix_to_pox_contract() {
156156
conf.events_observers.insert(EventObserverConfig {
157157
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
158158
events_keys: vec![EventKeyType::AnyEvent],
159+
timeout_ms: 1000,
159160
});
160161
conf.initial_balances.append(&mut initial_balances);
161162

@@ -795,6 +796,7 @@ fn verify_auto_unlock_behavior() {
795796
conf.events_observers.insert(EventObserverConfig {
796797
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
797798
events_keys: vec![EventKeyType::AnyEvent],
799+
timeout_ms: 1000,
798800
});
799801
conf.initial_balances.append(&mut initial_balances);
800802

testnet/stacks-node/src/tests/epoch_25.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ fn microblocks_disabled() {
8787
conf.events_observers.insert(EventObserverConfig {
8888
endpoint: format!("localhost:{}", test_observer::EVENT_OBSERVER_PORT),
8989
events_keys: vec![EventKeyType::AnyEvent],
90+
timeout_ms: 1000,
9091
});
9192
conf.initial_balances.append(&mut initial_balances);
9293

0 commit comments

Comments
 (0)