Skip to content

Commit 0dc3d85

Browse files
committed
feat: add error logs for model insert error
1 parent bdf1581 commit 0dc3d85

File tree

5 files changed

+105
-13
lines changed

5 files changed

+105
-13
lines changed

common/src/indexer_service/http/indexer_service.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
3333
use tower_http::{cors, cors::CorsLayer, normalize_path::NormalizePath, trace::TraceLayer};
3434
use tracing::{info, info_span};
3535

36+
use crate::tap::{ValueCheckReceiver, ValueCheckSender};
3637
use crate::{
3738
address::public_key,
3839
indexer_service::http::{
@@ -168,6 +169,8 @@ where
168169
pub url_namespace: &'static str,
169170
pub metrics_prefix: &'static str,
170171
pub extra_routes: Router<Arc<IndexerServiceState<I>>>,
172+
pub value_check_receiver: ValueCheckReceiver,
173+
pub value_check_sender: ValueCheckSender,
171174
}
172175

173176
pub struct IndexerServiceState<I>
@@ -179,6 +182,7 @@ where
179182
pub tap_manager: Manager<IndexerTapContext>,
180183
pub service_impl: Arc<I>,
181184
pub metrics: IndexerServiceMetrics,
185+
pub value_check_sender: ValueCheckSender,
182186
}
183187

184188
pub struct IndexerService {}
@@ -303,6 +307,7 @@ impl IndexerService {
303307
domain_separator.clone(),
304308
timestamp_error_tolerance,
305309
receipt_max_value,
310+
options.value_check_receiver,
306311
)
307312
.await;
308313

@@ -314,6 +319,7 @@ impl IndexerService {
314319
tap_manager,
315320
service_impl: Arc::new(options.service_impl),
316321
metrics,
322+
value_check_sender: options.value_check_sender,
317323
});
318324

319325
// Rate limits by allowing bursts of 10 requests and requiring 100ms of

common/src/tap.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::tap::checks::deny_list_check::DenyListCheck;
66
use crate::tap::checks::receipt_max_val_check::ReceiptMaxValueCheck;
77
use crate::tap::checks::sender_balance_check::SenderBalanceCheck;
88
use crate::tap::checks::timestamp_check::TimestampCheck;
9+
use crate::tap::checks::value_check::MinimumValue;
910
use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation};
1011
use alloy::dyn_abi::Eip712Domain;
1112
use eventuals::Eventual;
@@ -23,6 +24,11 @@ use tracing::error;
2324
mod checks;
2425
mod receipt_store;
2526

27+
pub use checks::value_check::{
28+
create_value_check, CostModelSource, ValueCheckReceiver, ValueCheckSender,
29+
};
30+
31+
#[derive(Clone)]
2632
pub struct IndexerTapContext {
2733
domain_separator: Arc<Eip712Domain>,
2834

@@ -44,6 +50,7 @@ impl IndexerTapContext {
4450
domain_separator: Eip712Domain,
4551
timestamp_error_tolerance: Duration,
4652
receipt_max_value: u128,
53+
value_check_receiver: ValueCheckReceiver,
4754
) -> Vec<ReceiptCheck> {
4855
vec![
4956
Arc::new(AllocationEligible::new(indexer_allocations)),
@@ -54,6 +61,7 @@ impl IndexerTapContext {
5461
Arc::new(TimestampCheck::new(timestamp_error_tolerance)),
5562
Arc::new(DenyListCheck::new(pgpool, escrow_accounts, domain_separator).await),
5663
Arc::new(ReceiptMaxValueCheck::new(receipt_max_value)),
64+
Arc::new(MinimumValue::new(value_check_receiver)),
5765
]
5866
}
5967

common/src/tap/checks/value_check.rs

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use std::{
1212
time::Duration,
1313
};
1414
use thegraph_core::DeploymentId;
15-
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
15+
use tokio::{
16+
sync::mpsc::{Receiver, Sender},
17+
task::JoinHandle,
18+
};
1619
use ttl_cache::TtlCache;
1720

1821
use tap_core::{
@@ -31,10 +34,39 @@ pub struct MinimumValue {
3134
query_handle: JoinHandle<()>,
3235
}
3336

37+
#[derive(Clone)]
38+
pub struct ValueCheckSender {
39+
pub tx_cost_model: Sender<CostModelSource>,
40+
pub tx_query: Sender<AgoraQuery>,
41+
}
42+
43+
pub struct ValueCheckReceiver {
44+
rx_cost_model: Receiver<CostModelSource>,
45+
rx_query: Receiver<AgoraQuery>,
46+
}
47+
48+
pub fn create_value_check(size: usize) -> (ValueCheckSender, ValueCheckReceiver) {
49+
let (tx_cost_model, rx_cost_model) = tokio::sync::mpsc::channel(size);
50+
let (tx_query, rx_query) = tokio::sync::mpsc::channel(size);
51+
52+
(
53+
ValueCheckSender {
54+
tx_query,
55+
tx_cost_model,
56+
},
57+
ValueCheckReceiver {
58+
rx_cost_model,
59+
rx_query,
60+
},
61+
)
62+
}
63+
3464
impl MinimumValue {
3565
pub fn new(
36-
mut rx_cost_model: Receiver<CostModelSource>,
37-
mut rx_query: Receiver<AgoraQuery>,
66+
ValueCheckReceiver {
67+
mut rx_query,
68+
mut rx_cost_model,
69+
}: ValueCheckReceiver,
3870
) -> Self {
3971
let cost_model_cache = Arc::new(Mutex::new(HashMap::<DeploymentId, CostModelCache>::new()));
4072
let query_ids = Arc::new(Mutex::new(HashMap::new()));
@@ -48,7 +80,12 @@ impl MinimumValue {
4880
let deployment_id = value.deployment_id;
4981

5082
if let Some(query) = cache.lock().unwrap().get_mut(&deployment_id) {
51-
let _ = query.insert_model(value);
83+
let _ = query.insert_model(value).inspect_err(|err| {
84+
tracing::error!(
85+
"Error while compiling cost model for deployment id {}. Error: {}",
86+
deployment_id, err
87+
)
88+
});
5289
} else {
5390
match CostModelCache::new(value) {
5491
Ok(value) => {
@@ -172,9 +209,9 @@ pub struct AgoraQuery {
172209

173210
#[derive(Clone, Eq, Hash, PartialEq)]
174211
pub struct CostModelSource {
175-
deployment_id: DeploymentId,
176-
model: String,
177-
variables: String,
212+
pub deployment_id: DeploymentId,
213+
pub model: String,
214+
pub variables: String,
178215
}
179216

180217
pub struct CostModelCache {

service/src/routes/cost.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::sync::Arc;
77
use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject};
88
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
99
use axum::extract::State;
10+
use indexer_common::tap::CostModelSource;
1011
use serde::{Deserialize, Serialize};
1112
use serde_json::Value;
1213
use thegraph_core::DeploymentId;
@@ -21,6 +22,16 @@ pub struct GraphQlCostModel {
2122
pub variables: Option<Value>,
2223
}
2324

25+
impl From<CostModel> for CostModelSource {
26+
fn from(value: CostModel) -> Self {
27+
Self {
28+
deployment_id: value.deployment,
29+
model: value.model.unwrap_or_default(),
30+
variables: value.variables.unwrap_or_default().to_string(),
31+
}
32+
}
33+
}
34+
2435
impl From<CostModel> for GraphQlCostModel {
2536
fn from(model: CostModel) -> Self {
2637
Self {
@@ -45,8 +56,20 @@ impl Query {
4556
.into_iter()
4657
.map(|s| DeploymentId::from_str(&s))
4758
.collect::<Result<Vec<DeploymentId>, _>>()?;
48-
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
59+
let state = &ctx.data_unchecked::<Arc<SubgraphServiceState>>();
60+
61+
let cost_model_sender = &state.value_check_sender;
62+
63+
let pool = &state.database;
4964
let cost_models = database::cost_models(pool, &deployment_ids).await?;
65+
66+
for model in &cost_models {
67+
let _ = cost_model_sender
68+
.tx_cost_model
69+
.send(CostModelSource::from(model.clone()))
70+
.await;
71+
}
72+
5073
Ok(cost_models.into_iter().map(|m| m.into()).collect())
5174
}
5275

@@ -56,10 +79,20 @@ impl Query {
5679
deployment: String,
5780
) -> Result<Option<GraphQlCostModel>, anyhow::Error> {
5881
let deployment_id = DeploymentId::from_str(&deployment)?;
82+
83+
let state = &ctx.data_unchecked::<Arc<SubgraphServiceState>>();
84+
let cost_model_sender = &state.value_check_sender;
5985
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
60-
database::cost_model(pool, &deployment_id)
61-
.await
62-
.map(|model_opt| model_opt.map(GraphQlCostModel::from))
86+
let model = database::cost_model(pool, &deployment_id).await?;
87+
88+
if let Some(model) = &model {
89+
let _ = cost_model_sender
90+
.tx_cost_model
91+
.send(CostModelSource::from(model.clone()))
92+
.await;
93+
}
94+
95+
Ok(model.map(GraphQlCostModel::from))
6396
}
6497
}
6598

service/src/service.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use std::time::Duration;
77
use super::{config::Config, error::SubgraphServiceError, routes};
88
use anyhow::anyhow;
99
use axum::{async_trait, routing::post, Json, Router};
10-
use indexer_common::indexer_service::http::{
11-
AttestationOutput, IndexerServiceImpl, IndexerServiceResponse,
10+
use indexer_common::{
11+
indexer_service::http::{AttestationOutput, IndexerServiceImpl, IndexerServiceResponse},
12+
tap::{create_value_check, ValueCheckSender},
1213
};
1314
use indexer_config::Config as MainConfig;
1415
use reqwest::Url;
@@ -67,6 +68,7 @@ pub struct SubgraphServiceState {
6768
pub graph_node_client: reqwest::Client,
6869
pub graph_node_status_url: String,
6970
pub graph_node_query_base_url: String,
71+
pub value_check_sender: ValueCheckSender,
7072
}
7173

7274
struct SubgraphService {
@@ -145,6 +147,9 @@ pub async fn run() -> anyhow::Result<()> {
145147
build_info::build_info!(fn build_info);
146148
let release = IndexerServiceRelease::from(build_info());
147149

150+
// arbitrary value
151+
let (value_check_sender, value_check_receiver) = create_value_check(10);
152+
148153
// Some of the subgraph service configuration goes into the so-called
149154
// "state", which will be passed to any request handler, middleware etc.
150155
// that is involved in serving requests
@@ -171,6 +176,7 @@ pub async fn run() -> anyhow::Result<()> {
171176
.expect("config must have `common.graph_node.query_url` set")
172177
.query_base_url
173178
.clone(),
179+
value_check_sender: value_check_sender.clone(),
174180
});
175181

176182
IndexerService::run(IndexerServiceOptions {
@@ -183,6 +189,8 @@ pub async fn run() -> anyhow::Result<()> {
183189
.route("/cost", post(routes::cost::cost))
184190
.route("/status", post(routes::status))
185191
.with_state(state),
192+
value_check_receiver,
193+
value_check_sender,
186194
})
187195
.await
188196
}

0 commit comments

Comments
 (0)