Skip to content

Commit a14587a

Browse files
committed
all: Track wait stats in connection pool, add LoadManager
1 parent fd63d88 commit a14587a

File tree

14 files changed

+89
-45
lines changed

14 files changed

+89
-45
lines changed

graph/src/components/graphql.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use futures::prelude::*;
22

3-
use crate::data::graphql::effort::QueryEffort;
3+
use crate::data::graphql::effort::LoadManager;
44
use crate::data::query::{Query, QueryError, QueryResult};
55
use crate::data::subscription::{Subscription, SubscriptionError, SubscriptionResult};
66

@@ -50,5 +50,5 @@ pub trait GraphQlRunner: Send + Sync + 'static {
5050
})
5151
}
5252

53-
fn effort(&self) -> Arc<QueryEffort>;
53+
fn load_manager(&self) -> Arc<LoadManager>;
5454
}

graph/src/components/store.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::env;
1111
use std::fmt;
1212
use std::str::FromStr;
1313
use std::sync::atomic::{AtomicUsize, Ordering};
14-
use std::sync::Arc;
14+
use std::sync::{Arc, RwLock};
1515
use std::time::{Duration, Instant};
1616
use web3::types::{Address, H256};
1717

@@ -1214,6 +1214,10 @@ mock! {
12141214
}
12151215
}
12161216

1217+
// The type that the connection pool uses to track wait times for
1218+
// connection checkouts
1219+
pub type PoolWaitStats = Arc<RwLock<MovingStats>>;
1220+
12171221
// The store trait must be implemented manually because mockall does not support async_trait, nor borrowing from arguments.
12181222
impl Store for MockStore {
12191223
fn block_ptr(

graph/src/data/graphql/effort.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::collections::HashMap;
44
use std::sync::{Arc, RwLock};
55
use std::time::{Duration, Instant};
66

7+
use crate::components::store::PoolWaitStats;
78
use crate::util::stats::{MovingStats, BIN_SIZE, WINDOW_SIZE};
89

910
pub struct QueryEffort {
@@ -61,3 +62,21 @@ impl QueryEffortInner {
6162
self.total.add_at(now, duration);
6263
}
6364
}
65+
66+
pub struct LoadManager {
67+
effort: QueryEffort,
68+
store_wait_stats: PoolWaitStats,
69+
}
70+
71+
impl LoadManager {
72+
pub fn new(store_wait_stats: PoolWaitStats) -> Self {
73+
Self {
74+
effort: QueryEffort::default(),
75+
store_wait_stats,
76+
}
77+
}
78+
79+
pub fn add_query(&self, shape_hash: u64, duration: Duration) {
80+
self.effort.add(shape_hash, duration);
81+
}
82+
}

graph/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,9 @@ pub mod prelude {
101101
AttributeIndexDefinition, BlockNumber, ChainStore, ChildMultiplicity, EntityCache,
102102
EntityChange, EntityChangeOperation, EntityCollection, EntityFilter, EntityKey, EntityLink,
103103
EntityModification, EntityOperation, EntityOrder, EntityQuery, EntityRange, EntityWindow,
104-
EthereumCallCache, MetadataOperation, ParentLink, Store, StoreError, StoreEvent,
105-
StoreEventStream, StoreEventStreamBox, SubgraphDeploymentStore, TransactionAbortError,
106-
WindowAttribute, BLOCK_NUMBER_MAX, SUBSCRIPTION_THROTTLE_INTERVAL,
104+
EthereumCallCache, MetadataOperation, ParentLink, PoolWaitStats, Store, StoreError,
105+
StoreEvent, StoreEventStream, StoreEventStreamBox, SubgraphDeploymentStore,
106+
TransactionAbortError, WindowAttribute, BLOCK_NUMBER_MAX, SUBSCRIPTION_THROTTLE_INTERVAL,
107107
};
108108
pub use crate::components::subgraph::{
109109
BlockState, DataSourceLoader, DataSourceTemplateInfo, HostMetrics, RuntimeHost,

graphql/src/query/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::{atomic::AtomicBool, Arc};
55
use std::time::Instant;
66
use uuid::Uuid;
77

8-
use graph::data::graphql::effort::QueryEffort;
8+
use graph::data::graphql::effort::LoadManager;
99

1010
use crate::execution::*;
1111
use crate::schema::ast as sast;
@@ -33,7 +33,7 @@ where
3333
/// Maximum value for the `first` argument.
3434
pub max_first: u32,
3535

36-
pub effort: Arc<QueryEffort>,
36+
pub load_manager: Arc<LoadManager>,
3737
}
3838

3939
/// Executes a query and returns a result.
@@ -79,7 +79,7 @@ where
7979
let start = Instant::now();
8080
let result = execute_root_selection_set(&ctx, selection_set, query_type, block_ptr);
8181
let elapsed = start.elapsed();
82-
options.effort.add(query.shape_hash, elapsed);
82+
options.load_manager.add_query(query.shape_hash, elapsed);
8383
if *graph::log::LOG_GQL_TIMING {
8484
info!(
8585
query_logger,

graphql/src/runner.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ use crate::prelude::{
1111
};
1212
use crate::query::execute_query;
1313
use crate::subscription::execute_prepared_subscription;
14-
use graph::data::graphql::effort::QueryEffort;
14+
use graph::data::graphql::effort::LoadManager;
1515
use graph::prelude::{
16-
o, shape_hash, EthereumBlockPointer, GraphQlRunner as GraphQlRunnerTrait, Logger, Query,
17-
QueryExecutionError, QueryResult, QueryResultFuture, Store, StoreError, SubgraphDeploymentId,
18-
SubgraphDeploymentStore, Subscription, SubscriptionError, SubscriptionResultFuture,
16+
o, shape_hash, EthereumBlockPointer, GraphQlRunner as GraphQlRunnerTrait, Logger,
17+
PoolWaitStats, Query, QueryExecutionError, QueryResult, QueryResultFuture, Store, StoreError,
18+
SubgraphDeploymentId, SubgraphDeploymentStore, Subscription, SubscriptionError,
19+
SubscriptionResultFuture,
1920
};
2021

2122
use lazy_static::lazy_static;
@@ -25,7 +26,7 @@ pub struct GraphQlRunner<S> {
2526
logger: Logger,
2627
store: Arc<S>,
2728
expensive: HashMap<u64, Arc<q::Document>>,
28-
effort: Arc<QueryEffort>,
29+
load_manager: Arc<LoadManager>,
2930
}
3031

3132
lazy_static! {
@@ -56,7 +57,12 @@ where
5657
S: Store + SubgraphDeploymentStore,
5758
{
5859
/// Creates a new query runner.
59-
pub fn new(logger: &Logger, store: Arc<S>, expensive: &Vec<Arc<q::Document>>) -> Self {
60+
pub fn new(
61+
logger: &Logger,
62+
store: Arc<S>,
63+
expensive: &Vec<Arc<q::Document>>,
64+
store_wait_stats: PoolWaitStats,
65+
) -> Self {
6066
let expensive = expensive
6167
.into_iter()
6268
.map(|doc| (shape_hash(&doc), doc.clone()))
@@ -65,7 +71,7 @@ where
6571
logger: logger.new(o!("component" => "GraphQlRunner")),
6672
store,
6773
expensive,
68-
effort: Arc::new(QueryEffort::default()),
74+
load_manager: Arc::new(LoadManager::new(store_wait_stats)),
6975
}
7076
}
7177

@@ -135,7 +141,7 @@ where
135141
resolver,
136142
deadline: GRAPHQL_QUERY_TIMEOUT.map(|t| Instant::now() + t),
137143
max_first: max_first.unwrap_or(*GRAPHQL_MAX_FIRST),
138-
effort: self.effort.clone(),
144+
load_manager: self.load_manager.clone(),
139145
},
140146
) {
141147
Err(errs) => errors.extend(errs),
@@ -215,7 +221,7 @@ where
215221
Box::new(future::result(result))
216222
}
217223

218-
fn effort(&self) -> Arc<QueryEffort> {
219-
self.effort.clone()
224+
fn load_manager(&self) -> Arc<LoadManager> {
225+
self.load_manager.clone()
220226
}
221227
}

graphql/tests/introspection.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,15 @@ extern crate pretty_assertions;
44
use graphql_parser::{query as q, schema as s};
55
use std::collections::HashMap;
66
use std::sync::Arc;
7-
use std::time::Duration;
87

9-
use graph::data::graphql::effort::QueryEffort;
108
use graph::prelude::{
119
o, slog, Logger, Query, QueryExecutionError, QueryResult, Schema, SubgraphDeploymentId,
1210
};
1311
use graph_graphql::prelude::{
1412
api_schema, execute_query, object, object_value, ExecutionContext, ObjectOrInterface,
1513
Query as PreparedQuery, QueryExecutionOptions, Resolver,
1614
};
15+
use test_store::LOAD_MANAGER;
1716

1817
/// Mock resolver used in tests that don't need a resolver.
1918
#[derive(Clone)]
@@ -560,10 +559,7 @@ fn introspection_query(schema: Schema, query: &str) -> QueryResult {
560559
resolver: MockResolver,
561560
deadline: None,
562561
max_first: std::u32::MAX,
563-
effort: Arc::new(QueryEffort::new(
564-
Duration::from_millis(0),
565-
Duration::from_millis(0),
566-
)),
562+
load_manager: LOAD_MANAGER.clone(),
567563
};
568564

569565
let result = PreparedQuery::new(query, None, 100)

graphql/tests/query.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ use graphql_parser::{query as q, Pos};
55
use lazy_static::lazy_static;
66
use std::collections::HashMap;
77
use std::iter::FromIterator;
8-
use std::sync::Arc;
8+
use std::sync::{Arc, RwLock};
99
use std::time::{Duration, Instant};
1010

1111
use graph::prelude::{
1212
futures03::stream::StreamExt, futures03::FutureExt, futures03::TryFutureExt, o, slog, tokio,
13-
Entity, EntityKey, EntityOperation, EthereumBlockPointer, FutureExtension, Logger, Query,
14-
QueryError, QueryExecutionError, QueryResult, QueryVariables, Schema, Store,
13+
Entity, EntityKey, EntityOperation, EthereumBlockPointer, FutureExtension, Logger, MovingStats,
14+
Query, QueryError, QueryExecutionError, QueryResult, QueryVariables, Schema, Store,
1515
SubgraphDeploymentEntity, SubgraphDeploymentId, SubgraphDeploymentStore, SubgraphManifest,
1616
Subscription, SubscriptionError, Value,
1717
};
@@ -230,7 +230,8 @@ fn execute_query_document_with_variables(
230230
query: q::Document,
231231
variables: Option<QueryVariables>,
232232
) -> QueryResult {
233-
let runner = GraphQlRunner::new(&*LOGGER, STORE.clone(), &vec![]);
233+
let stats = Arc::new(RwLock::new(MovingStats::default()));
234+
let runner = GraphQlRunner::new(&*LOGGER, STORE.clone(), &vec![], stats);
234235
let query = Query::new(Arc::new(api_test_schema()), query, variables);
235236

236237
return_err!(runner.execute(query, None, None, None))

node/src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::env;
88
use std::io::{BufRead, BufReader};
99
use std::path::Path;
1010
use std::str::FromStr;
11+
use std::sync::RwLock;
1112
use std::time::Duration;
1213
use tokio::sync::mpsc;
1314

@@ -518,12 +519,14 @@ async fn main() {
518519
let stores_error_logger = logger.clone();
519520
let stores_eth_adapters = eth_adapters.clone();
520521
let contention_logger = logger.clone();
522+
let wait_stats = Arc::new(RwLock::new(MovingStats::default()));
521523

522524
let postgres_conn_pool = create_connection_pool(
523525
postgres_url.clone(),
524526
store_conn_pool_size,
525527
&logger,
526528
connection_pool_registry,
529+
wait_stats.clone(),
527530
);
528531

529532
let chain_head_update_listener = Arc::new(PostgresChainHeadUpdateListener::new(
@@ -589,6 +592,7 @@ async fn main() {
589592
&logger,
590593
generic_store.clone(),
591594
&expensive_queries,
595+
wait_stats.clone(),
592596
));
593597
let mut graphql_server = GraphQLQueryServer::new(
594598
&logger_factory,

server/http/src/service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ mod tests {
475475
use hyper::{Body, Method, Request};
476476
use std::collections::BTreeMap;
477477

478-
use graph::data::graphql::effort::QueryEffort;
478+
use graph::data::graphql::effort::LoadManager;
479479
use graph::prelude::*;
480480
use graph_mock::{mock_store_with_users_subgraph, MockMetricsRegistry};
481481
use graphql_parser::query as q;
@@ -515,7 +515,7 @@ mod tests {
515515
unreachable!();
516516
}
517517

518-
fn effort(&self) -> Arc<QueryEffort> {
518+
fn load_manager(&self) -> Arc<LoadManager> {
519519
unimplemented!()
520520
}
521521
}

0 commit comments

Comments
 (0)