Skip to content

Commit a668f13

Browse files
feat(aggregation-mode): store last fetched block in payments poller
1 parent d623ed0 commit a668f13

File tree

5 files changed

+72
-7
lines changed

5 files changed

+72
-7
lines changed

aggregation_mode/payments_poller/src/config.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{fs::File, io::Read};
1+
use std::{fs::File, fs::OpenOptions, io::Read, io::Write};
22

33
use serde::{Deserialize, Serialize};
44

@@ -7,6 +7,12 @@ pub struct Config {
77
pub db_connection_url: String,
88
pub eth_rpc_url: String,
99
pub payment_service_address: String,
10+
pub last_block_fetched_filepath: String,
11+
}
12+
13+
#[derive(Debug, Deserialize, Serialize)]
14+
pub struct LastBlockFetched {
15+
pub last_block_fetched: u64,
1016
}
1117

1218
impl Config {
@@ -17,4 +23,30 @@ impl Config {
1723
let config: Config = serde_yaml::from_str(&contents)?;
1824
Ok(config)
1925
}
26+
27+
pub fn get_last_block_fetched(&self) -> Result<u64, Box<dyn std::error::Error>> {
28+
let mut file = File::open(&self.last_block_fetched_filepath)?;
29+
let mut contents = String::new();
30+
file.read_to_string(&mut contents)?;
31+
let lbf_struct: LastBlockFetched = serde_json::from_str(&contents)?;
32+
Ok(lbf_struct.last_block_fetched)
33+
}
34+
35+
pub fn update_last_block_fetched(
36+
&self,
37+
last_block_fetched: u64,
38+
) -> Result<(), Box<dyn std::error::Error>> {
39+
let last_block_fetched_struct = LastBlockFetched { last_block_fetched };
40+
41+
let mut file = OpenOptions::new()
42+
.write(true)
43+
.truncate(true)
44+
.create(true)
45+
.open(&self.last_block_fetched_filepath)?;
46+
47+
let content = serde_json::to_string(&last_block_fetched_struct)?;
48+
file.write_all(content.as_bytes())?;
49+
50+
Ok(())
51+
}
2052
}

aggregation_mode/payments_poller/src/main.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ async fn main() {
3030
.await
3131
.expect("db to start");
3232

33-
let payment_poller = PaymentsPoller::new(db, config);
34-
payment_poller.start().await;
33+
let payments_poller = match PaymentsPoller::new(db, config) {
34+
Ok(poller) => poller,
35+
Err(err) => {
36+
tracing::error!("Failed to create Payments Poller: {err}");
37+
return;
38+
}
39+
};
40+
41+
payments_poller.start().await;
3542
}

aggregation_mode/payments_poller/src/payments.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,17 @@ use alloy::{
1010
providers::{Provider, ProviderBuilder},
1111
};
1212
use sqlx::types::BigDecimal;
13+
use tracing::info;
1314

1415
pub struct PaymentsPoller {
1516
db: Db,
1617
proof_aggregation_service: AggregationModePaymentServiceContract,
1718
rpc_provider: RpcProvider,
19+
config: Config,
1820
}
1921

2022
impl PaymentsPoller {
21-
pub fn new(db: Db, config: Config) -> Self {
23+
pub fn new(db: Db, config: Config) -> Result<Self, Box<dyn std::error::Error>> {
2224
let rpc_url = config.eth_rpc_url.parse().expect("RPC URL should be valid");
2325
let rpc_provider = ProviderBuilder::new().connect_http(rpc_url);
2426
let proof_aggregation_service = AggregationModePaymentService::new(
@@ -27,16 +29,30 @@ impl PaymentsPoller {
2729
rpc_provider.clone(),
2830
);
2931

30-
Self {
32+
// This check is here to catch early failures on last block fetching
33+
let _ = config.get_last_block_fetched()?;
34+
35+
Ok(Self {
3136
db,
3237
proof_aggregation_service,
3338
rpc_provider,
34-
}
39+
config,
40+
})
3541
}
3642

3743
pub async fn start(&self) {
3844
let seconds_to_wait_between_polls = 12;
45+
3946
loop {
47+
let Ok(last_block_fetched) = self.config.get_last_block_fetched() else {
48+
tracing::warn!("Could not get last block fetched, skipping polling iteration...");
49+
tokio::time::sleep(std::time::Duration::from_secs(
50+
seconds_to_wait_between_polls,
51+
))
52+
.await;
53+
continue;
54+
};
55+
4056
let Ok(current_block) = self.rpc_provider.get_block_number().await else {
4157
tracing::warn!("Could not get current block skipping polling iteration...");
4258
tokio::time::sleep(std::time::Duration::from_secs(
@@ -46,10 +62,13 @@ impl PaymentsPoller {
4662
continue;
4763
};
4864

65+
let from = last_block_fetched.saturating_sub(5);
66+
info!("Fetching logs from block {from} to {current_block}");
67+
4968
let Ok(logs) = self
5069
.proof_aggregation_service
5170
.UserPayment_filter()
52-
.from_block(current_block - 5)
71+
.from_block(last_block_fetched - 5)
5372
.to_block(current_block)
5473
.query()
5574
.await
@@ -91,6 +110,11 @@ impl PaymentsPoller {
91110
}
92111
}
93112

113+
if let Err(err) = self.config.update_last_block_fetched(current_block) {
114+
tracing::error!("Failed to update the last aggregated block: {err}");
115+
continue;
116+
};
117+
94118
tokio::time::sleep(std::time::Duration::from_secs(
95119
seconds_to_wait_between_polls,
96120
))

config-files/config-agg-mode-gateway.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ eth_rpc_url: "http://localhost:8545"
44
payment_service_address: "0x922D6956C99E12DFeB3224DEA977D0939758A1Fe"
55
network: "devnet"
66
max_daily_proofs_per_user: 4
7+
last_block_fetched_filepath: "config-files/proof-aggregator.last_block_fetched.json"
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"last_block_fetched":0}

0 commit comments

Comments
 (0)