Skip to content

Commit 9a529b0

Browse files
feat(aggregation-mode): store last fetched block in payments poller (#2205)
1 parent d623ed0 commit 9a529b0

File tree

5 files changed

+78
-7
lines changed

5 files changed

+78
-7
lines changed

aggregation_mode/payments_poller/src/config.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{fs::File, io::Read};
1+
use std::{fs::File, fs::OpenOptions, io::Read, io::Write};
22

33
use serde::{Deserialize, Serialize};
44

@@ -7,6 +7,12 @@ pub struct Config {
77
pub db_connection_url: String,
88
pub eth_rpc_url: String,
99
pub payment_service_address: String,
10+
pub last_block_fetched_filepath: String,
11+
}
12+
13+
#[derive(Debug, Deserialize, Serialize)]
14+
pub struct LastBlockFetched {
15+
pub last_block_fetched: u64,
1016
}
1117

1218
impl Config {
@@ -17,4 +23,30 @@ impl Config {
1723
let config: Config = serde_yaml::from_str(&contents)?;
1824
Ok(config)
1925
}
26+
27+
pub fn get_last_block_fetched(&self) -> Result<u64, Box<dyn std::error::Error>> {
28+
let mut file = File::open(&self.last_block_fetched_filepath)?;
29+
let mut contents = String::new();
30+
file.read_to_string(&mut contents)?;
31+
let lbf_struct: LastBlockFetched = serde_json::from_str(&contents)?;
32+
Ok(lbf_struct.last_block_fetched)
33+
}
34+
35+
pub fn update_last_block_fetched(
36+
&self,
37+
last_block_fetched: u64,
38+
) -> Result<(), Box<dyn std::error::Error>> {
39+
let last_block_fetched_struct = LastBlockFetched { last_block_fetched };
40+
41+
let mut file = OpenOptions::new()
42+
.write(true)
43+
.truncate(true)
44+
.create(true)
45+
.open(&self.last_block_fetched_filepath)?;
46+
47+
let content = serde_json::to_string(&last_block_fetched_struct)?;
48+
file.write_all(content.as_bytes())?;
49+
50+
Ok(())
51+
}
2052
}

aggregation_mode/payments_poller/src/main.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ async fn main() {
3030
.await
3131
.expect("db to start");
3232

33-
let payment_poller = PaymentsPoller::new(db, config);
34-
payment_poller.start().await;
33+
let payments_poller = match PaymentsPoller::new(db, config) {
34+
Ok(poller) => poller,
35+
Err(err) => {
36+
tracing::error!("Failed to create Payments Poller: {err:?}");
37+
return;
38+
}
39+
};
40+
41+
payments_poller.start().await;
3542
}

aggregation_mode/payments_poller/src/payments.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,20 @@ use alloy::{
1111
};
1212
use sqlx::types::BigDecimal;
1313

14+
#[derive(Debug, Clone)]
15+
pub enum PaymentsPollerError {
16+
ReadLastBlockError(String),
17+
}
18+
1419
pub struct PaymentsPoller {
1520
db: Db,
1621
proof_aggregation_service: AggregationModePaymentServiceContract,
1722
rpc_provider: RpcProvider,
23+
config: Config,
1824
}
1925

2026
impl PaymentsPoller {
21-
pub fn new(db: Db, config: Config) -> Self {
27+
pub fn new(db: Db, config: Config) -> Result<Self, PaymentsPollerError> {
2228
let rpc_url = config.eth_rpc_url.parse().expect("RPC URL should be valid");
2329
let rpc_provider = ProviderBuilder::new().connect_http(rpc_url);
2430
let proof_aggregation_service = AggregationModePaymentService::new(
@@ -27,16 +33,32 @@ impl PaymentsPoller {
2733
rpc_provider.clone(),
2834
);
2935

30-
Self {
36+
// This check is here to catch early failures on last block fetching
37+
let _ = config
38+
.get_last_block_fetched()
39+
.map_err(|err| PaymentsPollerError::ReadLastBlockError(err.to_string()));
40+
41+
Ok(Self {
3142
db,
3243
proof_aggregation_service,
3344
rpc_provider,
34-
}
45+
config,
46+
})
3547
}
3648

3749
pub async fn start(&self) {
3850
let seconds_to_wait_between_polls = 12;
51+
3952
loop {
53+
let Ok(last_block_fetched) = self.config.get_last_block_fetched() else {
54+
tracing::warn!("Could not get last block fetched, skipping polling iteration...");
55+
tokio::time::sleep(std::time::Duration::from_secs(
56+
seconds_to_wait_between_polls,
57+
))
58+
.await;
59+
continue;
60+
};
61+
4062
let Ok(current_block) = self.rpc_provider.get_block_number().await else {
4163
tracing::warn!("Could not get current block skipping polling iteration...");
4264
tokio::time::sleep(std::time::Duration::from_secs(
@@ -46,10 +68,13 @@ impl PaymentsPoller {
4668
continue;
4769
};
4870

71+
let start_block = last_block_fetched.saturating_sub(5);
72+
tracing::info!("Fetching logs from block {start_block} to {current_block}");
73+
4974
let Ok(logs) = self
5075
.proof_aggregation_service
5176
.UserPayment_filter()
52-
.from_block(current_block - 5)
77+
.from_block(start_block)
5378
.to_block(current_block)
5479
.query()
5580
.await
@@ -91,6 +116,11 @@ impl PaymentsPoller {
91116
}
92117
}
93118

119+
if let Err(err) = self.config.update_last_block_fetched(current_block) {
120+
tracing::error!("Failed to update the last aggregated block: {err}");
121+
continue;
122+
};
123+
94124
tokio::time::sleep(std::time::Duration::from_secs(
95125
seconds_to_wait_between_polls,
96126
))

config-files/config-agg-mode-gateway.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ eth_rpc_url: "http://localhost:8545"
44
payment_service_address: "0x922D6956C99E12DFeB3224DEA977D0939758A1Fe"
55
network: "devnet"
66
max_daily_proofs_per_user: 4
7+
last_block_fetched_filepath: "config-files/proof-aggregator.last_block_fetched.json"
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"last_block_fetched":0}

0 commit comments

Comments
 (0)