Skip to content

Commit fcc9f9d

Browse files
committed
Use history in keeper
1 parent 55af6c1 commit fcc9f9d

File tree

7 files changed

+95
-130
lines changed

7 files changed

+95
-130
lines changed

apps/fortuna/src/api.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use {
2626
tokio::sync::RwLock,
2727
url::Url,
2828
};
29-
pub use {chain_ids::*, index::*, live::*, metrics::*, ready::*, revelation::*, explorer::*};
29+
pub use {chain_ids::*, explorer::*, index::*, live::*, metrics::*, ready::*, revelation::*};
3030

3131
mod chain_ids;
3232
mod explorer;
@@ -72,6 +72,15 @@ pub struct TimedJournalLog {
7272
pub log: JournalLog,
7373
}
7474

75+
impl TimedJournalLog {
76+
pub fn with_current_time(log: JournalLog) -> Self {
77+
TimedJournalLog {
78+
timestamp: chrono::Utc::now(),
79+
log,
80+
}
81+
}
82+
}
83+
7584
#[derive(Clone, Debug, Serialize, ToSchema)]
7685
pub struct RequestJournal {
7786
pub chain_id: ChainId,
@@ -161,9 +170,9 @@ impl History {
161170
min_timestamp.unwrap_or(DateTime::<chrono::Utc>::MIN_UTC),
162171
)
163172
..(
164-
chain_id.clone(),
165-
max_timestamp.unwrap_or(DateTime::<chrono::Utc>::MAX_UTC),
166-
),
173+
chain_id.clone(),
174+
max_timestamp.unwrap_or(DateTime::<chrono::Utc>::MAX_UTC),
175+
),
167176
);
168177
range
169178
.rev()
@@ -407,7 +416,12 @@ mod test {
407416
ApiBlockChainState::Initialized(avax_state),
408417
);
409418

410-
let api_state = ApiState::new(Arc::new(RwLock::new(chains)), metrics_registry, Default::default()).await;
419+
let api_state = ApiState::new(
420+
Arc::new(RwLock::new(chains)),
421+
metrics_registry,
422+
Default::default(),
423+
)
424+
.await;
411425

412426
let app = api::routes(api_state);
413427
(TestServer::new(app).unwrap(), eth_read, avax_read)

apps/fortuna/src/api/explorer.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use crate::api::{
2-
ChainId, RequestJournal, RestError
3-
,
4-
};
1+
use crate::api::{ChainId, RequestJournal, RestError};
52
use axum::extract::{Query, State};
63
use axum::Json;
74
use ethers::types::TxHash;

apps/fortuna/src/chain/ethereum.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use ethers::contract::LogMeta;
12
use {
23
crate::{
34
api::ChainId,
@@ -285,14 +286,15 @@ impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
285286
.to_block(to_block)
286287
.topic1(provider);
287288

288-
let res: Vec<RequestedWithCallbackFilter> = event.query().await?;
289+
let res: Vec<(RequestedWithCallbackFilter, LogMeta)> = event.query_with_meta().await?;
289290

290291
Ok(res
291292
.iter()
292-
.map(|r| RequestedWithCallbackEvent {
293+
.map(|(r, meta)| RequestedWithCallbackEvent {
293294
sequence_number: r.sequence_number,
294295
user_random_number: r.user_random_number,
295296
provider_address: r.request.provider,
297+
tx_hash: meta.transaction_hash,
296298
})
297299
.filter(|r| r.provider_address == provider)
298300
.collect())

apps/fortuna/src/chain/reader.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use ethers::types::TxHash;
12
use {
23
anyhow::Result,
34
axum::async_trait,
@@ -34,6 +35,7 @@ pub struct RequestedWithCallbackEvent {
3435
pub sequence_number: u64,
3536
pub user_random_number: [u8; 32],
3637
pub provider_address: Address,
38+
pub tx_hash: TxHash,
3739
}
3840

3941
/// EntropyReader is the read-only interface of the Entropy contract.

apps/fortuna/src/keeper.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::api::History;
2+
use crate::keeper::block::ProcessParams;
23
use crate::keeper::track::track_block_timestamp_lag;
34
use {
45
crate::{
@@ -60,7 +61,7 @@ pub async fn run_keeper_threads(
6061
chain_eth_config: EthereumConfig,
6162
chain_state: BlockchainState,
6263
metrics: Arc<KeeperMetrics>,
63-
_history: Arc<RwLock<History>>,
64+
history: Arc<RwLock<History>>,
6465
rpc_metrics: Arc<RpcMetrics>,
6566
) -> anyhow::Result<()> {
6667
tracing::info!("Starting keeper");
@@ -82,18 +83,22 @@ pub async fn run_keeper_threads(
8283

8384
// Spawn a thread to handle the events from last backlog_range blocks.
8485
let gas_limit: U256 = chain_eth_config.gas_limit.into();
86+
let process_params = ProcessParams {
87+
chain_state: chain_state.clone(),
88+
contract: contract.clone(),
89+
gas_limit,
90+
escalation_policy: chain_eth_config.escalation_policy.to_policy(),
91+
metrics: metrics.clone(),
92+
fulfilled_requests_cache,
93+
history,
94+
};
8595
spawn(
8696
process_backlog(
97+
process_params.clone(),
8798
BlockRange {
8899
from: latest_safe_block.saturating_sub(chain_eth_config.backlog_range),
89100
to: latest_safe_block,
90101
},
91-
contract.clone(),
92-
gas_limit,
93-
chain_eth_config.escalation_policy.to_policy(),
94-
chain_state.clone(),
95-
metrics.clone(),
96-
fulfilled_requests_cache.clone(),
97102
chain_eth_config.block_delays.clone(),
98103
)
99104
.in_current_span(),
@@ -114,13 +119,8 @@ pub async fn run_keeper_threads(
114119
// Spawn a thread for block processing with configured delays
115120
spawn(
116121
process_new_blocks(
117-
chain_state.clone(),
122+
process_params.clone(),
118123
rx,
119-
Arc::clone(&contract),
120-
gas_limit,
121-
chain_eth_config.escalation_policy.to_policy(),
122-
metrics.clone(),
123-
fulfilled_requests_cache.clone(),
124124
chain_eth_config.block_delays.clone(),
125125
)
126126
.in_current_span(),

apps/fortuna/src/keeper/block.rs

Lines changed: 36 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use {
22
crate::{
3-
api::{self, BlockchainState},
3+
api::{BlockchainState, History},
44
chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
55
eth_utils::utils::EscalationPolicy,
66
keeper::keeper_metrics::KeeperMetrics,
@@ -36,6 +36,17 @@ pub struct BlockRange {
3636
pub to: BlockNumber,
3737
}
3838

39+
#[derive(Clone)]
40+
pub struct ProcessParams {
41+
pub contract: Arc<InstrumentedSignablePythContract>,
42+
pub gas_limit: U256,
43+
pub escalation_policy: EscalationPolicy,
44+
pub chain_state: BlockchainState,
45+
pub metrics: Arc<KeeperMetrics>,
46+
pub history: Arc<RwLock<History>>,
47+
pub fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
48+
}
49+
3950
/// Get the latest safe block number for the chain. Retry internally if there is an error.
4051
pub async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
4152
loop {
@@ -63,15 +74,7 @@ pub async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber
6374
#[tracing::instrument(skip_all, fields(
6475
range_from_block = block_range.from, range_to_block = block_range.to
6576
))]
66-
pub async fn process_block_range(
67-
block_range: BlockRange,
68-
contract: Arc<InstrumentedSignablePythContract>,
69-
gas_limit: U256,
70-
escalation_policy: EscalationPolicy,
71-
chain_state: api::BlockchainState,
72-
metrics: Arc<KeeperMetrics>,
73-
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
74-
) {
77+
pub async fn process_block_range(block_range: BlockRange, process_params: ProcessParams) {
7578
let BlockRange {
7679
from: first_block,
7780
to: last_block,
@@ -89,12 +92,7 @@ pub async fn process_block_range(
8992
from: current_block,
9093
to: to_block,
9194
},
92-
contract.clone(),
93-
gas_limit,
94-
escalation_policy.clone(),
95-
chain_state.clone(),
96-
metrics.clone(),
97-
fulfilled_requests_cache.clone(),
95+
process_params.clone(),
9896
)
9997
.in_current_span()
10098
.await;
@@ -110,22 +108,15 @@ pub async fn process_block_range(
110108
#[tracing::instrument(name = "batch", skip_all, fields(
111109
batch_from_block = block_range.from, batch_to_block = block_range.to
112110
))]
113-
pub async fn process_single_block_batch(
114-
block_range: BlockRange,
115-
contract: Arc<InstrumentedSignablePythContract>,
116-
gas_limit: U256,
117-
escalation_policy: EscalationPolicy,
118-
chain_state: api::BlockchainState,
119-
metrics: Arc<KeeperMetrics>,
120-
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
121-
) {
111+
pub async fn process_single_block_batch(block_range: BlockRange, process_params: ProcessParams) {
122112
loop {
123-
let events_res = chain_state
113+
let events_res = process_params
114+
.chain_state
124115
.contract
125116
.get_request_with_callback_events(
126117
block_range.from,
127118
block_range.to,
128-
chain_state.provider_address,
119+
process_params.chain_state.provider_address,
129120
)
130121
.await;
131122

@@ -134,21 +125,15 @@ pub async fn process_single_block_batch(
134125
tracing::info!(num_of_events = &events.len(), "Processing",);
135126
for event in &events {
136127
// the write lock guarantees we spawn only one task per sequence number
137-
let newly_inserted = fulfilled_requests_cache
128+
let newly_inserted = process_params
129+
.fulfilled_requests_cache
138130
.write()
139131
.await
140132
.insert(event.sequence_number);
141133
if newly_inserted {
142134
spawn(
143-
process_event_with_backoff(
144-
event.clone(),
145-
chain_state.clone(),
146-
contract.clone(),
147-
gas_limit,
148-
escalation_policy.clone(),
149-
metrics.clone(),
150-
)
151-
.in_current_span(),
135+
process_event_with_backoff(event.clone(), process_params.clone())
136+
.in_current_span(),
152137
);
153138
}
154139
}
@@ -288,100 +273,56 @@ pub async fn watch_blocks(
288273
/// It waits on rx channel to receive block ranges and then calls process_block_range to process them
289274
/// for each configured block delay.
290275
#[tracing::instrument(skip_all)]
291-
#[allow(clippy::too_many_arguments)]
292276
pub async fn process_new_blocks(
293-
chain_state: BlockchainState,
277+
process_params: ProcessParams,
294278
mut rx: mpsc::Receiver<BlockRange>,
295-
contract: Arc<InstrumentedSignablePythContract>,
296-
gas_limit: U256,
297-
escalation_policy: EscalationPolicy,
298-
metrics: Arc<KeeperMetrics>,
299-
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
300279
block_delays: Vec<u64>,
301280
) {
302281
tracing::info!("Waiting for new block ranges to process");
303282
loop {
304283
if let Some(block_range) = rx.recv().await {
305284
// Process blocks immediately first
306-
process_block_range(
307-
block_range.clone(),
308-
Arc::clone(&contract),
309-
gas_limit,
310-
escalation_policy.clone(),
311-
chain_state.clone(),
312-
metrics.clone(),
313-
fulfilled_requests_cache.clone(),
314-
)
315-
.in_current_span()
316-
.await;
285+
process_block_range(block_range.clone(), process_params.clone())
286+
.in_current_span()
287+
.await;
317288

318289
// Then process with each configured delay
319290
for delay in &block_delays {
320291
let adjusted_range = BlockRange {
321292
from: block_range.from.saturating_sub(*delay),
322293
to: block_range.to.saturating_sub(*delay),
323294
};
324-
process_block_range(
325-
adjusted_range,
326-
Arc::clone(&contract),
327-
gas_limit,
328-
escalation_policy.clone(),
329-
chain_state.clone(),
330-
metrics.clone(),
331-
fulfilled_requests_cache.clone(),
332-
)
333-
.in_current_span()
334-
.await;
295+
process_block_range(adjusted_range, process_params.clone())
296+
.in_current_span()
297+
.await;
335298
}
336299
}
337300
}
338301
}
339302

340303
/// Processes the backlog_range for a chain.
341304
/// It processes the backlog range for each configured block delay.
342-
#[allow(clippy::too_many_arguments)]
343305
#[tracing::instrument(skip_all)]
344306
pub async fn process_backlog(
307+
process_params: ProcessParams,
345308
backlog_range: BlockRange,
346-
contract: Arc<InstrumentedSignablePythContract>,
347-
gas_limit: U256,
348-
escalation_policy: EscalationPolicy,
349-
chain_state: BlockchainState,
350-
metrics: Arc<KeeperMetrics>,
351-
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
352309
block_delays: Vec<u64>,
353310
) {
354311
tracing::info!("Processing backlog");
355312
// Process blocks immediately first
356-
process_block_range(
357-
backlog_range.clone(),
358-
Arc::clone(&contract),
359-
gas_limit,
360-
escalation_policy.clone(),
361-
chain_state.clone(),
362-
metrics.clone(),
363-
fulfilled_requests_cache.clone(),
364-
)
365-
.in_current_span()
366-
.await;
313+
process_block_range(backlog_range.clone(), process_params.clone())
314+
.in_current_span()
315+
.await;
367316

368317
// Then process with each configured delay
369318
for delay in &block_delays {
370319
let adjusted_range = BlockRange {
371320
from: backlog_range.from.saturating_sub(*delay),
372321
to: backlog_range.to.saturating_sub(*delay),
373322
};
374-
process_block_range(
375-
adjusted_range,
376-
Arc::clone(&contract),
377-
gas_limit,
378-
escalation_policy.clone(),
379-
chain_state.clone(),
380-
metrics.clone(),
381-
fulfilled_requests_cache.clone(),
382-
)
383-
.in_current_span()
384-
.await;
323+
process_block_range(adjusted_range, process_params.clone())
324+
.in_current_span()
325+
.await;
385326
}
386327
tracing::info!("Backlog processed");
387328
}

0 commit comments

Comments
 (0)