Commit 6de9f43

refactor: use just pglistener
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent fb3ac69 commit 6de9f43

File tree: 5 files changed (+81, -124 lines)

common/src/indexer_service/http/indexer_service.rs
common/src/tap.rs
common/src/tap/checks/value_check.rs
service/src/routes/cost.rs
service/src/service.rs
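At a glance, the commit removes the tokio mpsc plumbing (create_value_check, ValueCheckSender, ValueCheckReceiver) that the cost routes used to push cost models into the MinimumValue receipt check, and instead has MinimumValue listen for Postgres NOTIFY events itself via sqlx's PgListener on the cost_models_update_notify channel. For readers unfamiliar with that primitive, here is a minimal, standalone sketch of the LISTEN/NOTIFY subscription pattern the commit adopts; the connection string and the main wrapper are illustrative only and not part of this repository:

use sqlx::postgres::{PgListener, PgPoolOptions};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Illustrative connection string; the real pool comes from the service config.
    let pool = PgPoolOptions::new()
        .connect("postgres://user:password@localhost/indexer")
        .await?;

    // Subscribe to the channel that cost model updates are published on.
    let mut listener = PgListener::connect_with(&pool).await?;
    listener.listen("cost_models_update_notify").await?;

    loop {
        // Each recv() yields one NOTIFY event; its payload is the JSON that
        // value_check.rs deserializes into CostModelNotification.
        let notification = listener.recv().await?;
        println!("received payload: {}", notification.payload());
    }
}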

common/src/indexer_service/http/indexer_service.rs

Lines changed: 0 additions & 6 deletions
@@ -37,7 +37,6 @@ use tracing::{info, info_span};
 
 use crate::escrow_accounts::EscrowAccounts;
 use crate::escrow_accounts::EscrowAccountsError;
-use crate::tap::{ValueCheckReceiver, ValueCheckSender};
 use crate::{
     address::public_key,
     indexer_service::http::static_subgraph::static_subgraph_request_handler,
@@ -178,8 +177,6 @@ where
     pub release: IndexerServiceRelease,
     pub url_namespace: &'static str,
     pub extra_routes: Router<Arc<IndexerServiceState<I>>>,
-    pub value_check_receiver: ValueCheckReceiver,
-    pub value_check_sender: ValueCheckSender,
 }
 
 pub struct IndexerServiceState<I>
@@ -194,7 +191,6 @@ where
     // tap
     pub escrow_accounts: Eventual<EscrowAccounts>,
     pub domain_separator: Eip712Domain,
-    pub value_check_sender: ValueCheckSender,
 }
 
 pub struct IndexerService {}
@@ -317,7 +313,6 @@ impl IndexerService {
             domain_separator.clone(),
             timestamp_error_tolerance,
             receipt_max_value,
-            options.value_check_receiver,
         )
         .await;
 
@@ -334,7 +329,6 @@ impl IndexerService {
             service_impl: Arc::new(options.service_impl),
             escrow_accounts,
             domain_separator,
-            value_check_sender: options.value_check_sender,
        });
 
        // Rate limits by allowing bursts of 10 requests and requiring 100ms of

common/src/tap.rs

Lines changed: 3 additions & 6 deletions
@@ -24,9 +24,7 @@ use tracing::error;
 mod checks;
 mod receipt_store;
 
-pub use checks::value_check::{
-    create_value_check, AgoraQuery, CostModelSource, ValueCheckReceiver, ValueCheckSender,
-};
+pub use checks::value_check::{AgoraQuery, CostModelSource};
 
 #[derive(Clone)]
 pub struct IndexerTapContext {
@@ -50,7 +48,6 @@ impl IndexerTapContext {
         domain_separator: Eip712Domain,
         timestamp_error_tolerance: Duration,
         receipt_max_value: u128,
-        value_check_receiver: ValueCheckReceiver,
     ) -> Vec<ReceiptCheck> {
         vec![
             Arc::new(AllocationEligible::new(indexer_allocations)),
@@ -59,9 +56,9 @@ impl IndexerTapContext {
                 domain_separator.clone(),
             )),
             Arc::new(TimestampCheck::new(timestamp_error_tolerance)),
-            Arc::new(DenyListCheck::new(pgpool, escrow_accounts, domain_separator).await),
+            Arc::new(DenyListCheck::new(pgpool.clone(), escrow_accounts, domain_separator).await),
             Arc::new(ReceiptMaxValueCheck::new(receipt_max_value)),
-            Arc::new(MinimumValue::new(value_check_receiver)),
+            Arc::new(MinimumValue::new(pgpool).await),
         ]
     }
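Note that MinimumValue::new(pgpool).await now connects its own PgListener and spawns a background cost_models_watcher task tied to a tokio_util CancellationToken (see common/src/tap/checks/value_check.rs below). As a reference for that pattern, here is a small generic sketch of a cancellable background task; the names are hypothetical and it is not code from this repository:

use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Generic sketch: a background loop that runs until its CancellationToken fires,
// mirroring the `tokio::select!` structure used by cost_models_watcher.
async fn watcher(cancel_token: CancellationToken) {
    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => {
                // The owner asked us to stop; break so the JoinHandle resolves.
                break;
            }
            _ = tokio::time::sleep(Duration::from_millis(100)) => {
                // Stand-in for `pglistener.recv()`: handle one event here.
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let cancel_token = CancellationToken::new();
    let handle = tokio::spawn(watcher(cancel_token.clone()));

    // On shutdown, cancel the token and wait for the task to finish.
    cancel_token.cancel();
    handle.await.unwrap();
}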

common/src/tap/checks/value_check.rs

Lines changed: 75 additions & 82 deletions
@@ -5,18 +5,15 @@ use anyhow::anyhow;
 use bigdecimal::ToPrimitive;
 use cost_model::CostModel;
 use sqlx::{postgres::PgListener, PgPool};
-use tracing::error;
 use std::{
     cmp::min,
     collections::HashMap,
     sync::{Arc, Mutex},
     time::Duration,
 };
 use thegraph_core::DeploymentId;
-use tokio::{
-    sync::mpsc::{Receiver, Sender},
-    task::JoinHandle,
-};
+use tokio::task::JoinHandle;
+use tracing::error;
 use ttl_cache::TtlCache;
 
 use tap_core::receipt::{
@@ -30,60 +27,25 @@ pub struct MinimumValue {
     model_handle: JoinHandle<()>,
 }
 
-#[derive(Clone)]
-pub struct ValueCheckSender {
-    pub tx_cost_model: Sender<CostModelSource>,
-}
-
-pub struct ValueCheckReceiver {
-    rx_cost_model: Receiver<CostModelSource>,
-}
+impl MinimumValue {
+    pub async fn new(pgpool: PgPool) -> Self {
+        let cost_model_cache = Arc::new(Mutex::new(HashMap::<DeploymentId, CostModelCache>::new()));
 
-pub fn create_value_check(size: usize) -> (ValueCheckSender, ValueCheckReceiver) {
-    let (tx_cost_model, rx_cost_model) = tokio::sync::mpsc::channel(size);
+        let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap();
+        pglistener.listen("cost_models_update_notify").await.expect(
+            "should be able to subscribe to Postgres Notify events on the channel \
+            'cost_models_update_notify'",
+        );
 
-    (
-        ValueCheckSender { tx_cost_model },
-        ValueCheckReceiver { rx_cost_model },
-    )
-}
+        // TODO start watcher
+        let cancel_token = tokio_util::sync::CancellationToken::new();
 
-impl MinimumValue {
-    pub fn new(ValueCheckReceiver { mut rx_cost_model }: ValueCheckReceiver) -> Self {
-        let cost_model_cache = Arc::new(Mutex::new(HashMap::<DeploymentId, CostModelCache>::new()));
-        let cache = cost_model_cache.clone();
-        let model_handle = tokio::spawn(async move {
-            loop {
-                let model = rx_cost_model.recv().await;
-                match model {
-                    Some(value) => {
-                        let deployment_id = value.deployment_id;
-
-                        if let Some(query) = cache.lock().unwrap().get_mut(&deployment_id) {
-                            let _ = query.insert_model(value).inspect_err(|err| {
-                                tracing::error!(
-                                    "Error while compiling cost model for deployment id {}. Error: {}",
-                                    deployment_id, err
-                                )
-                            });
-                        } else {
-                            match CostModelCache::new(value) {
-                                Ok(value) => {
-                                    cache.lock().unwrap().insert(deployment_id, value);
-                                }
-                                Err(err) => {
-                                    tracing::error!(
-                                        "Error while compiling cost model for deployment id {}. Error: {}",
-                                        deployment_id, err
-                                    )
-                                }
-                            }
-                        }
-                    }
-                    None => break,
-                }
-            }
-        });
+        let model_handle = tokio::spawn(Self::cost_models_watcher(
+            pgpool.clone(),
+            pglistener,
+            cost_model_cache.clone(),
+            cancel_token.clone(),
+        ));
 
         Self {
             cost_model_cache,
@@ -92,17 +54,11 @@ impl MinimumValue {
     }
 
     async fn cost_models_watcher(
-        pgpool: PgPool,
+        _pgpool: PgPool,
         mut pglistener: PgListener,
-        denylist: Arc<Mutex<HashMap<DeploymentId, CostModelCache>>>,
+        cost_model_cache: Arc<Mutex<HashMap<DeploymentId, CostModelCache>>>,
         cancel_token: tokio_util::sync::CancellationToken,
     ) {
-        #[derive(serde::Deserialize)]
-        struct DenylistNotification {
-            tg_op: String,
-            deployment: DeploymentId,
-        }
-
        loop {
            tokio::select! {
                _ = cancel_token.cancelled() => {
@@ -112,39 +68,58 @@ impl MinimumValue {
                pg_notification = pglistener.recv() => {
                    let pg_notification = pg_notification.expect(
                        "should be able to receive Postgres Notify events on the channel \
-                        'scalar_tap_deny_notification'",
+                        'cost_models_update_notify'",
                    );
 
-                    let denylist_notification: DenylistNotification =
+                    let cost_model_notification: CostModelNotification =
                        serde_json::from_str(pg_notification.payload()).expect(
                            "should be able to deserialize the Postgres Notify event payload as a \
-                            DenylistNotification",
+                            CostModelNotification",
                        );
 
-                    match denylist_notification.tg_op.as_str() {
+                    let deployment_id = cost_model_notification.deployment;
+
+                    match cost_model_notification.tg_op.as_str() {
                        "INSERT" => {
-                            denylist
-                                .write()
-                                .unwrap()
-                                .insert(denylist_notification.sender_address);
+                            let cost_model_source: CostModelSource = cost_model_notification.into();
+                            let mut cost_model_cache = cost_model_cache
+                                .lock()
+                                .unwrap();
+
+                            match cost_model_cache.get_mut(&deployment_id) {
+                                Some(cache) => {
+                                    let _ = cache.insert_model(cost_model_source);
+                                },
+                                None => {
+                                    if let Ok(cache) = CostModelCache::new(cost_model_source).inspect_err(|err| {
+                                        tracing::error!(
+                                            "Error while compiling cost model for deployment id {}. Error: {}",
+                                            deployment_id, err
+                                        )
+                                    }) {
+                                        cost_model_cache.insert(deployment_id, cache);
+                                    }
+                                },
+                            }
                        }
                        "DELETE" => {
-                            denylist
-                                .write()
+                            cost_model_cache
+                                .lock()
                                .unwrap()
-                                .remove(&denylist_notification.sender_address);
+                                .remove(&cost_model_notification.deployment);
                        }
-                        // UPDATE and TRUNCATE are not expected to happen. Reload the entire denylist.
+                        // UPDATE and TRUNCATE are not expected to happen. Reload the entire cost
+                        // model cache.
                        _ => {
                            error!(
-                                "Received an unexpected denylist table notification: {}. Reloading entire \
-                                denylist.",
-                                denylist_notification.tg_op
+                                "Received an unexpected cost model table notification: {}. Reloading entire \
+                                cost model.",
+                                cost_model_notification.tg_op
                            );
 
-                            Self::sender_denylist_reload(pgpool.clone(), denylist.clone())
-                                .await
-                                .expect("should be able to reload the sender denylist")
+                            // Self::sender_denylist_reload(pgpool.clone(), denylist.clone())
+                            //     .await
+                            //     .expect("should be able to reload cost models")
                        }
                    }
                }
@@ -229,6 +204,24 @@ pub struct CostModelSource {
     pub variables: String,
 }
 
+#[derive(serde::Deserialize)]
+struct CostModelNotification {
+    tg_op: String,
+    deployment: DeploymentId,
+    model: String,
+    variables: String,
+}
+
+impl From<CostModelNotification> for CostModelSource {
+    fn from(value: CostModelNotification) -> Self {
+        CostModelSource {
+            deployment_id: value.deployment,
+            model: value.model,
+            variables: value.variables,
+        }
+    }
+}
+
 pub struct CostModelCache {
     models: TtlCache<CostModelSource, CostModel>,
     latest_model: CostModel,
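The watcher expects NOTIFY payloads that deserialize into CostModelNotification (tg_op, deployment, model, variables). Presumably a trigger on the cost models table emits these; for local experimentation, an equivalent event can be published by hand with Postgres's pg_notify. A sketch under that assumption, with a hypothetical helper name and placeholder values:

use serde_json::json;
use sqlx::PgPool;

// Sketch: publish a notification shaped like CostModelNotification on the
// channel the watcher listens to. Placeholder values only; in production the
// payload is presumably produced by a database trigger.
async fn notify_cost_model_update(pool: &PgPool) -> sqlx::Result<()> {
    let payload = json!({
        "tg_op": "INSERT",
        "deployment": "<a valid DeploymentId string>",
        "model": "default => 0.00025;",
        "variables": "{}",
    })
    .to_string();

    sqlx::query("SELECT pg_notify($1, $2)")
        .bind("cost_models_update_notify")
        .bind(payload)
        .execute(pool)
        .await?;

    Ok(())
}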

service/src/routes/cost.rs

Lines changed: 1 addition & 20 deletions
@@ -7,12 +7,12 @@ use std::sync::Arc;
 use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject};
 use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
 use axum::extract::State;
+use indexer_common::tap::CostModelSource;
 use lazy_static::lazy_static;
 use prometheus::{
     register_counter, register_counter_vec, register_histogram, register_histogram_vec, Counter,
     CounterVec, Histogram, HistogramVec,
 };
-use indexer_common::tap::CostModelSource;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use thegraph_core::DeploymentId;
@@ -140,18 +140,9 @@ impl Query {
     ) -> Result<Vec<GraphQlCostModel>, anyhow::Error> {
         let state = &ctx.data_unchecked::<Arc<SubgraphServiceState>>();
 
-        let cost_model_sender = &state.value_check_sender;
-
         let pool = &state.database;
         let cost_models = database::cost_models(pool, &deployment_ids).await?;
 
-        for model in &cost_models {
-            let _ = cost_model_sender
-                .tx_cost_model
-                .send(CostModelSource::from(model.clone()))
-                .await;
-        }
-
         Ok(cost_models.into_iter().map(|m| m.into()).collect())
     }
 
@@ -160,19 +151,9 @@ impl Query {
         ctx: &Context<'_>,
         deployment_id: DeploymentId,
     ) -> Result<Option<GraphQlCostModel>, anyhow::Error> {
-
-        let state = &ctx.data_unchecked::<Arc<SubgraphServiceState>>();
-        let cost_model_sender = &state.value_check_sender;
         let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
         let model = database::cost_model(pool, &deployment_id).await?;
 
-        if let Some(model) = &model {
-            let _ = cost_model_sender
-                .tx_cost_model
-                .send(CostModelSource::from(model.clone()))
-                .await;
-        }
-
         Ok(model.map(GraphQlCostModel::from))
     }
 }

service/src/service.rs

Lines changed: 2 additions & 10 deletions
@@ -7,9 +7,8 @@ use std::time::Duration;
 use super::{config::Config, error::SubgraphServiceError, routes};
 use anyhow::anyhow;
 use axum::{async_trait, routing::post, Json, Router};
-use indexer_common::{
-    indexer_service::http::{AttestationOutput, IndexerServiceImpl, IndexerServiceResponse},
-    tap::{create_value_check, ValueCheckSender},
+use indexer_common::indexer_service::http::{
+    AttestationOutput, IndexerServiceImpl, IndexerServiceResponse,
 };
 use indexer_config::Config as MainConfig;
 use reqwest::Url;
@@ -68,7 +67,6 @@ pub struct SubgraphServiceState {
     pub graph_node_client: reqwest::Client,
     pub graph_node_status_url: String,
     pub graph_node_query_base_url: String,
-    pub value_check_sender: ValueCheckSender,
 }
 
 struct SubgraphService {
@@ -148,9 +146,6 @@ pub async fn run() -> anyhow::Result<()> {
     build_info::build_info!(fn build_info);
     let release = IndexerServiceRelease::from(build_info());
 
-    // arbitrary value
-    let (value_check_sender, value_check_receiver) = create_value_check(10);
-
     // Some of the subgraph service configuration goes into the so-called
     // "state", which will be passed to any request handler, middleware etc.
     // that is involved in serving requests
@@ -177,7 +172,6 @@ pub async fn run() -> anyhow::Result<()> {
             .expect("config must have `common.graph_node.query_url` set")
             .query_base_url
             .clone(),
-        value_check_sender: value_check_sender.clone(),
     });
 
     IndexerService::run(IndexerServiceOptions {
@@ -189,8 +183,6 @@ pub async fn run() -> anyhow::Result<()> {
             .route("/cost", post(routes::cost::cost))
            .route("/status", post(routes::status))
            .with_state(state),
-        value_check_receiver,
-        value_check_sender,
    })
    .await
}
