Skip to content

Commit e691bde

Browse files
authored
use a parking-lot mutex in Context (#2885)
The context requires synchronized access to the busy timer. Previously we used a futures-aware mutex for that, but those are susceptible to contention. This replaces that mutex with a synchronous `parking_lot` mutex, which is much faster.
1 parent 028f6d9 commit e691bde

File tree

5 files changed

+30
-22
lines changed

5 files changed

+30
-22
lines changed

.changesets/fix_geal_context_mutex.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
### use a parking-lot mutex in Context ([Issue #2751](https://github.com/apollographql/router/issues/2751))
2+
3+
The context requires synchronized access to the busy timer. Previously we used a futures-aware mutex for that, but those are susceptible to contention. This replaces that mutex with a synchronous `parking_lot` mutex, which is much faster.
4+
5+
By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2885

apollo-router/src/axum_factory/axum_http_server_factory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ async fn handle_graphql(
436436
let context = request.context.clone();
437437

438438
let res = service.oneshot(request).await;
439-
let dur = context.busy_time().await;
439+
let dur = context.busy_time();
440440
let processing_seconds = dur.as_secs_f64();
441441

442442
tracing::info!(histogram.apollo_router_processing_time = processing_seconds,);

apollo-router/src/context/mod.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ use std::time::Instant;
1010
use dashmap::mapref::multiple::RefMulti;
1111
use dashmap::mapref::multiple::RefMutMulti;
1212
use dashmap::DashMap;
13-
use futures::lock::Mutex;
13+
use derivative::Derivative;
14+
use parking_lot::Mutex;
1415
use serde::Deserialize;
1516
use serde::Serialize;
1617
use tower::BoxError;
@@ -34,7 +35,8 @@ pub(crate) type Entries = Arc<DashMap<String, Value>>;
3435
/// [`crate::services::SubgraphResponse`] processing. At such times,
3536
/// plugins should restrict themselves to the [`Context::get`] and [`Context::upsert`]
3637
/// functions to minimise the possibility of mis-sequenced updates.
37-
#[derive(Clone, Debug, Deserialize, Serialize)]
38+
#[derive(Clone, Deserialize, Serialize, Derivative)]
39+
#[derivative(Debug)]
3840
pub struct Context {
3941
// Allows adding custom entries to the context.
4042
entries: Entries,
@@ -48,6 +50,7 @@ pub struct Context {
4850
pub(crate) created_at: Instant,
4951

5052
#[serde(skip)]
53+
#[derivative(Debug = "ignore")]
5154
busy_timer: Arc<Mutex<BusyTimer>>,
5255
}
5356

@@ -193,18 +196,18 @@ impl Context {
193196
}
194197

195198
/// Notify the busy timer that we're waiting on a network request
196-
pub(crate) async fn enter_active_request(&self) {
197-
self.busy_timer.lock().await.increment_active_requests()
199+
pub(crate) fn enter_active_request(&self) {
200+
self.busy_timer.lock().increment_active_requests()
198201
}
199202

200203
/// Notify the busy timer that we stopped waiting on a network request
201-
pub(crate) async fn leave_active_request(&self) {
202-
self.busy_timer.lock().await.decrement_active_requests()
204+
pub(crate) fn leave_active_request(&self) {
205+
self.busy_timer.lock().decrement_active_requests()
203206
}
204207

205208
/// How much time was spent working on the request
206-
pub(crate) async fn busy_time(&self) -> Duration {
207-
self.busy_timer.lock().await.current()
209+
pub(crate) fn busy_time(&self) -> Duration {
210+
self.busy_timer.lock().current()
208211
}
209212
}
210213

apollo-router/src/plugins/coprocessor.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -516,9 +516,9 @@ where
516516
};
517517

518518
tracing::debug!(?payload, "externalized output");
519-
request.context.enter_active_request().await;
519+
request.context.enter_active_request();
520520
let co_processor_result = payload.call(http_client, &coprocessor_url).await;
521-
request.context.leave_active_request().await;
521+
request.context.leave_active_request();
522522
tracing::debug!(?co_processor_result, "co-processor returned");
523523
let co_processor_output = co_processor_result?;
524524

@@ -642,9 +642,9 @@ where
642642

643643
// Second, call our co-processor and get a reply.
644644
tracing::debug!(?payload, "externalized output");
645-
response.context.enter_active_request().await;
645+
response.context.enter_active_request();
646646
let co_processor_result = payload.call(http_client, &coprocessor_url).await;
647-
response.context.leave_active_request().await;
647+
response.context.leave_active_request();
648648
tracing::debug!(?co_processor_result, "co-processor returned");
649649
let co_processor_output = co_processor_result?;
650650

@@ -728,9 +728,9 @@ where
728728
};
729729

730730
tracing::debug!(?payload, "externalized output");
731-
request.context.enter_active_request().await;
731+
request.context.enter_active_request();
732732
let co_processor_result = payload.call(http_client, &coprocessor_url).await;
733-
request.context.leave_active_request().await;
733+
request.context.leave_active_request();
734734
tracing::debug!(?co_processor_result, "co-processor returned");
735735
let co_processor_output = co_processor_result?;
736736
validate_coprocessor_output(&co_processor_output, PipelineStep::SubgraphRequest)?;
@@ -858,9 +858,9 @@ where
858858
};
859859

860860
tracing::debug!(?payload, "externalized output");
861-
response.context.enter_active_request().await;
861+
response.context.enter_active_request();
862862
let co_processor_result = payload.call(http_client, &coprocessor_url).await;
863-
response.context.leave_active_request().await;
863+
response.context.leave_active_request();
864864
tracing::debug!(?co_processor_result, "co-processor returned");
865865
let co_processor_output = co_processor_result?;
866866

apollo-router/src/services/subgraph_service.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,13 @@ async fn call_http(
329329
let cloned_service_name = service_name.clone();
330330
let cloned_context = context.clone();
331331
let (parts, body) = async move {
332-
cloned_context.enter_active_request().await;
332+
cloned_context.enter_active_request();
333333
let response = match client
334334
.call(request)
335335
.await {
336336
Err(err) => {
337337
tracing::error!(fetch_error = format!("{err:?}").as_str());
338-
cloned_context.leave_active_request().await;
338+
cloned_context.leave_active_request();
339339

340340
return Err(FetchError::SubrequestHttpError {
341341
status_code: None,
@@ -359,7 +359,7 @@ async fn call_http(
359359
if !content_type_str.contains(APPLICATION_JSON.essence_str())
360360
&& !content_type_str.contains(GRAPHQL_JSON_RESPONSE_HEADER_VALUE)
361361
{
362-
cloned_context.leave_active_request().await;
362+
cloned_context.leave_active_request();
363363

364364
return if !parts.status.is_success() {
365365

@@ -387,7 +387,7 @@ async fn call_http(
387387
.instrument(tracing::debug_span!("aggregate_response_data"))
388388
.await {
389389
Err(err) => {
390-
cloned_context.leave_active_request().await;
390+
cloned_context.leave_active_request();
391391

392392
tracing::error!(fetch_error = format!("{err:?}").as_str());
393393

@@ -400,7 +400,7 @@ async fn call_http(
400400
}, Ok(body) => body,
401401
};
402402

403-
cloned_context.leave_active_request().await;
403+
cloned_context.leave_active_request();
404404

405405
Ok((parts, body))
406406
}.instrument(subgraph_req_span).await?;

0 commit comments

Comments
 (0)