Skip to content

Commit d472330

Browse files
committed
feat: orchestrator for payment poller
1 parent 440dc33 commit d472330

File tree

4 files changed

+36
-24
lines changed

4 files changed

+36
-24
lines changed

aggregation_mode/db/src/orchestrator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl DbOrchestartor {
7474
}
7575

7676
let nodes = connection_urls
77-
.into_iter()
77+
.iter()
7878
.map(|url| {
7979
let pool = PgPoolOptions::new().max_connections(5).connect_lazy(url)?;
8080

aggregation_mode/payments_poller/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ serde = { workspace = true }
88
serde_json = { workspace = true }
99
serde_yaml = { workspace = true }
1010
aligned-sdk = { workspace = true }
11+
db = { workspace = true }
1112
tracing = { version = "0.1", features = ["log"] }
1213
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
1314
actix-web = "4"
Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
use sqlx::{postgres::PgPoolOptions, types::BigDecimal, Pool, Postgres};
1+
use db::{orchestrator::DbOrchestartor, retry::RetryConfig};
2+
use sqlx::types::BigDecimal;
23

34
#[derive(Clone, Debug)]
45
pub struct Db {
5-
pool: Pool<Postgres>,
6+
orchestartor: DbOrchestartor,
67
}
78

89
#[derive(Debug, Clone)]
@@ -11,14 +12,19 @@ pub enum DbError {
1112
}
1213

1314
impl Db {
14-
pub async fn try_new(connection_url: &str) -> Result<Self, DbError> {
15-
let pool = PgPoolOptions::new()
16-
.max_connections(5)
17-
.connect(connection_url)
18-
.await
19-
.map_err(|e| DbError::ConnectError(e.to_string()))?;
15+
pub async fn try_new(connection_urls: &[&str]) -> Result<Self, DbError> {
16+
let orchestartor = DbOrchestartor::try_new(
17+
connection_urls,
18+
RetryConfig {
19+
factor: 0.0,
20+
max_delay_seconds: 0,
21+
max_times: 0,
22+
min_delay_millis: 0,
23+
},
24+
)
25+
.map_err(|e| DbError::ConnectError(e.to_string()))?;
2026

21-
Ok(Self { pool })
27+
Ok(Self { orchestartor })
2228
}
2329

2430
pub async fn insert_payment_event(
@@ -29,18 +35,23 @@ impl Db {
2935
valid_until: &BigDecimal,
3036
tx_hash: &str,
3137
) -> Result<(), sqlx::Error> {
32-
sqlx::query(
33-
"INSERT INTO payment_events (address, started_at, amount, valid_until, tx_hash)
34-
VALUES ($1, $2, $3, $4, $5)
35-
ON CONFLICT (tx_hash) DO NOTHING",
36-
)
37-
.bind(address.to_lowercase())
38-
.bind(started_at)
39-
.bind(amount)
40-
.bind(valid_until)
41-
.bind(tx_hash)
42-
.execute(&self.pool)
43-
.await
44-
.map(|_| ())
38+
self.orchestartor
39+
.write(async |pool| {
40+
sqlx::query(
41+
"INSERT INTO payment_events (address, started_at, amount, valid_until, tx_hash)
42+
VALUES ($1, $2, $3, $4, $5)
43+
ON CONFLICT (tx_hash) DO NOTHING",
44+
)
45+
.bind(address.to_lowercase())
46+
.bind(started_at)
47+
.bind(amount)
48+
.bind(valid_until)
49+
.bind(tx_hash)
50+
.execute(&pool)
51+
.await?;
52+
53+
Ok(())
54+
})
55+
.await
4556
}
4657
}

aggregation_mode/payments_poller/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ async fn main() {
2626
let config = Config::from_file(&config_file_path).expect("Config is valid");
2727
tracing::info!("Config loaded");
2828

29-
let db = Db::try_new(&config.db_connection_url)
29+
let db = Db::try_new(&[config.db_connection_url.as_str()])
3030
.await
3131
.expect("db to start");
3232

0 commit comments

Comments
 (0)