Skip to content

Commit fcf6e7b

Browse files
committed
feat: add error logs for model insert error
1 parent 8d0be21 commit fcf6e7b

File tree

5 files changed

+105
-13
lines changed

5 files changed

+105
-13
lines changed

common/src/indexer_service/http/indexer_service.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use tracing::{info, info_span};
3636

3737
use crate::escrow_accounts::EscrowAccounts;
3838
use crate::escrow_accounts::EscrowAccountsError;
39+
use crate::tap::{ValueCheckReceiver, ValueCheckSender};
3940
use crate::{
4041
address::public_key,
4142
indexer_service::http::static_subgraph::static_subgraph_request_handler,
@@ -176,6 +177,8 @@ where
176177
pub release: IndexerServiceRelease,
177178
pub url_namespace: &'static str,
178179
pub extra_routes: Router<Arc<IndexerServiceState<I>>>,
180+
pub value_check_receiver: ValueCheckReceiver,
181+
pub value_check_sender: ValueCheckSender,
179182
}
180183

181184
pub struct IndexerServiceState<I>
@@ -190,6 +193,7 @@ where
190193
// tap
191194
pub escrow_accounts: Eventual<EscrowAccounts>,
192195
pub domain_separator: Eip712Domain,
196+
pub value_check_sender: ValueCheckSender,
193197
}
194198

195199
pub struct IndexerService {}
@@ -311,6 +315,7 @@ impl IndexerService {
311315
domain_separator.clone(),
312316
timestamp_error_tolerance,
313317
receipt_max_value,
318+
options.value_check_receiver,
314319
)
315320
.await;
316321

@@ -327,6 +332,7 @@ impl IndexerService {
327332
service_impl: Arc::new(options.service_impl),
328333
escrow_accounts,
329334
domain_separator,
335+
value_check_sender: options.value_check_sender,
330336
});
331337

332338
// Rate limits by allowing bursts of 10 requests and requiring 100ms of

common/src/tap.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::tap::checks::deny_list_check::DenyListCheck;
66
use crate::tap::checks::receipt_max_val_check::ReceiptMaxValueCheck;
77
use crate::tap::checks::sender_balance_check::SenderBalanceCheck;
88
use crate::tap::checks::timestamp_check::TimestampCheck;
9+
use crate::tap::checks::value_check::MinimumValue;
910
use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation};
1011
use alloy::dyn_abi::Eip712Domain;
1112
use alloy::primitives::Address;
@@ -24,6 +25,11 @@ use tracing::error;
2425
mod checks;
2526
mod receipt_store;
2627

28+
pub use checks::value_check::{
29+
create_value_check, CostModelSource, ValueCheckReceiver, ValueCheckSender,
30+
};
31+
32+
#[derive(Clone)]
2733
pub struct IndexerTapContext {
2834
domain_separator: Arc<Eip712Domain>,
2935
receipt_producer: Sender<DatabaseReceipt>,
@@ -44,6 +50,7 @@ impl IndexerTapContext {
4450
domain_separator: Eip712Domain,
4551
timestamp_error_tolerance: Duration,
4652
receipt_max_value: u128,
53+
value_check_receiver: ValueCheckReceiver,
4754
) -> Vec<ReceiptCheck> {
4855
vec![
4956
Arc::new(AllocationEligible::new(indexer_allocations)),
@@ -54,6 +61,7 @@ impl IndexerTapContext {
5461
Arc::new(TimestampCheck::new(timestamp_error_tolerance)),
5562
Arc::new(DenyListCheck::new(pgpool, escrow_accounts, domain_separator).await),
5663
Arc::new(ReceiptMaxValueCheck::new(receipt_max_value)),
64+
Arc::new(MinimumValue::new(value_check_receiver)),
5765
]
5866
}
5967

common/src/tap/checks/value_check.rs

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use std::{
1212
time::Duration,
1313
};
1414
use thegraph_core::DeploymentId;
15-
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
15+
use tokio::{
16+
sync::mpsc::{Receiver, Sender},
17+
task::JoinHandle,
18+
};
1619
use ttl_cache::TtlCache;
1720

1821
use tap_core::{
@@ -31,10 +34,39 @@ pub struct MinimumValue {
3134
query_handle: JoinHandle<()>,
3235
}
3336

37+
#[derive(Clone)]
38+
pub struct ValueCheckSender {
39+
pub tx_cost_model: Sender<CostModelSource>,
40+
pub tx_query: Sender<AgoraQuery>,
41+
}
42+
43+
pub struct ValueCheckReceiver {
44+
rx_cost_model: Receiver<CostModelSource>,
45+
rx_query: Receiver<AgoraQuery>,
46+
}
47+
48+
pub fn create_value_check(size: usize) -> (ValueCheckSender, ValueCheckReceiver) {
49+
let (tx_cost_model, rx_cost_model) = tokio::sync::mpsc::channel(size);
50+
let (tx_query, rx_query) = tokio::sync::mpsc::channel(size);
51+
52+
(
53+
ValueCheckSender {
54+
tx_query,
55+
tx_cost_model,
56+
},
57+
ValueCheckReceiver {
58+
rx_cost_model,
59+
rx_query,
60+
},
61+
)
62+
}
63+
3464
impl MinimumValue {
3565
pub fn new(
36-
mut rx_cost_model: Receiver<CostModelSource>,
37-
mut rx_query: Receiver<AgoraQuery>,
66+
ValueCheckReceiver {
67+
mut rx_query,
68+
mut rx_cost_model,
69+
}: ValueCheckReceiver,
3870
) -> Self {
3971
let cost_model_cache = Arc::new(Mutex::new(HashMap::<DeploymentId, CostModelCache>::new()));
4072
let query_ids = Arc::new(Mutex::new(HashMap::new()));
@@ -48,7 +80,12 @@ impl MinimumValue {
4880
let deployment_id = value.deployment_id;
4981

5082
if let Some(query) = cache.lock().unwrap().get_mut(&deployment_id) {
51-
let _ = query.insert_model(value);
83+
let _ = query.insert_model(value).inspect_err(|err| {
84+
tracing::error!(
85+
"Error while compiling cost model for deployment id {}. Error: {}",
86+
deployment_id, err
87+
)
88+
});
5289
} else {
5390
match CostModelCache::new(value) {
5491
Ok(value) => {
@@ -172,9 +209,9 @@ pub struct AgoraQuery {
172209

173210
#[derive(Clone, Eq, Hash, PartialEq)]
174211
pub struct CostModelSource {
175-
deployment_id: DeploymentId,
176-
model: String,
177-
variables: String,
212+
pub deployment_id: DeploymentId,
213+
pub model: String,
214+
pub variables: String,
178215
}
179216

180217
pub struct CostModelCache {

service/src/routes/cost.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use prometheus::{
1212
register_counter, register_counter_vec, register_histogram, register_histogram_vec, Counter,
1313
CounterVec, Histogram, HistogramVec,
1414
};
15+
use indexer_common::tap::CostModelSource;
1516
use serde::{Deserialize, Serialize};
1617
use serde_json::Value;
1718
use thegraph_core::DeploymentId;
@@ -66,6 +67,16 @@ pub struct GraphQlCostModel {
6667
pub variables: Option<Value>,
6768
}
6869

70+
impl From<CostModel> for CostModelSource {
71+
fn from(value: CostModel) -> Self {
72+
Self {
73+
deployment_id: value.deployment,
74+
model: value.model.unwrap_or_default(),
75+
variables: value.variables.unwrap_or_default().to_string(),
76+
}
77+
}
78+
}
79+
6980
impl From<CostModel> for GraphQlCostModel {
7081
fn from(model: CostModel) -> Self {
7182
Self {
@@ -127,8 +138,20 @@ impl Query {
127138
ctx: &Context<'_>,
128139
deployment_ids: Vec<DeploymentId>,
129140
) -> Result<Vec<GraphQlCostModel>, anyhow::Error> {
130-
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
141+
let state = &ctx.data_unchecked::<Arc<SubgraphServiceState>>();
142+
143+
let cost_model_sender = &state.value_check_sender;
144+
145+
let pool = &state.database;
131146
let cost_models = database::cost_models(pool, &deployment_ids).await?;
147+
148+
for model in &cost_models {
149+
let _ = cost_model_sender
150+
.tx_cost_model
151+
.send(CostModelSource::from(model.clone()))
152+
.await;
153+
}
154+
132155
Ok(cost_models.into_iter().map(|m| m.into()).collect())
133156
}
134157

@@ -137,10 +160,20 @@ impl Query {
137160
ctx: &Context<'_>,
138161
deployment_id: DeploymentId,
139162
) -> Result<Option<GraphQlCostModel>, anyhow::Error> {
163+
164+
let state = &ctx.data_unchecked::<Arc<SubgraphServiceState>>();
165+
let cost_model_sender = &state.value_check_sender;
140166
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
141-
database::cost_model(pool, &deployment_id)
142-
.await
143-
.map(|model_opt| model_opt.map(GraphQlCostModel::from))
167+
let model = database::cost_model(pool, &deployment_id).await?;
168+
169+
if let Some(model) = &model {
170+
let _ = cost_model_sender
171+
.tx_cost_model
172+
.send(CostModelSource::from(model.clone()))
173+
.await;
174+
}
175+
176+
Ok(model.map(GraphQlCostModel::from))
144177
}
145178
}
146179

service/src/service.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use std::time::Duration;
77
use super::{config::Config, error::SubgraphServiceError, routes};
88
use anyhow::anyhow;
99
use axum::{async_trait, routing::post, Json, Router};
10-
use indexer_common::indexer_service::http::{
11-
AttestationOutput, IndexerServiceImpl, IndexerServiceResponse,
10+
use indexer_common::{
11+
indexer_service::http::{AttestationOutput, IndexerServiceImpl, IndexerServiceResponse},
12+
tap::{create_value_check, ValueCheckSender},
1213
};
1314
use indexer_config::Config as MainConfig;
1415
use reqwest::Url;
@@ -67,6 +68,7 @@ pub struct SubgraphServiceState {
6768
pub graph_node_client: reqwest::Client,
6869
pub graph_node_status_url: String,
6970
pub graph_node_query_base_url: String,
71+
pub value_check_sender: ValueCheckSender,
7072
}
7173

7274
struct SubgraphService {
@@ -146,6 +148,9 @@ pub async fn run() -> anyhow::Result<()> {
146148
build_info::build_info!(fn build_info);
147149
let release = IndexerServiceRelease::from(build_info());
148150

151+
// arbitrary value
152+
let (value_check_sender, value_check_receiver) = create_value_check(10);
153+
149154
// Some of the subgraph service configuration goes into the so-called
150155
// "state", which will be passed to any request handler, middleware etc.
151156
// that is involved in serving requests
@@ -172,6 +177,7 @@ pub async fn run() -> anyhow::Result<()> {
172177
.expect("config must have `common.graph_node.query_url` set")
173178
.query_base_url
174179
.clone(),
180+
value_check_sender: value_check_sender.clone(),
175181
});
176182

177183
IndexerService::run(IndexerServiceOptions {
@@ -183,6 +189,8 @@ pub async fn run() -> anyhow::Result<()> {
183189
.route("/cost", post(routes::cost::cost))
184190
.route("/status", post(routes::status))
185191
.with_state(state),
192+
value_check_receiver,
193+
value_check_sender,
186194
})
187195
.await
188196
}

0 commit comments

Comments
 (0)