Skip to content

Commit cf1854f

Browse files
committed
graph, graphql, node: Record GraphQL query result sizes in metrics
1 parent b515e25 commit cf1854f

File tree

10 files changed

+108
-6
lines changed

10 files changed

+108
-6
lines changed

graph/src/components/metrics/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,18 @@ pub trait MetricsRegistry: Send + Sync + 'static {
213213
Ok(histogram)
214214
}
215215

216+
fn new_histogram(
217+
&self,
218+
name: &str,
219+
help: &str,
220+
buckets: Vec<f64>,
221+
) -> Result<Box<Histogram>, PrometheusError> {
222+
let opts = HistogramOpts::new(name, help).buckets(buckets);
223+
let histogram = Box::new(Histogram::with_opts(opts)?);
224+
self.register(name, histogram.clone());
225+
Ok(histogram)
226+
}
227+
216228
fn new_histogram_vec(
217229
&self,
218230
name: &str,

graphql/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,6 @@ pub mod prelude {
4040

4141
#[cfg(debug_assertions)]
4242
pub mod test_support {
43+
pub use super::runner::ResultSizeMetrics;
4344
pub use super::runner::INITIAL_DEPLOYMENT_STATE_FOR_TESTS;
4445
}

graphql/src/runner.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashMap;
12
use std::env;
23
use std::str::FromStr;
34
use std::sync::Arc;
@@ -6,6 +7,8 @@ use std::time::{Duration, Instant};
67
use crate::prelude::{QueryExecutionOptions, StoreResolver, SubscriptionExecutionOptions};
78
use crate::query::execute_query;
89
use crate::subscription::execute_prepared_subscription;
10+
use graph::prelude::MetricsRegistry;
11+
use graph::prometheus::{Gauge, Histogram};
912
use graph::{
1013
components::store::SubscriptionManager,
1114
prelude::{
@@ -21,12 +24,59 @@ use graph::{
2124

2225
use lazy_static::lazy_static;
2326

27+
pub struct ResultSizeMetrics {
28+
histogram: Box<Histogram>,
29+
max_gauge: Box<Gauge>,
30+
}
31+
32+
impl ResultSizeMetrics {
33+
fn new(registry: Arc<impl MetricsRegistry>) -> Self {
34+
// Divide the Histogram into exponentially sized buckets between 1k and 32G
35+
let bins = (10..26).map(|n| 2u64.pow(n) as f64).collect::<Vec<_>>();
36+
let histogram = registry
37+
.new_histogram(
38+
"query_result_size",
39+
"the size of the result of successful GraphQL queries (in CacheWeight)",
40+
bins,
41+
)
42+
.unwrap();
43+
44+
let max_gauge = registry
45+
.new_gauge(
46+
"query_result_max",
47+
"the maximum size of a query result (in CacheWeight)",
48+
HashMap::new(),
49+
)
50+
.unwrap();
51+
52+
Self {
53+
histogram,
54+
max_gauge,
55+
}
56+
}
57+
58+
// Tests need to construct one of these, but normal code doesn't
59+
#[cfg(debug_assertions)]
60+
pub fn make(registry: Arc<impl MetricsRegistry>) -> Self {
61+
Self::new(registry)
62+
}
63+
64+
pub fn observe(&self, size: usize) {
65+
let size = size as f64;
66+
self.histogram.observe(size);
67+
if self.max_gauge.get() < size {
68+
self.max_gauge.set(size);
69+
}
70+
}
71+
}
72+
2473
/// GraphQL runner implementation for The Graph.
2574
pub struct GraphQlRunner<S, SM> {
2675
logger: Logger,
2776
store: Arc<S>,
2877
subscription_manager: Arc<SM>,
2978
load_manager: Arc<LoadManager>,
79+
result_size: Arc<ResultSizeMetrics>,
3080
}
3181

3282
lazy_static! {
@@ -81,13 +131,16 @@ where
81131
store: Arc<S>,
82132
subscription_manager: Arc<SM>,
83133
load_manager: Arc<LoadManager>,
134+
registry: Arc<impl MetricsRegistry>,
84135
) -> Self {
85136
let logger = logger.new(o!("component" => "GraphQlRunner"));
137+
let result_size = Arc::new(ResultSizeMetrics::new(registry));
86138
GraphQlRunner {
87139
logger,
88140
store,
89141
subscription_manager,
90142
load_manager,
143+
result_size,
91144
}
92145
}
93146

@@ -129,6 +182,7 @@ where
129182
max_depth: Option<u8>,
130183
max_first: Option<u32>,
131184
max_skip: Option<u32>,
185+
result_size: Arc<ResultSizeMetrics>,
132186
) -> Result<QueryResults, QueryResults> {
133187
// We need to use the same `QueryStore` for the entire query to ensure
134188
// we have a consistent view if the world, even when replicas, which
@@ -181,6 +235,7 @@ where
181235
bc,
182236
error_policy,
183237
query.schema.id().clone(),
238+
result_size.cheap_clone(),
184239
)
185240
.await?;
186241
max_block = max_block.max(resolver.block_number());
@@ -242,6 +297,7 @@ where
242297
max_depth,
243298
max_first,
244299
max_skip,
300+
self.result_size.cheap_clone(),
245301
)
246302
.await
247303
.unwrap_or_else(|e| e)
@@ -288,6 +344,7 @@ where
288344
max_depth: *GRAPHQL_MAX_DEPTH,
289345
max_first: *GRAPHQL_MAX_FIRST,
290346
max_skip: *GRAPHQL_MAX_SKIP,
347+
result_size: self.result_size.clone(),
291348
},
292349
)
293350
.await

graphql/src/store/prefetch.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use graph::{
2424

2525
use crate::execution::{ExecutionContext, Resolver};
2626
use crate::query::ast as qast;
27+
use crate::runner::ResultSizeMetrics;
2728
use crate::schema::ast as sast;
2829
use crate::store::{build_query, StoreResolver};
2930

@@ -497,8 +498,10 @@ pub fn run(
497498
resolver: &StoreResolver,
498499
ctx: &ExecutionContext<impl Resolver>,
499500
selection_set: &q::SelectionSet,
501+
result_size: &ResultSizeMetrics,
500502
) -> Result<q::Value, Vec<QueryExecutionError>> {
501503
execute_root_selection_set(resolver, ctx, selection_set).map(|nodes| {
504+
result_size.observe(nodes.weight());
502505
let map = BTreeMap::default();
503506
q::Value::Object(nodes.into_iter().fold(map, |mut map, node| {
504507
// For root nodes, we only care about the children

graphql/src/store/resolver.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use graph::prelude::*;
1010
use graph::{components::store::*, data::schema::BLOCK_FIELD_TYPE};
1111

1212
use crate::query::ext::BlockConstraint;
13+
use crate::runner::ResultSizeMetrics;
1314
use crate::schema::ast as sast;
1415
use crate::{prelude::*, schema::api::ErrorPolicy};
1516

@@ -25,6 +26,7 @@ pub struct StoreResolver {
2526
deployment: DeploymentHash,
2627
has_non_fatal_errors: bool,
2728
error_policy: ErrorPolicy,
29+
result_size: Arc<ResultSizeMetrics>,
2830
}
2931

3032
impl CheapClone for StoreResolver {}
@@ -39,6 +41,7 @@ impl StoreResolver {
3941
deployment: DeploymentHash,
4042
store: Arc<dyn QueryStore>,
4143
subscription_manager: Arc<dyn SubscriptionManager>,
44+
result_size: Arc<ResultSizeMetrics>,
4245
) -> Self {
4346
StoreResolver {
4447
logger: logger.new(o!("component" => "StoreResolver")),
@@ -50,6 +53,7 @@ impl StoreResolver {
5053
// Checking for non-fatal errors does not work with subscriptions.
5154
has_non_fatal_errors: false,
5255
error_policy: ErrorPolicy::Deny,
56+
result_size,
5357
}
5458
}
5559

@@ -65,6 +69,7 @@ impl StoreResolver {
6569
bc: BlockConstraint,
6670
error_policy: ErrorPolicy,
6771
deployment: DeploymentHash,
72+
result_size: Arc<ResultSizeMetrics>,
6873
) -> Result<Self, QueryExecutionError> {
6974
let store_clone = store.cheap_clone();
7075
let deployment2 = deployment.clone();
@@ -87,6 +92,7 @@ impl StoreResolver {
8792
deployment,
8893
has_non_fatal_errors,
8994
error_policy,
95+
result_size,
9096
};
9197
Ok(resolver)
9298
}
@@ -221,7 +227,7 @@ impl Resolver for StoreResolver {
221227
ctx: &ExecutionContext<Self>,
222228
selection_set: &q::SelectionSet,
223229
) -> Result<Option<q::Value>, Vec<QueryExecutionError>> {
224-
super::prefetch::run(self, ctx, selection_set).map(Some)
230+
super::prefetch::run(self, ctx, selection_set, &self.result_size).map(Some)
225231
}
226232

227233
fn resolve_objects(

graphql/src/subscription/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::time::{Duration, Instant};
55

66
use graph::{components::store::SubscriptionManager, prelude::*};
77

8+
use crate::runner::ResultSizeMetrics;
89
use crate::{
910
execution::*,
1011
prelude::{BlockConstraint, StoreResolver},
@@ -35,6 +36,8 @@ pub struct SubscriptionExecutionOptions {
3536

3637
/// Maximum value for the `skip` argument.
3738
pub max_skip: u32,
39+
40+
pub result_size: Arc<ResultSizeMetrics>,
3841
}
3942

4043
pub async fn execute_subscription(
@@ -83,6 +86,7 @@ async fn create_source_event_stream(
8386
query.schema.id().clone(),
8487
options.store.clone(),
8588
options.subscription_manager.cheap_clone(),
89+
options.result_size.cheap_clone(),
8690
);
8791
let ctx = ExecutionContext {
8892
logger: options.logger.cheap_clone(),
@@ -158,6 +162,7 @@ fn map_source_to_response_stream(
158162
max_depth: _,
159163
max_first,
160164
max_skip,
165+
result_size,
161166
} = options;
162167

163168
Box::new(
@@ -177,6 +182,7 @@ fn map_source_to_response_stream(
177182
timeout,
178183
max_first,
179184
max_skip,
185+
result_size.cheap_clone(),
180186
)
181187
.boxed(),
182188
}),
@@ -192,6 +198,7 @@ async fn execute_subscription_event(
192198
timeout: Option<Duration>,
193199
max_first: u32,
194200
max_skip: u32,
201+
result_size: Arc<ResultSizeMetrics>,
195202
) -> Arc<QueryResult> {
196203
debug!(logger, "Execute subscription event"; "event" => format!("{:?}", event));
197204

@@ -202,6 +209,7 @@ async fn execute_subscription_event(
202209
BlockConstraint::Latest,
203210
ErrorPolicy::Deny,
204211
query.schema.id().clone(),
212+
result_size,
205213
)
206214
.await
207215
{

graphql/tests/query.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ use graph::{
3030
use graph_graphql::{prelude::*, subscription::execute_subscription};
3131
use test_store::{
3232
deployment_state, execute_subgraph_query_with_complexity, execute_subgraph_query_with_deadline,
33-
revert_block, run_test_sequentially, transact_entity_operations, transact_errors, Store,
34-
BLOCK_ONE, GENESIS_PTR, LOAD_MANAGER, LOGGER, STORE, SUBSCRIPTION_MANAGER,
33+
result_size_metrics, revert_block, run_test_sequentially, transact_entity_operations,
34+
transact_errors, Store, BLOCK_ONE, GENESIS_PTR, LOAD_MANAGER, LOGGER, METRICS_REGISTRY, STORE,
35+
SUBSCRIPTION_MANAGER,
3536
};
3637

3738
const NETWORK_NAME: &str = "fake_network";
@@ -258,6 +259,7 @@ async fn execute_query_document_with_variables(
258259
STORE.clone(),
259260
SUBSCRIPTION_MANAGER.clone(),
260261
LOAD_MANAGER.clone(),
262+
METRICS_REGISTRY.clone(),
261263
));
262264
let target = QueryTarget::Deployment(id.clone());
263265
let query = Query::new(query, variables);
@@ -885,6 +887,7 @@ fn query_complexity_subscriptions() {
885887
max_depth: 100,
886888
max_first: std::u32::MAX,
887889
max_skip: std::u32::MAX,
890+
result_size: result_size_metrics(),
888891
};
889892
let schema = STORE.subgraph_store().api_schema(&deployment.hash).unwrap();
890893

@@ -933,6 +936,7 @@ fn query_complexity_subscriptions() {
933936
max_depth: 100,
934937
max_first: std::u32::MAX,
935938
max_skip: std::u32::MAX,
939+
result_size: result_size_metrics(),
936940
};
937941

938942
// The extra introspection causes the complexity to go over.
@@ -1311,6 +1315,7 @@ fn subscription_gets_result_even_without_events() {
13111315
max_depth: 100,
13121316
max_first: std::u32::MAX,
13131317
max_skip: std::u32::MAX,
1318+
result_size: result_size_metrics(),
13141319
};
13151320
// Execute the subscription and expect at least one result to be
13161321
// available in the result stream

node/src/bin/manager.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,13 +432,14 @@ impl Context {
432432
let store = self.store();
433433

434434
let subscription_manager = Arc::new(PanicSubscriptionManager);
435-
let load_manager = Arc::new(LoadManager::new(&logger, vec![], registry));
435+
let load_manager = Arc::new(LoadManager::new(&logger, vec![], registry.clone()));
436436

437437
Arc::new(GraphQlRunner::new(
438438
&logger,
439439
store,
440440
subscription_manager,
441441
load_manager,
442+
registry,
442443
))
443444
}
444445
}

node/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ async fn main() {
231231
network_store.clone(),
232232
subscription_manager.clone(),
233233
load_manager,
234+
metrics_registry.clone(),
234235
));
235236
let mut graphql_server = GraphQLQueryServer::new(
236237
&logger_factory,

store/test-store/src/store.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use graph::{
1414
use graph_graphql::prelude::{
1515
execute_query, Query as PreparedQuery, QueryExecutionOptions, StoreResolver,
1616
};
17+
use graph_graphql::test_support::ResultSizeMetrics;
1718
use graph_mock::MockMetricsRegistry;
1819
use graph_node::config::{Config, Opt};
1920
use graph_node::store_builder::StoreBuilder;
@@ -45,10 +46,12 @@ lazy_static! {
4546
static ref SEQ_LOCK: Mutex<()> = Mutex::new(());
4647
pub static ref STORE_RUNTIME: Runtime =
4748
Builder::new_multi_thread().enable_all().build().unwrap();
49+
pub static ref METRICS_REGISTRY: Arc<MockMetricsRegistry> =
50+
Arc::new(MockMetricsRegistry::new());
4851
pub static ref LOAD_MANAGER: Arc<LoadManager> = Arc::new(LoadManager::new(
4952
&*LOGGER,
5053
Vec::new(),
51-
Arc::new(MockMetricsRegistry::new()),
54+
METRICS_REGISTRY.clone(),
5255
));
5356
static ref STORE_POOL_CONFIG: (Arc<Store>, ConnectionPool, Config, Arc<SubscriptionManager>) =
5457
build_store();
@@ -359,6 +362,10 @@ macro_rules! return_err {
359362
};
360363
}
361364

365+
pub fn result_size_metrics() -> Arc<ResultSizeMetrics> {
366+
Arc::new(ResultSizeMetrics::make(METRICS_REGISTRY.clone()))
367+
}
368+
362369
fn execute_subgraph_query_internal(
363370
query: Query,
364371
target: QueryTarget,
@@ -403,7 +410,8 @@ fn execute_subgraph_query_internal(
403410
SUBSCRIPTION_MANAGER.clone(),
404411
bc,
405412
error_policy,
406-
query.schema.id().clone()
413+
query.schema.id().clone(),
414+
result_size_metrics()
407415
)));
408416
result.append(rt.block_on(execute_query(
409417
query.clone(),

0 commit comments

Comments
 (0)