Skip to content

Commit d097484

Browse files
committed
feat: add error logs for model insert error
1 parent 8d954bf commit d097484

File tree

5 files changed

+105
-13
lines changed

5 files changed

+105
-13
lines changed

common/src/indexer_service/http/indexer_service.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use tracing::{info, info_span};
3737

3838
use crate::escrow_accounts::EscrowAccounts;
3939
use crate::escrow_accounts::EscrowAccountsError;
40+
use crate::tap::{ValueCheckReceiver, ValueCheckSender};
4041
use crate::{
4142
address::public_key,
4243
indexer_service::http::static_subgraph::static_subgraph_request_handler,
@@ -177,6 +178,8 @@ where
177178
pub release: IndexerServiceRelease,
178179
pub url_namespace: &'static str,
179180
pub extra_routes: Router<Arc<IndexerServiceState<I>>>,
181+
pub value_check_receiver: ValueCheckReceiver,
182+
pub value_check_sender: ValueCheckSender,
180183
}
181184

182185
pub struct IndexerServiceState<I>
@@ -191,6 +194,7 @@ where
191194
// tap
192195
pub escrow_accounts: Eventual<EscrowAccounts>,
193196
pub domain_separator: Eip712Domain,
197+
pub value_check_sender: ValueCheckSender,
194198
}
195199

196200
pub struct IndexerService {}
@@ -313,6 +317,7 @@ impl IndexerService {
313317
domain_separator.clone(),
314318
timestamp_error_tolerance,
315319
receipt_max_value,
320+
options.value_check_receiver,
316321
)
317322
.await;
318323

@@ -329,6 +334,7 @@ impl IndexerService {
329334
service_impl: Arc::new(options.service_impl),
330335
escrow_accounts,
331336
domain_separator,
337+
value_check_sender: options.value_check_sender,
332338
});
333339

334340
// Rate limits by allowing bursts of 10 requests and requiring 100ms of

common/src/tap.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::tap::checks::deny_list_check::DenyListCheck;
66
use crate::tap::checks::receipt_max_val_check::ReceiptMaxValueCheck;
77
use crate::tap::checks::sender_balance_check::SenderBalanceCheck;
88
use crate::tap::checks::timestamp_check::TimestampCheck;
9+
use crate::tap::checks::value_check::MinimumValue;
910
use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation};
1011
use alloy::dyn_abi::Eip712Domain;
1112
use eventuals::Eventual;
@@ -23,6 +24,11 @@ use tracing::error;
2324
mod checks;
2425
mod receipt_store;
2526

27+
pub use checks::value_check::{
28+
create_value_check, CostModelSource, ValueCheckReceiver, ValueCheckSender,
29+
};
30+
31+
#[derive(Clone)]
2632
pub struct IndexerTapContext {
2733
domain_separator: Arc<Eip712Domain>,
2834

@@ -44,6 +50,7 @@ impl IndexerTapContext {
4450
domain_separator: Eip712Domain,
4551
timestamp_error_tolerance: Duration,
4652
receipt_max_value: u128,
53+
value_check_receiver: ValueCheckReceiver,
4754
) -> Vec<ReceiptCheck> {
4855
vec![
4956
Arc::new(AllocationEligible::new(indexer_allocations)),
@@ -54,6 +61,7 @@ impl IndexerTapContext {
5461
Arc::new(TimestampCheck::new(timestamp_error_tolerance)),
5562
Arc::new(DenyListCheck::new(pgpool, escrow_accounts, domain_separator).await),
5663
Arc::new(ReceiptMaxValueCheck::new(receipt_max_value)),
64+
Arc::new(MinimumValue::new(value_check_receiver)),
5765
]
5866
}
5967

common/src/tap/checks/value_check.rs

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use std::{
1212
time::Duration,
1313
};
1414
use thegraph_core::DeploymentId;
15-
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
15+
use tokio::{
16+
sync::mpsc::{Receiver, Sender},
17+
task::JoinHandle,
18+
};
1619
use ttl_cache::TtlCache;
1720

1821
use tap_core::{
@@ -31,10 +34,39 @@ pub struct MinimumValue {
3134
query_handle: JoinHandle<()>,
3235
}
3336

37+
#[derive(Clone)]
38+
pub struct ValueCheckSender {
39+
pub tx_cost_model: Sender<CostModelSource>,
40+
pub tx_query: Sender<AgoraQuery>,
41+
}
42+
43+
pub struct ValueCheckReceiver {
44+
rx_cost_model: Receiver<CostModelSource>,
45+
rx_query: Receiver<AgoraQuery>,
46+
}
47+
48+
pub fn create_value_check(size: usize) -> (ValueCheckSender, ValueCheckReceiver) {
49+
let (tx_cost_model, rx_cost_model) = tokio::sync::mpsc::channel(size);
50+
let (tx_query, rx_query) = tokio::sync::mpsc::channel(size);
51+
52+
(
53+
ValueCheckSender {
54+
tx_query,
55+
tx_cost_model,
56+
},
57+
ValueCheckReceiver {
58+
rx_cost_model,
59+
rx_query,
60+
},
61+
)
62+
}
63+
3464
impl MinimumValue {
3565
pub fn new(
36-
mut rx_cost_model: Receiver<CostModelSource>,
37-
mut rx_query: Receiver<AgoraQuery>,
66+
ValueCheckReceiver {
67+
mut rx_query,
68+
mut rx_cost_model,
69+
}: ValueCheckReceiver,
3870
) -> Self {
3971
let cost_model_cache = Arc::new(Mutex::new(HashMap::<DeploymentId, CostModelCache>::new()));
4072
let query_ids = Arc::new(Mutex::new(HashMap::new()));
@@ -48,7 +80,12 @@ impl MinimumValue {
4880
let deployment_id = value.deployment_id;
4981

5082
if let Some(query) = cache.lock().unwrap().get_mut(&deployment_id) {
51-
let _ = query.insert_model(value);
83+
let _ = query.insert_model(value).inspect_err(|err| {
84+
tracing::error!(
85+
"Error while compiling cost model for deployment id {}. Error: {}",
86+
deployment_id, err
87+
)
88+
});
5289
} else {
5390
match CostModelCache::new(value) {
5491
Ok(value) => {
@@ -172,9 +209,9 @@ pub struct AgoraQuery {
172209

173210
#[derive(Clone, Eq, Hash, PartialEq)]
174211
pub struct CostModelSource {
175-
deployment_id: DeploymentId,
176-
model: String,
177-
variables: String,
212+
pub deployment_id: DeploymentId,
213+
pub model: String,
214+
pub variables: String,
178215
}
179216

180217
pub struct CostModelCache {

service/src/routes/cost.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use prometheus::{
1212
register_counter, register_counter_vec, register_histogram, register_histogram_vec, Counter,
1313
CounterVec, Histogram, HistogramVec,
1414
};
15+
use indexer_common::tap::CostModelSource;
1516
use serde::{Deserialize, Serialize};
1617
use serde_json::Value;
1718
use thegraph_core::DeploymentId;
@@ -66,6 +67,16 @@ pub struct GraphQlCostModel {
6667
pub variables: Option<Value>,
6768
}
6869

70+
impl From<CostModel> for CostModelSource {
71+
fn from(value: CostModel) -> Self {
72+
Self {
73+
deployment_id: value.deployment,
74+
model: value.model.unwrap_or_default(),
75+
variables: value.variables.unwrap_or_default().to_string(),
76+
}
77+
}
78+
}
79+
6980
impl From<CostModel> for GraphQlCostModel {
7081
fn from(model: CostModel) -> Self {
7182
Self {
@@ -127,8 +138,20 @@ impl Query {
127138
ctx: &Context<'_>,
128139
deployment_ids: Vec<DeploymentId>,
129140
) -> Result<Vec<GraphQlCostModel>, anyhow::Error> {
130-
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
141+
let state = &ctx.data_unchecked::<Arc<SubgraphServiceState>>();
142+
143+
let cost_model_sender = &state.value_check_sender;
144+
145+
let pool = &state.database;
131146
let cost_models = database::cost_models(pool, &deployment_ids).await?;
147+
148+
for model in &cost_models {
149+
let _ = cost_model_sender
150+
.tx_cost_model
151+
.send(CostModelSource::from(model.clone()))
152+
.await;
153+
}
154+
132155
Ok(cost_models.into_iter().map(|m| m.into()).collect())
133156
}
134157

@@ -137,10 +160,20 @@ impl Query {
137160
ctx: &Context<'_>,
138161
deployment_id: DeploymentId,
139162
) -> Result<Option<GraphQlCostModel>, anyhow::Error> {
163+
164+
let state = &ctx.data_unchecked::<Arc<SubgraphServiceState>>();
165+
let cost_model_sender = &state.value_check_sender;
140166
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
141-
database::cost_model(pool, &deployment_id)
142-
.await
143-
.map(|model_opt| model_opt.map(GraphQlCostModel::from))
167+
let model = database::cost_model(pool, &deployment_id).await?;
168+
169+
if let Some(model) = &model {
170+
let _ = cost_model_sender
171+
.tx_cost_model
172+
.send(CostModelSource::from(model.clone()))
173+
.await;
174+
}
175+
176+
Ok(model.map(GraphQlCostModel::from))
144177
}
145178
}
146179

service/src/service.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use std::time::Duration;
77
use super::{config::Config, error::SubgraphServiceError, routes};
88
use anyhow::anyhow;
99
use axum::{async_trait, routing::post, Json, Router};
10-
use indexer_common::indexer_service::http::{
11-
AttestationOutput, IndexerServiceImpl, IndexerServiceResponse,
10+
use indexer_common::{
11+
indexer_service::http::{AttestationOutput, IndexerServiceImpl, IndexerServiceResponse},
12+
tap::{create_value_check, ValueCheckSender},
1213
};
1314
use indexer_config::Config as MainConfig;
1415
use reqwest::Url;
@@ -67,6 +68,7 @@ pub struct SubgraphServiceState {
6768
pub graph_node_client: reqwest::Client,
6869
pub graph_node_status_url: String,
6970
pub graph_node_query_base_url: String,
71+
pub value_check_sender: ValueCheckSender,
7072
}
7173

7274
struct SubgraphService {
@@ -146,6 +148,9 @@ pub async fn run() -> anyhow::Result<()> {
146148
build_info::build_info!(fn build_info);
147149
let release = IndexerServiceRelease::from(build_info());
148150

151+
// arbitrary value
152+
let (value_check_sender, value_check_receiver) = create_value_check(10);
153+
149154
// Some of the subgraph service configuration goes into the so-called
150155
// "state", which will be passed to any request handler, middleware etc.
151156
// that is involved in serving requests
@@ -172,6 +177,7 @@ pub async fn run() -> anyhow::Result<()> {
172177
.expect("config must have `common.graph_node.query_url` set")
173178
.query_base_url
174179
.clone(),
180+
value_check_sender: value_check_sender.clone(),
175181
});
176182

177183
IndexerService::run(IndexerServiceOptions {
@@ -183,6 +189,8 @@ pub async fn run() -> anyhow::Result<()> {
183189
.route("/cost", post(routes::cost::cost))
184190
.route("/status", post(routes::status))
185191
.with_state(state),
192+
value_check_receiver,
193+
value_check_sender,
186194
})
187195
.await
188196
}

0 commit comments

Comments
 (0)