Skip to content

Commit ee51bb6

Browse files
committed
feat: send cost model via state
1 parent 8b7cc86 commit ee51bb6

File tree

5 files changed

+101
-13
lines changed

5 files changed

+101
-13
lines changed

common/src/indexer_service/http/indexer_service.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};
3333
use tower_http::trace::TraceLayer;
3434
use tracing::{info, info_span};
3535

36+
use crate::tap::{ValueCheckReceiver, ValueCheckSender};
3637
use crate::{
3738
address::public_key,
3839
indexer_service::http::{
@@ -167,6 +168,8 @@ where
167168
pub url_namespace: &'static str,
168169
pub metrics_prefix: &'static str,
169170
pub extra_routes: Router<Arc<IndexerServiceState<I>>>,
171+
pub value_check_receiver: ValueCheckReceiver,
172+
pub value_check_sender: ValueCheckSender,
170173
}
171174

172175
pub struct IndexerServiceState<I>
@@ -178,6 +181,7 @@ where
178181
pub tap_manager: Manager<IndexerTapContext>,
179182
pub service_impl: Arc<I>,
180183
pub metrics: IndexerServiceMetrics,
184+
pub value_check_sender: ValueCheckSender,
181185
}
182186

183187
pub struct IndexerService {}
@@ -294,6 +298,7 @@ impl IndexerService {
294298
allocations,
295299
escrow_accounts,
296300
domain_separator.clone(),
301+
options.value_check_receiver,
297302
)
298303
.await;
299304

@@ -305,6 +310,7 @@ impl IndexerService {
305310
tap_manager,
306311
service_impl: Arc::new(options.service_impl),
307312
metrics,
313+
value_check_sender: options.value_check_sender,
308314
});
309315

310316
// Rate limits by allowing bursts of 10 requests and requiring 100ms of

common/src/tap.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use crate::tap::checks::allocation_eligible::AllocationEligible;
55
use crate::tap::checks::deny_list_check::DenyListCheck;
66
use crate::tap::checks::sender_balance_check::SenderBalanceCheck;
7+
use crate::tap::checks::value_check::MinimumValue;
78
use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation};
89
use alloy_sol_types::Eip712Domain;
910
use eventuals::Eventual;
@@ -17,6 +18,10 @@ use tracing::error;
1718
mod checks;
1819
mod receipt_store;
1920

21+
pub use checks::value_check::{
22+
create_value_check, CostModelSource, ValueCheckReceiver, ValueCheckSender,
23+
};
24+
2025
#[derive(Clone)]
2126
pub struct IndexerTapContext {
2227
pgpool: PgPool,
@@ -35,6 +40,7 @@ impl IndexerTapContext {
3540
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
3641
escrow_accounts: Eventual<EscrowAccounts>,
3742
domain_separator: Eip712Domain,
43+
value_check_receiver: ValueCheckReceiver,
3844
) -> Vec<ReceiptCheck> {
3945
vec![
4046
Arc::new(AllocationEligible::new(indexer_allocations)),
@@ -43,6 +49,7 @@ impl IndexerTapContext {
4349
domain_separator.clone(),
4450
)),
4551
Arc::new(DenyListCheck::new(pgpool, escrow_accounts, domain_separator).await),
52+
Arc::new(MinimumValue::new(value_check_receiver)),
4653
]
4754
}
4855

common/src/tap/checks/value_check.rs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use std::{
1010
time::Duration,
1111
};
1212
use thegraph::types::DeploymentId;
13-
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
13+
use tokio::{
14+
sync::mpsc::{Receiver, Sender},
15+
task::JoinHandle,
16+
};
1417
use ttl_cache::TtlCache;
1518

1619
use anyhow::anyhow;
@@ -28,10 +31,39 @@ pub struct MinimumValue {
2831
query_handle: JoinHandle<()>,
2932
}
3033

34+
#[derive(Clone)]
35+
pub struct ValueCheckSender {
36+
pub tx_cost_model: Sender<CostModelSource>,
37+
pub tx_query: Sender<AgoraQuery>,
38+
}
39+
40+
pub struct ValueCheckReceiver {
41+
rx_cost_model: Receiver<CostModelSource>,
42+
rx_query: Receiver<AgoraQuery>,
43+
}
44+
45+
pub fn create_value_check(size: usize) -> (ValueCheckSender, ValueCheckReceiver) {
46+
let (tx_cost_model, rx_cost_model) = tokio::sync::mpsc::channel(size);
47+
let (tx_query, rx_query) = tokio::sync::mpsc::channel(size);
48+
49+
(
50+
ValueCheckSender {
51+
tx_query,
52+
tx_cost_model,
53+
},
54+
ValueCheckReceiver {
55+
rx_cost_model,
56+
rx_query,
57+
},
58+
)
59+
}
60+
3161
impl MinimumValue {
3262
pub fn new(
33-
mut rx_cost_model: Receiver<CostModelSource>,
34-
mut rx_query: Receiver<AgoraQuery>,
63+
ValueCheckReceiver {
64+
mut rx_query,
65+
mut rx_cost_model,
66+
}: ValueCheckReceiver,
3567
) -> Self {
3668
let cost_model_cache = Arc::new(Mutex::new(HashMap::<DeploymentId, CostModelCache>::new()));
3769
let query_ids = Arc::new(Mutex::new(HashMap::new()));
@@ -174,9 +206,9 @@ pub struct AgoraQuery {
174206

175207
#[derive(Clone, Eq, Hash, PartialEq)]
176208
pub struct CostModelSource {
177-
deployment_id: DeploymentId,
178-
model: String,
179-
variables: String,
209+
pub deployment_id: DeploymentId,
210+
pub model: String,
211+
pub variables: String,
180212
}
181213

182214
pub struct CostModelCache {

service/src/main.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ use axum::{
1212
Json, Router,
1313
};
1414
use clap::Parser;
15-
use indexer_common::indexer_service::http::{
16-
IndexerService, IndexerServiceImpl, IndexerServiceOptions, IndexerServiceRelease,
17-
IndexerServiceResponse,
15+
use indexer_common::{
16+
indexer_service::http::{
17+
IndexerService, IndexerServiceImpl, IndexerServiceOptions, IndexerServiceRelease,
18+
IndexerServiceResponse,
19+
},
20+
tap::{create_value_check, ValueCheckSender},
1821
};
1922
use reqwest::{StatusCode, Url};
2023
use serde_json::{json, Value};
@@ -104,6 +107,7 @@ pub struct SubgraphServiceState {
104107
pub graph_node_client: reqwest::Client,
105108
pub graph_node_status_url: String,
106109
pub graph_node_query_base_url: String,
110+
pub value_check_sender: ValueCheckSender,
107111
}
108112

109113
struct SubgraphService {
@@ -186,6 +190,9 @@ async fn main() -> Result<(), Error> {
186190
build_info::build_info!(fn build_info);
187191
let release = IndexerServiceRelease::from(build_info());
188192

193+
// arbitrary value
194+
let (value_check_sender, value_check_receiver) = create_value_check(10);
195+
189196
// Some of the subgrpah service configuration goes into the so-called
190197
// "state", which will be passed to any request handler, middleware etc.
191198
// that is involved in serving requests
@@ -212,6 +219,7 @@ async fn main() -> Result<(), Error> {
212219
.expect("config must have `common.graph_node.query_url` set")
213220
.query_base_url
214221
.clone(),
222+
value_check_sender: value_check_sender.clone(),
215223
});
216224

217225
IndexerService::run(IndexerServiceOptions {
@@ -224,6 +232,8 @@ async fn main() -> Result<(), Error> {
224232
.route("/cost", post(routes::cost::cost))
225233
.route("/status", post(routes::status))
226234
.with_state(state),
235+
value_check_receiver,
236+
value_check_sender,
227237
})
228238
.await
229239
}

service/src/routes/cost.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::sync::Arc;
77
use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject};
88
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
99
use axum::extract::State;
10+
use indexer_common::tap::CostModelSource;
1011
use serde::{Deserialize, Serialize};
1112
use serde_json::Value;
1213
use thegraph::types::DeploymentId;
@@ -21,6 +22,16 @@ pub struct GraphQlCostModel {
2122
pub variables: Option<Value>,
2223
}
2324

25+
impl From<CostModel> for CostModelSource {
26+
fn from(value: CostModel) -> Self {
27+
Self {
28+
deployment_id: value.deployment,
29+
model: value.model.unwrap_or_default(),
30+
variables: value.variables.unwrap_or_default().to_string(),
31+
}
32+
}
33+
}
34+
2435
impl From<CostModel> for GraphQlCostModel {
2536
fn from(model: CostModel) -> Self {
2637
Self {
@@ -45,8 +56,20 @@ impl Query {
4556
.into_iter()
4657
.map(|s| DeploymentId::from_str(&s))
4758
.collect::<Result<Vec<DeploymentId>, _>>()?;
48-
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
59+
let state = &ctx.data_unchecked::<Arc<SubgraphServiceState>>();
60+
61+
let cost_model_sender = &state.value_check_sender;
62+
63+
let pool = &state.database;
4964
let cost_models = database::cost_models(pool, &deployment_ids).await?;
65+
66+
for model in &cost_models {
67+
let _ = cost_model_sender
68+
.tx_cost_model
69+
.send(CostModelSource::from(model.clone()))
70+
.await;
71+
}
72+
5073
Ok(cost_models.into_iter().map(|m| m.into()).collect())
5174
}
5275

@@ -56,10 +79,20 @@ impl Query {
5679
deployment: String,
5780
) -> Result<Option<GraphQlCostModel>, anyhow::Error> {
5881
let deployment_id = DeploymentId::from_str(&deployment)?;
82+
83+
let state = &ctx.data_unchecked::<Arc<SubgraphServiceState>>();
84+
let cost_model_sender = &state.value_check_sender;
5985
let pool = &ctx.data_unchecked::<Arc<SubgraphServiceState>>().database;
60-
database::cost_model(pool, &deployment_id)
61-
.await
62-
.map(|model_opt| model_opt.map(GraphQlCostModel::from))
86+
let model = database::cost_model(pool, &deployment_id).await?;
87+
88+
if let Some(model) = &model {
89+
let _ = cost_model_sender
90+
.tx_cost_model
91+
.send(CostModelSource::from(model.clone()))
92+
.await;
93+
}
94+
95+
Ok(model.map(GraphQlCostModel::from))
6396
}
6497
}
6598

0 commit comments

Comments
 (0)