Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
### Add subscription and defer observability: end reason span attributes and termination metrics ([PR #8858](https://github.com/apollographql/router/pull/8858))

Adds new span attributes and metrics to improve observability of streaming responses.

**Span attributes:**

- **`apollo.subscription.end_reason`**: Records the reason a subscription was terminated. Possible values are `server_close`, `subgraph_error`, `heartbeat_delivery_failed`, `client_disconnect`, `schema_reload`, and `config_reload`.
- **`apollo.defer.end_reason`**: Records the reason a deferred query ended. Possible values are `completed` (all deferred chunks were delivered successfully) and `client_disconnect` (the client disconnected before all deferred data was delivered).

Both attributes are added dynamically to router spans only when relevant (i.e., only on requests that actually use subscriptions or `@defer`), rather than being present on every router span.

**Metrics:**

The following counters are emitted when a subscription terminates:

- **`apollo.router.operations.subscriptions.server_close`** (attributes: `subgraph.service.name`): The subgraph gracefully closed the stream.
- **`apollo.router.operations.subscriptions.subgraph_error`** (attributes: `subgraph.service.name`): The subscription terminated unexpectedly due to a subgraph error (e.g., the subgraph process was killed or the network connection dropped).
- **`apollo.router.operations.subscriptions.client_disconnect`** (attributes: `apollo.client.name`): The client disconnected before the subscription ended.
- **`apollo.router.operations.subscriptions.heartbeat_delivery_failed`** (attributes: `apollo.client.name`): A heartbeat could not be delivered to the client.
- **`apollo.router.operations.subscriptions.schema_reload`**: The subscription was terminated because the router schema was updated.
- **`apollo.router.operations.subscriptions.config_reload`**: The subscription was terminated because the router configuration was updated.

The following counter is emitted when a subscription request is rejected:

- **`apollo.router.operations.subscriptions.rejected.limit`**: A new subscription request was rejected because the router has reached its `max_opened_subscriptions` limit.

By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/8858
4 changes: 2 additions & 2 deletions apollo-router/src/plugins/subscription/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use crate::services::execution;
use crate::services::execution::QueryPlan;
use crate::services::subgraph::BoxGqlStream;

const SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_CONFIG_RELOAD";
const SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_SCHEMA_RELOAD";
pub(crate) const SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_CONFIG_RELOAD";
pub(crate) const SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_SCHEMA_RELOAD";
const SUBSCRIPTION_JWT_EXPIRED_EXTENSION_CODE: &str = "SUBSCRIPTION_JWT_EXPIRED";
const SUBSCRIPTION_EXECUTION_ERROR_EXTENSION_CODE: &str = "SUBSCRIPTION_EXECUTION_ERROR";

Expand Down
106 changes: 106 additions & 0 deletions apollo-router/src/plugins/subscription/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tracing::instrument::Instrumented;

use crate::error::Error;
use crate::http_ext;
use crate::plugins::subscription::SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY;
use crate::plugins::subscription::SubscriptionTaskParams;
use crate::query_planner::OperationKind;
use crate::query_planner::SUBSCRIBE_SPAN_NAME;
Expand Down Expand Up @@ -50,6 +51,12 @@ pub(crate) fn fetch_service_handle_subscription(
let service_name = service_name.clone();
let fetch_time_offset = context.created_at.elapsed().as_nanos() as i64;

// Store the subgraph name in context so it's available for metrics at the router layer
let _ = context.insert(
SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY,
service_name.to_string(),
);

// Subscriptions are not supported for connectors, so they always go to the subgraph service
subscription_with_subgraph_service(schema, subgraph_service_factory, request).instrument(
tracing::info_span!(
Expand Down Expand Up @@ -91,6 +98,11 @@ fn subscription_with_subgraph_service(
.and_then(|s| s.max_opened_subscriptions)
&& OPENED_SUBSCRIPTIONS.load(Ordering::Relaxed) >= max_opened_subscriptions
{
u64_counter!(
"apollo.router.operations.subscriptions.rejected.limit",
"Number of subscription requests rejected because the maximum opened subscriptions limit was reached",
1
);
return Box::pin(async {
Ok((
Value::default(),
Expand Down Expand Up @@ -227,3 +239,97 @@ fn subscription_with_subgraph_service(
Ok((Value::default(), response))
})
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::atomic::Ordering;

    use apollo_federation::query_plan::serializable_document::SerializableDocument;
    use serde_json_bytes::Value;
    use tokio::sync::mpsc;

    use super::subscription_with_subgraph_service;
    use crate::Context;
    use crate::json_ext::Path;
    use crate::metrics::FutureMetricsExt;
    use crate::plugins::subscription::SubscriptionConfig;
    use crate::query_planner::OperationKind;
    use crate::query_planner::fetch::Variables;
    use crate::query_planner::subscription::OPENED_SUBSCRIPTIONS;
    use crate::query_planner::subscription::SubscriptionNode;
    use crate::services::SubgraphServiceFactory;
    use crate::services::fetch::SubscriptionRequest;

    /// Runs the wrapped closure when dropped.
    ///
    /// Used to undo changes to process-global state even when the test body
    /// panics (for example on a failed assertion), because destructors still
    /// run while the stack unwinds.
    struct RunOnDrop<F: FnMut()>(F);

    impl<F: FnMut()> Drop for RunOnDrop<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }

    /// When the number of opened subscriptions has reached
    /// `max_opened_subscriptions`, `subscription_with_subgraph_service` must
    /// return default data with a single `SUBSCRIPTION_MAX_LIMIT` error and
    /// increment the `apollo.router.operations.subscriptions.rejected.limit`
    /// counter.
    #[tokio::test]
    async fn test_subscription_limit_reached_emits_metric() {
        async {
            // Force the global counter to the configured limit, and make sure the
            // previous value is put back no matter how this block exits. Without
            // the guard, a failing assertion would leave the counter modified for
            // every other test running in the same process.
            let original_count = OPENED_SUBSCRIPTIONS.swap(1, Ordering::Relaxed);
            let _restore_count = RunOnDrop(move || {
                OPENED_SUBSCRIPTIONS.store(original_count, Ordering::Relaxed);
            });

            let subscription_config = SubscriptionConfig {
                max_opened_subscriptions: Some(1),
                ..Default::default()
            };

            let subscription_node = SubscriptionNode {
                service_name: Arc::from("subgraph-a"),
                variable_usages: Vec::new(),
                operation: SerializableDocument::from_string("subscription { onEvent { id } }"),
                operation_name: None,
                operation_kind: OperationKind::Subscription,
                input_rewrites: None,
                output_rewrites: None,
            };

            // The receiver is bound (not dropped) so the channel stays open for
            // the duration of the call.
            let (sender, _receiver) = mpsc::channel(1);
            let supergraph_request = Arc::new(
                http::Request::builder()
                    .body(crate::graphql::Request::builder().build())
                    .unwrap(),
            );

            let schema = Arc::new(
                crate::spec::Schema::parse(
                    include_str!("../../testdata/minimal_supergraph.graphql"),
                    &Default::default(),
                )
                .expect("could not parse schema"),
            );

            // No subgraph services are registered for this test.
            let factory = Arc::new(SubgraphServiceFactory {
                services: Arc::new(HashMap::new()),
            });

            let request = SubscriptionRequest::builder()
                .context(Context::new())
                .subscription_node(subscription_node)
                .supergraph_request(supergraph_request)
                .variables(Variables::default())
                .current_dir(Path(Vec::new()))
                .sender(sender)
                .subscription_config(subscription_config)
                .build();

            let (data, errors) = subscription_with_subgraph_service(schema, factory, request)
                .await
                .expect("call should not fail");

            assert_eq!(data, Value::default());
            assert_eq!(errors.len(), 1);
            assert_eq!(
                errors[0].message,
                "can't open new subscription, limit reached"
            );
            assert_eq!(
                errors[0].extensions.get("code").and_then(|v| v.as_str()),
                Some("SUBSCRIPTION_MAX_LIMIT")
            );
            assert_counter!("apollo.router.operations.subscriptions.rejected.limit", 1);
        }
        // NOTE(review): presumably `with_metrics()` scopes metric collection to this
        // future so `assert_counter!` only sees this test's counters; confirm against
        // `FutureMetricsExt`.
        .with_metrics()
        .await;
    }
}
4 changes: 4 additions & 0 deletions apollo-router/src/plugins/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub(crate) mod notification;
pub(crate) mod subgraph;

pub(crate) use callback::SUBSCRIPTION_CALLBACK_HMAC_KEY;
pub(crate) use execution::SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE;
pub(crate) use execution::SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE;
pub(crate) use execution::SubscriptionExecutionLayer;
pub(crate) use execution::SubscriptionTaskParams;
pub(crate) use fetch::fetch_service_handle_subscription;
Expand All @@ -45,6 +47,8 @@ pub(crate) const APOLLO_SUBSCRIPTION_PLUGIN_NAME: &str = "subscription";
pub(crate) const SUBSCRIPTION_ERROR_EXTENSION_KEY: &str = "apollo::subscriptions::fatal_error";
pub(crate) const SUBSCRIPTION_WS_CUSTOM_CONNECTION_PARAMS: &str =
"apollo.subscription.custom_connection_params";
/// Context key under which the subscription fetch path stores the subgraph's
/// service name (as a `String`), so it can be read back later when emitting
/// subscription metrics.
pub(crate) const SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY: &str =
"apollo::subscription::subgraph_name";

#[derive(Debug, Clone)]
pub(crate) struct Subscription {
Expand Down
4 changes: 2 additions & 2 deletions apollo-router/src/plugins/telemetry/span_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl SpanMode {
"otel.status_code" = ::tracing::field::Empty,
"apollo_private.duration_ns" = ::tracing::field::Empty,
"apollo_private.http.request_headers" = ::tracing::field::Empty,
"apollo_private.http.response_headers" = ::tracing::field::Empty
"apollo_private.http.response_headers" = ::tracing::field::Empty,
);
span
}
Expand Down Expand Up @@ -163,7 +163,7 @@ impl SpanMode {
apollo_private.graphql.variables = Telemetry::filter_variables_values(
&request.supergraph_request.body().variables,
&send_variable_values,
)
),
)
}
}
Expand Down
Loading