Skip to content

Commit 0d8b99b

Browse files
authored
feat(aggregation_mode): save last processed block in a file (#1872)
1 parent fb7e037 commit 0d8b99b

File tree

8 files changed

+81
-42
lines changed

8 files changed

+81
-42
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ batcher/aligned/batch_inclusion_responses/*
1212
**/broadcast
1313
volume
1414
config-files/*.last_processed_batch.json
15+
config-files/*.last_aggregated_block.json
1516

1617
nonce_*.bin
1718

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
1-
use serde::Deserialize;
2-
use std::{fs::File, io::Read};
1+
use serde::{Deserialize, Serialize};
2+
use std::{fs::File, fs::OpenOptions, io::Read, io::Write};
33

4-
#[derive(Debug, Deserialize)]
4+
#[derive(Debug, Deserialize, Serialize)]
55
pub struct ECDSAConfig {
66
pub private_key_store_path: String,
77
pub private_key_store_password: String,
88
}
99

10-
#[derive(Debug, Deserialize)]
10+
#[derive(Debug, Deserialize, Serialize)]
11+
pub struct LastAggregatedBlock {
12+
pub last_aggregated_block: u64,
13+
}
14+
15+
#[derive(Debug, Deserialize, Serialize)]
1116
pub struct Config {
1217
pub eth_rpc_url: String,
1318
pub eth_ws_url: String,
1419
pub max_proofs_in_queue: u16,
1520
pub proof_aggregation_service_address: String,
1621
pub aligned_service_manager_address: String,
22+
pub last_aggregated_block_filepath: String,
1723
pub ecdsa: ECDSAConfig,
18-
pub fetch_logs_from_secs_ago: u64,
19-
pub block_time_secs: u64,
2024
}
2125

2226
impl Config {
@@ -27,4 +31,32 @@ impl Config {
2731
let config: Config = serde_yaml::from_str(&contents)?;
2832
Ok(config)
2933
}
34+
35+
pub fn get_last_aggregated_block(&self) -> Result<u64, Box<dyn std::error::Error>> {
36+
let mut file = File::open(&self.last_aggregated_block_filepath)?;
37+
let mut contents = String::new();
38+
file.read_to_string(&mut contents)?;
39+
let lab: LastAggregatedBlock = serde_json::from_str(&contents)?;
40+
Ok(lab.last_aggregated_block)
41+
}
42+
43+
pub fn update_last_aggregated_block(
44+
&self,
45+
last_aggregated_block: u64,
46+
) -> Result<(), Box<dyn std::error::Error>> {
47+
let last_aggregated_block_struct = LastAggregatedBlock {
48+
last_aggregated_block,
49+
};
50+
51+
let mut file = OpenOptions::new()
52+
.write(true)
53+
.truncate(true)
54+
.create(true)
55+
.open(&self.last_aggregated_block_filepath)?;
56+
57+
let content = serde_json::to_string(&last_aggregated_block_struct)?;
58+
file.write_all(content.as_bytes())?;
59+
60+
Ok(())
61+
}
3062
}

aggregation_mode/src/backend/fetcher.rs

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ pub enum ProofsFetcherError {
2424
pub struct ProofsFetcher {
2525
rpc_provider: RPCProvider,
2626
aligned_service_manager: AlignedLayerServiceManagerContract,
27-
fetch_from_secs_ago: u64,
28-
block_time_secs: u64,
27+
last_aggregated_block: u64,
2928
}
3029

3130
impl ProofsFetcher {
@@ -38,31 +37,49 @@ impl ProofsFetcher {
3837
rpc_provider.clone(),
3938
);
4039

40+
let last_aggregated_block = config.get_last_aggregated_block().unwrap();
41+
4142
Self {
4243
rpc_provider,
4344
aligned_service_manager,
44-
fetch_from_secs_ago: config.fetch_logs_from_secs_ago,
45-
block_time_secs: config.block_time_secs,
45+
last_aggregated_block,
4646
}
4747
}
4848

49-
pub async fn fetch(&self) -> Result<Vec<AlignedProof>, ProofsFetcherError> {
50-
let from_block = self.get_block_number_to_fetch_from().await?;
49+
pub async fn fetch(&mut self) -> Result<Vec<AlignedProof>, ProofsFetcherError> {
50+
// Get current block
51+
let current_block = self
52+
.rpc_provider
53+
.get_block_number()
54+
.await
55+
.map_err(|e| ProofsFetcherError::GetBlockNumber(e.to_string()))?;
56+
57+
if current_block < self.last_aggregated_block {
58+
return Err(ProofsFetcherError::GetBlockNumber(
59+
"Invalid last processed block".to_string(),
60+
));
61+
}
62+
5163
info!(
52-
"Fetching proofs from batch logs starting from block number {}",
53-
from_block
64+
"Fetching proofs from batch logs starting from block number {} upto {}",
65+
self.last_aggregated_block, current_block
5466
);
67+
5568
// Subscribe to NewBatch event from AlignedServiceManager
5669
let logs = self
5770
.aligned_service_manager
5871
.NewBatchV3_filter()
59-
.from_block(from_block)
72+
.from_block(self.last_aggregated_block)
73+
.to_block(current_block)
6074
.query()
6175
.await
6276
.map_err(|e| ProofsFetcherError::GetLogs(e.to_string()))?;
6377

6478
info!("Logs collected {}", logs.len());
6579

80+
// Update last processed block after collecting logs
81+
self.last_aggregated_block = current_block;
82+
6683
let mut proofs = vec![];
6784

6885
for (batch, _) in logs {
@@ -119,15 +136,7 @@ impl ProofsFetcher {
119136
Ok(proofs)
120137
}
121138

122-
async fn get_block_number_to_fetch_from(&self) -> Result<u64, ProofsFetcherError> {
123-
let block_number = self
124-
.rpc_provider
125-
.get_block_number()
126-
.await
127-
.map_err(|e| ProofsFetcherError::GetBlockNumber(e.to_string()))?;
128-
129-
let number_of_blocks_in_the_past = self.fetch_from_secs_ago / self.block_time_secs;
130-
131-
Ok(block_number.saturating_sub(number_of_blocks_in_the_past))
139+
pub fn get_last_aggregated_block(&self) -> u64 {
140+
self.last_aggregated_block
132141
}
133142
}

aggregation_mode/src/backend/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,17 @@ impl ProofAggregator {
6969
}
7070
}
7171

72-
pub async fn start(&mut self) {
72+
pub async fn start(&mut self, config: &Config) {
7373
info!("Starting proof aggregator service",);
7474

7575
info!("About to aggregate and submit proof to be verified on chain");
7676
let res = self.aggregate_and_submit_proofs_on_chain().await;
7777

7878
match res {
7979
Ok(()) => {
80+
config
81+
.update_last_aggregated_block(self.fetcher.get_last_aggregated_block())
82+
.unwrap();
8083
info!("Process finished successfully");
8184
}
8285
Err(err) => {

aggregation_mode/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ async fn main() {
2727
tracing::info!("Config loaded");
2828

2929
let mut proof_aggregator = ProofAggregator::new(&config);
30-
proof_aggregator.start().await;
30+
proof_aggregator.start(&config).await;
3131
}
Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
1-
aligned_service_manager_address: "0x851356ae760d987E095750cCeb3bC6014560891C"
2-
proof_aggregation_service_address: "0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07"
3-
eth_rpc_url: "http://localhost:8545"
4-
eth_ws_url: "ws://localhost:8545"
1+
eth_rpc_url: http://localhost:8545
2+
eth_ws_url: ws://localhost:8545
53
max_proofs_in_queue: 1000
6-
# How far in the past should the service go to fetch batch logs
7-
fetch_logs_from_secs_ago: 86400 # 24hs
8-
# Anvil start with block time is 7 seconds
9-
block_time_secs: 7
10-
4+
proof_aggregation_service_address: 0xB0D4afd8879eD9F52b28595d31B441D079B2Ca07
5+
aligned_service_manager_address: 0x851356ae760d987E095750cCeb3bC6014560891C
6+
last_aggregated_block_filepath: config-files/proof-aggregator.last_aggregated_block.json
117
ecdsa:
12-
private_key_store_path: "config-files/anvil.proof-aggregator.ecdsa.key.json"
13-
private_key_store_password: ""
8+
private_key_store_path: config-files/anvil.proof-aggregator.ecdsa.key.json
9+
private_key_store_password: ''

config-files/config-proof-aggregator.yaml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@ proof_aggregation_service_address: "0xcbEAF3BDe82155F56486Fb5a1072cb8baAf547cc"
33
eth_rpc_url: "http://localhost:8545"
44
eth_ws_url: "ws://localhost:8545"
55
max_proofs_in_queue: 1000
6-
# How far in the past should the service go to fetch batch logs
7-
fetch_logs_from_secs_ago: 86400 # 24hs
8-
# Anvil start with block time is 7 seconds
9-
block_time_secs: 7
6+
last_aggregated_block_filepath: config-files/proof-aggregator.last_aggregated_block.json
107

118
ecdsa:
129
private_key_store_path: "config-files/anvil.proof-aggregator.ecdsa.key.json"
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"last_aggregated_block":0}

0 commit comments

Comments
 (0)