Skip to content

Commit 79f9460

Browse files
committed
feat: payment poller component
1 parent 23ba2f4 commit 79f9460

File tree

9 files changed

+197
-13
lines changed

9 files changed

+197
-13
lines changed

aggregation_mode/Cargo.lock

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aggregation_mode/batcher/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ aligned-sdk = { workspace = true }
1111
tracing = { version = "0.1", features = ["log"] }
1212
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
1313
actix-web = "4"
14+
alloy = { workspace = true }
15+
tokio = { version = "1", features = ["time"]}
1416
# TODO: enable tls
15-
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "uuid" ] }
17+
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "uuid", "bigdecimal" ] }
1618
hex = "0.4"

aggregation_mode/batcher/abi/AggregationModePaymentService.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

aggregation_mode/batcher/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub struct Config {
77
pub port: u16,
88
pub db_connection_url: String,
99
pub eth_rpc_url: String,
10+
pub payment_service_address: String,
1011
}
1112

1213
impl Config {

aggregation_mode/batcher/src/db.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use sqlx::{postgres::PgPoolOptions, types::Uuid, Pool, Postgres};
1+
use sqlx::{
2+
postgres::PgPoolOptions,
3+
types::{BigDecimal, Uuid},
4+
Pool, Postgres,
5+
};
26

37
#[derive(Clone, Debug)]
48
pub struct Db {
@@ -74,10 +78,33 @@ impl Db {
7478
.await
7579
}
7680

81+
pub async fn insert_payment_event(
82+
&self,
83+
address: &str,
84+
started_at: &BigDecimal,
85+
amount: &BigDecimal,
86+
valid_until: &BigDecimal,
87+
tx_hash: &str,
88+
) -> Result<(), sqlx::Error> {
89+
sqlx::query(
90+
"INSERT INTO payment_events (address, started_at, amount, valid_until, tx_hash)
91+
VALUES ($1, $2, $3, $4, $5)
92+
ON CONFLICT (tx_hash) DO NOTHING",
93+
)
94+
.bind(address)
95+
.bind(started_at)
96+
.bind(amount)
97+
.bind(valid_until)
98+
.bind(tx_hash)
99+
.execute(&self.pool)
100+
.await
101+
.map(|_| ())
102+
}
103+
77104
pub async fn has_active_payment_event(
78105
&self,
79106
address: &str,
80-
now_ts: i64,
107+
epoch: i64,
81108
) -> Result<bool, sqlx::Error> {
82109
sqlx::query_scalar::<_, bool>(
83110
"SELECT EXISTS(
@@ -86,7 +113,7 @@ impl Db {
86113
)",
87114
)
88115
.bind(address)
89-
.bind(now_ts)
116+
.bind(epoch)
90117
.fetch_one(&self.pool)
91118
.await
92119
}

aggregation_mode/batcher/src/main.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::env;
22

33
use agg_mode_batcher::config::Config;
4+
use agg_mode_batcher::payments::PaymentsPooler;
45
use agg_mode_batcher::{db::Db, server::http::BatcherServer};
56
use tracing_subscriber::{EnvFilter, FmtSubscriber};
67

@@ -16,7 +17,7 @@ fn read_config_filepath_from_args() -> String {
1617
args[1].clone()
1718
}
1819

19-
#[actix_web::main]
20+
#[tokio::main]
2021
async fn main() {
2122
let filter = EnvFilter::new("info,sp1_cuda=warn");
2223
let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
@@ -31,8 +32,12 @@ async fn main() {
3132
.await
3233
.expect("db to start");
3334

34-
let http_server = BatcherServer::new(db.clone(), config.clone());
35+
let payment_poller = PaymentsPooler::new(db.clone(), config.clone());
36+
let http_server = BatcherServer::new(db, config.clone());
3537

36-
tracing::info!("Starting server at port {}", config.port);
37-
http_server.start().await.expect("Server to keep running");
38+
let payment_poller_handle = tokio::spawn(async move { payment_poller.start().await });
39+
let http_server_handle = tokio::spawn(async move { http_server.start().await });
40+
41+
// TODO: abort the process if one stops instead of waiting for them both
42+
let _ = tokio::join!(payment_poller_handle, http_server_handle);
3843
}
Lines changed: 129 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,139 @@
1+
use std::str::FromStr;
2+
13
use crate::{config::Config, db::Db};
4+
use alloy::{
5+
primitives::Address,
6+
providers::{Provider, ProviderBuilder},
7+
sol,
8+
};
9+
use sqlx::types::BigDecimal;
10+
11+
sol!(
12+
#[sol(rpc)]
13+
AggregationModePaymentService,
14+
"abi/AggregationModePaymentService.json"
15+
);
16+
17+
type AggregationModePaymentServiceContract =
18+
AggregationModePaymentService::AggregationModePaymentServiceInstance<
19+
alloy::providers::fillers::FillProvider<
20+
alloy::providers::fillers::JoinFill<
21+
alloy::providers::Identity,
22+
alloy::providers::fillers::JoinFill<
23+
alloy::providers::fillers::GasFiller,
24+
alloy::providers::fillers::JoinFill<
25+
alloy::providers::fillers::BlobGasFiller,
26+
alloy::providers::fillers::JoinFill<
27+
alloy::providers::fillers::NonceFiller,
28+
alloy::providers::fillers::ChainIdFiller,
29+
>,
30+
>,
31+
>,
32+
>,
33+
alloy::providers::RootProvider,
34+
>,
35+
>;
36+
type RpcProvider = alloy::providers::fillers::FillProvider<
37+
alloy::providers::fillers::JoinFill<
38+
alloy::providers::Identity,
39+
alloy::providers::fillers::JoinFill<
40+
alloy::providers::fillers::GasFiller,
41+
alloy::providers::fillers::JoinFill<
42+
alloy::providers::fillers::BlobGasFiller,
43+
alloy::providers::fillers::JoinFill<
44+
alloy::providers::fillers::NonceFiller,
45+
alloy::providers::fillers::ChainIdFiller,
46+
>,
47+
>,
48+
>,
49+
>,
50+
alloy::providers::RootProvider,
51+
>;
252

3-
struct PaymentsPooler {
53+
pub struct PaymentsPooler {
454
db: Db,
55+
proof_aggregation_service: AggregationModePaymentServiceContract,
56+
rpc_provider: RpcProvider,
557
}
658

759
impl PaymentsPooler {
860
pub fn new(db: Db, config: Config) -> Self {
9-
Self { db }
61+
let rpc_url = config.eth_rpc_url.parse().expect("RPC URL should be valid");
62+
let rpc_provider = ProviderBuilder::new().connect_http(rpc_url);
63+
let proof_aggregation_service = AggregationModePaymentService::new(
64+
Address::from_str(&config.payment_service_address)
65+
.expect("AggregationModePaymentService address should be valid"),
66+
rpc_provider.clone(),
67+
);
68+
69+
Self {
70+
db,
71+
proof_aggregation_service,
72+
rpc_provider: rpc_provider,
73+
}
1074
}
1175

12-
pub async fn start() {}
76+
pub async fn start(&self) {
77+
let seconds_to_wait_between_polls = 12;
78+
loop {
79+
let Ok(current_block) = self.rpc_provider.get_block_number().await else {
80+
tracing::warn!("Could not get current block skipping polling iteration...");
81+
tokio::time::sleep(std::time::Duration::from_secs(
82+
seconds_to_wait_between_polls,
83+
))
84+
.await;
85+
continue;
86+
};
87+
88+
let Ok(logs) = self
89+
.proof_aggregation_service
90+
.UserPayment_filter()
91+
.from_block(current_block - 5)
92+
.to_block(current_block)
93+
.query()
94+
.await
95+
else {
96+
tracing::warn!("Could not get payment log events skipping polling iteration...");
97+
tokio::time::sleep(std::time::Duration::from_secs(
98+
seconds_to_wait_between_polls,
99+
))
100+
.await;
101+
continue;
102+
};
103+
104+
tracing::info!("Logs collected {}", logs.len());
105+
106+
for (payment_event, log) in logs {
107+
let address = format!("{:#x}", payment_event.user);
108+
let Some(tx_hash) = log.transaction_hash else {
109+
tracing::warn!("Skipping payment event for {address}: missing tx hash");
110+
continue;
111+
};
112+
let tx_hash = format!("{:#x}", tx_hash);
113+
114+
let Ok(amount) = BigDecimal::from_str(&payment_event.amount.to_string()) else {
115+
continue;
116+
};
117+
let Ok(started_at) = BigDecimal::from_str(&payment_event.from.to_string()) else {
118+
continue;
119+
};
120+
let Ok(valid_until) = BigDecimal::from_str(&payment_event.until.to_string()) else {
121+
continue;
122+
};
123+
124+
if let Err(err) = self
125+
.db
126+
.insert_payment_event(&address, &started_at, &amount, &valid_until, &tx_hash)
127+
.await
128+
{
129+
tracing::error!("Failed to insert payment event for {address}: {err}");
130+
}
131+
}
132+
133+
tokio::time::sleep(std::time::Duration::from_secs(
134+
seconds_to_wait_between_polls,
135+
))
136+
.await;
137+
}
138+
}
13139
}

aggregation_mode/db/migrations/001_init.sql

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ CREATE TABLE proofs (
1818
CREATE TABLE payment_events (
1919
payment_event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
2020
address CHAR(42),
21-
started_at INTEGER,
22-
valid_until INTEGER
21+
amount BIGINT,
22+
started_at BIGINT,
23+
valid_until BIGINT,
24+
tx_hash CHAR(66) UNIQUE
2325
);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
port: 8089
22
db_connection_url: "postgres://postgres:postgres@localhost:5435/"
33
eth_rpc_url: "http://localhost:8545"
4+
payment_service_address: "0x922D6956C99E12DFeB3224DEA977D0939758A1Fe"

0 commit comments

Comments
 (0)