diff --git a/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md b/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md new file mode 100644 index 0000000000..181f0f6712 --- /dev/null +++ b/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md @@ -0,0 +1,27 @@ +### Add subscription and defer observability: end reason span attributes and termination metrics ([PR #8858](https://github.com/apollographql/router/pull/8858)) + +Adds new span attributes and metrics to improve observability of streaming responses. + +**Span attributes:** + +- **`apollo.subscription.end_reason`**: Records the reason a subscription was terminated. Possible values are `server_close`, `subgraph_error`, `heartbeat_delivery_failed`, `client_disconnect`, `schema_reload`, and `config_reload`. +- **`apollo.defer.end_reason`**: Records the reason a deferred query ended. Possible values are `completed` (all deferred chunks were delivered successfully) and `client_disconnect` (the client disconnected before all deferred data was delivered). + +Both attributes are added dynamically to router spans only when relevant (i.e., only on requests that actually use subscriptions or `@defer`), rather than being present on every router span. + +**Metrics:** + +The following counters are emitted when a subscription terminates: + +- **`apollo.router.operations.subscriptions.server_close`** (attributes: `subgraph.service.name`): The subgraph gracefully closed the stream. +- **`apollo.router.operations.subscriptions.subgraph_error`** (attributes: `subgraph.service.name`): The subscription terminated unexpectedly due to a subgraph error (e.g. process killed, network drop). +- **`apollo.router.operations.subscriptions.client_disconnect`** (attributes: `apollo.client.name`): The client disconnected before the subscription ended. 
+- **`apollo.router.operations.subscriptions.heartbeat_delivery_failed`** (attributes: `apollo.client.name`): A heartbeat could not be delivered to the client. +- **`apollo.router.operations.subscriptions.schema_reload`**: The subscription was terminated because the router schema was updated. +- **`apollo.router.operations.subscriptions.config_reload`**: The subscription was terminated because the router configuration was updated. + +The following counter is emitted when a subscription request is rejected: + +- **`apollo.router.operations.subscriptions.rejected.limit`**: A new subscription request was rejected because the router has reached its `max_opened_subscriptions` limit. + +By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/8858 \ No newline at end of file diff --git a/apollo-router/src/plugins/subscription/execution.rs b/apollo-router/src/plugins/subscription/execution.rs index 45594e7ee9..856b06025a 100644 --- a/apollo-router/src/plugins/subscription/execution.rs +++ b/apollo-router/src/plugins/subscription/execution.rs @@ -35,8 +35,8 @@ use crate::services::execution; use crate::services::execution::QueryPlan; use crate::services::subgraph::BoxGqlStream; -const SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_CONFIG_RELOAD"; -const SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_SCHEMA_RELOAD"; +pub(crate) const SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_CONFIG_RELOAD"; +pub(crate) const SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_SCHEMA_RELOAD"; const SUBSCRIPTION_JWT_EXPIRED_EXTENSION_CODE: &str = "SUBSCRIPTION_JWT_EXPIRED"; const SUBSCRIPTION_EXECUTION_ERROR_EXTENSION_CODE: &str = "SUBSCRIPTION_EXECUTION_ERROR"; diff --git a/apollo-router/src/plugins/subscription/fetch.rs b/apollo-router/src/plugins/subscription/fetch.rs index 737cdfeafb..b06bed120c 100644 --- a/apollo-router/src/plugins/subscription/fetch.rs +++ 
b/apollo-router/src/plugins/subscription/fetch.rs @@ -14,6 +14,7 @@ use tracing::instrument::Instrumented; use crate::error::Error; use crate::http_ext; +use crate::plugins::subscription::SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY; use crate::plugins::subscription::SubscriptionTaskParams; use crate::query_planner::OperationKind; use crate::query_planner::SUBSCRIBE_SPAN_NAME; @@ -50,6 +51,12 @@ pub(crate) fn fetch_service_handle_subscription( let service_name = service_name.clone(); let fetch_time_offset = context.created_at.elapsed().as_nanos() as i64; + // Store the subgraph name in context so it's available for metrics at the router layer + let _ = context.insert( + SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY, + service_name.to_string(), + ); + // Subscriptions are not supported for connectors, so they always go to the subgraph service subscription_with_subgraph_service(schema, subgraph_service_factory, request).instrument( tracing::info_span!( @@ -91,6 +98,11 @@ fn subscription_with_subgraph_service( .and_then(|s| s.max_opened_subscriptions) && OPENED_SUBSCRIPTIONS.load(Ordering::Relaxed) >= max_opened_subscriptions { + u64_counter!( + "apollo.router.operations.subscriptions.rejected.limit", + "Number of subscription requests rejected because the maximum opened subscriptions limit was reached", + 1 + ); return Box::pin(async { Ok(( Value::default(), @@ -227,3 +239,97 @@ fn subscription_with_subgraph_service( Ok((Value::default(), response)) }) } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + use std::sync::atomic::Ordering; + + use apollo_federation::query_plan::serializable_document::SerializableDocument; + use serde_json_bytes::Value; + use tokio::sync::mpsc; + + use super::subscription_with_subgraph_service; + use crate::Context; + use crate::json_ext::Path; + use crate::metrics::FutureMetricsExt; + use crate::plugins::subscription::SubscriptionConfig; + use crate::query_planner::OperationKind; + use 
crate::query_planner::fetch::Variables; + use crate::query_planner::subscription::OPENED_SUBSCRIPTIONS; + use crate::query_planner::subscription::SubscriptionNode; + use crate::services::SubgraphServiceFactory; + use crate::services::fetch::SubscriptionRequest; + + #[tokio::test] + async fn test_subscription_limit_reached_emits_metric() { + async { + let original_count = OPENED_SUBSCRIPTIONS.swap(1, Ordering::Relaxed); + + let subscription_config = SubscriptionConfig { + max_opened_subscriptions: Some(1), + ..Default::default() + }; + + let subscription_node = SubscriptionNode { + service_name: Arc::from("subgraph-a"), + variable_usages: Vec::new(), + operation: SerializableDocument::from_string("subscription { onEvent { id } }"), + operation_name: None, + operation_kind: OperationKind::Subscription, + input_rewrites: None, + output_rewrites: None, + }; + + let (sender, _receiver) = mpsc::channel(1); + let supergraph_request = Arc::new( + http::Request::builder() + .body(crate::graphql::Request::builder().build()) + .unwrap(), + ); + + let schema = Arc::new( + crate::spec::Schema::parse( + include_str!("../../testdata/minimal_supergraph.graphql"), + &Default::default(), + ) + .expect("could not parse schema"), + ); + + let factory = Arc::new(SubgraphServiceFactory { + services: Arc::new(HashMap::new()), + }); + + let request = SubscriptionRequest::builder() + .context(Context::new()) + .subscription_node(subscription_node) + .supergraph_request(supergraph_request) + .variables(Variables::default()) + .current_dir(Path(Vec::new())) + .sender(sender) + .subscription_config(subscription_config) + .build(); + + let (data, errors) = subscription_with_subgraph_service(schema, factory, request) + .await + .expect("call should not fail"); + + assert_eq!(data, Value::default()); + assert_eq!(errors.len(), 1); + assert_eq!( + errors[0].message, + "can't open new subscription, limit reached" + ); + assert_eq!( + errors[0].extensions.get("code").and_then(|v| v.as_str()), + 
Some("SUBSCRIPTION_MAX_LIMIT") + ); + assert_counter!("apollo.router.operations.subscriptions.rejected.limit", 1); + + OPENED_SUBSCRIPTIONS.store(original_count, Ordering::Relaxed); + } + .with_metrics() + .await; + } +} diff --git a/apollo-router/src/plugins/subscription/mod.rs b/apollo-router/src/plugins/subscription/mod.rs index 7ccecf1799..5297c16eb3 100644 --- a/apollo-router/src/plugins/subscription/mod.rs +++ b/apollo-router/src/plugins/subscription/mod.rs @@ -36,6 +36,8 @@ pub(crate) mod notification; pub(crate) mod subgraph; pub(crate) use callback::SUBSCRIPTION_CALLBACK_HMAC_KEY; +pub(crate) use execution::SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE; +pub(crate) use execution::SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE; pub(crate) use execution::SubscriptionExecutionLayer; pub(crate) use execution::SubscriptionTaskParams; pub(crate) use fetch::fetch_service_handle_subscription; @@ -45,6 +47,8 @@ pub(crate) const APOLLO_SUBSCRIPTION_PLUGIN_NAME: &str = "subscription"; pub(crate) const SUBSCRIPTION_ERROR_EXTENSION_KEY: &str = "apollo::subscriptions::fatal_error"; pub(crate) const SUBSCRIPTION_WS_CUSTOM_CONNECTION_PARAMS: &str = "apollo.subscription.custom_connection_params"; +pub(crate) const SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY: &str = + "apollo::subscription::subgraph_name"; #[derive(Debug, Clone)] pub(crate) struct Subscription { diff --git a/apollo-router/src/plugins/telemetry/span_factory.rs b/apollo-router/src/plugins/telemetry/span_factory.rs index b7a0f044a5..cecacde346 100644 --- a/apollo-router/src/plugins/telemetry/span_factory.rs +++ b/apollo-router/src/plugins/telemetry/span_factory.rs @@ -93,7 +93,7 @@ impl SpanMode { "otel.status_code" = ::tracing::field::Empty, "apollo_private.duration_ns" = ::tracing::field::Empty, "apollo_private.http.request_headers" = ::tracing::field::Empty, - "apollo_private.http.response_headers" = ::tracing::field::Empty + "apollo_private.http.response_headers" = ::tracing::field::Empty, ); span } @@ -163,7 +163,7 @@ 
impl SpanMode { apollo_private.graphql.variables = Telemetry::filter_variables_values( &request.supergraph_request.body().variables, &send_variable_values, - ) + ), ) } } diff --git a/apollo-router/src/protocols/multipart.rs b/apollo-router/src/protocols/multipart.rs index 5e191ffbb9..8c844c6639 100644 --- a/apollo-router/src/protocols/multipart.rs +++ b/apollo-router/src/protocols/multipart.rs @@ -10,15 +10,24 @@ use serde::Serialize; use serde_json_bytes::Value; use tokio_stream::once; use tokio_stream::wrappers::IntervalStream; +use tracing::Span; use crate::graphql; +use crate::plugins::subscription::SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE; use crate::plugins::subscription::SUBSCRIPTION_ERROR_EXTENSION_KEY; +use crate::plugins::subscription::SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE; +use crate::plugins::telemetry::dynamic_attribute::SpanDynAttribute; #[cfg(test)] const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(10); #[cfg(not(test))] const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +const SUBSCRIPTION_END_REASON_KEY: opentelemetry::Key = + opentelemetry::Key::from_static_str("apollo.subscription.end_reason"); +const DEFER_END_REASON_KEY: opentelemetry::Key = + opentelemetry::Key::from_static_str("apollo.defer.end_reason"); + #[derive(thiserror::Error, Debug)] pub(crate) enum Error { #[error("serialization error")] @@ -50,6 +59,18 @@ pub(crate) struct Multipart { is_first_chunk: bool, is_terminated: bool, mode: ProtocolMode, + /// Tracks whether a heartbeat was sent but not yet followed by another poll. + /// Used to detect if a heartbeat was the last thing sent before connection closed. + heartbeat_pending: bool, + /// The span captured at creation time, used to record attributes on connection close. + creation_span: Span, + /// The end reason determined during polling, written to the span on Drop. + /// If `None` when dropped and `!is_terminated`, an abnormal reason is inferred. 
+ end_reason: Option, + /// The subgraph name for subscription streams, used in server_close and subgraph_error metrics. + subgraph_name: Option, + /// The client name for subscription streams, used in client_disconnect and heartbeat_delivery_failed metrics. + client_name: Option, } impl Multipart { @@ -73,6 +94,219 @@ impl Multipart { is_first_chunk: true, is_terminated: false, mode, + heartbeat_pending: false, + // Capture the current span so we can record attributes later + creation_span: Span::current(), + end_reason: None, + subgraph_name: None, + client_name: None, + } + } + + /// Set the subgraph name for server_close and subgraph_error metrics attribution. + pub(crate) fn with_subgraph_name(mut self, name: Option) -> Self { + self.subgraph_name = name; + self + } + + /// Set the client name for client_disconnect and heartbeat_delivery_failed metrics attribution. + pub(crate) fn with_client_name(mut self, name: Option) -> Self { + self.client_name = name; + self + } + + /// Checks if the errors indicate a reload-related termination and returns the appropriate end reason + fn detect_reload_end_reason(errors: &[graphql::Error]) -> Option { + for error in errors { + match error.extensions.get("code").and_then(|v| v.as_str()) { + Some(code) if code == SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE => { + return Some(SubscriptionEndReason::SchemaReload); + } + Some(code) if code == SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE => { + return Some(SubscriptionEndReason::ConfigReload); + } + _ => {} + } + } + None + } + + /// Infer the end reason for a subscription that was not terminated properly. 
+ fn infer_abnormal_end_reason(&self) -> EndReason { + match self.mode { + ProtocolMode::Subscription => { + // Stream wasn't terminated properly + let reason = if self.heartbeat_pending { + // Heartbeat was the last thing sent - likely failed to deliver + SubscriptionEndReason::HeartbeatDeliveryFailed + } else { + // Connection closed after a message was sent + SubscriptionEndReason::ClientDisconnect + }; + EndReason::Subscription(reason) + } + ProtocolMode::Defer => { + // Defer stream wasn't terminated properly - client disconnected + EndReason::Defer(DeferEndReason::ClientDisconnect) + } + } + } +} + +/// Unified end reason for both subscription and defer modes, +/// stored in the Multipart struct and written to the span on Drop. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum EndReason { + Subscription(SubscriptionEndReason), + Defer(DeferEndReason), +} + +/// Reasons why a subscription ended +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum SubscriptionEndReason { + /// The subgraph gracefully closed the subscription stream. This fires when + /// the terminating response is the "magic empty response" (no data, no errors, + /// no extensions) that `filter_stream` injects after the subgraph sends a + /// WebSocket `complete` message. + ServerClose, + /// The subgraph closed the subscription stream unexpectedly (e.g. process + /// killed, network drop). This fires when the terminating response contains + /// errors (such as `WEBSOCKET_MESSAGE_ERROR` or `WEBSOCKET_CLOSE_ERROR`) + /// indicating the connection was lost rather than cleanly completed. 
+ SubgraphError, + /// Heartbeat could not be delivered - client likely disconnected + HeartbeatDeliveryFailed, + /// Client disconnected unexpectedly (after a message was sent) + ClientDisconnect, + /// Subscription terminated due to router schema reload + SchemaReload, + /// Subscription terminated due to router configuration reload + ConfigReload, +} + +impl SubscriptionEndReason { + pub(crate) fn as_str(&self) -> &'static str { + match self { + Self::ServerClose => "server_close", + Self::SubgraphError => "subgraph_error", + Self::HeartbeatDeliveryFailed => "heartbeat_delivery_failed", + Self::ClientDisconnect => "client_disconnect", + Self::SchemaReload => "schema_reload", + Self::ConfigReload => "config_reload", + } + } + + pub(crate) fn as_value(&self) -> opentelemetry::Value { + opentelemetry::Value::String(self.as_str().into()) + } +} + +/// Reasons why a defer request ended +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum DeferEndReason { + /// All deferred chunks were delivered successfully + Completed, + /// Client disconnected before all deferred data was delivered + ClientDisconnect, +} + +impl DeferEndReason { + pub(crate) fn as_str(&self) -> &'static str { + match self { + Self::Completed => "completed", + Self::ClientDisconnect => "client_disconnect", + } + } + + pub(crate) fn as_value(&self) -> opentelemetry::Value { + opentelemetry::Value::String(self.as_str().into()) + } +} + +impl Drop for Multipart { + fn drop(&mut self) { + // Determine the end reason: use the one recorded during polling if available, + // otherwise infer an abnormal termination reason. 
+ let end_reason = self + .end_reason + .unwrap_or_else(|| self.infer_abnormal_end_reason()); + + match end_reason { + EndReason::Subscription(reason) => { + self.creation_span + .set_span_dyn_attribute(SUBSCRIPTION_END_REASON_KEY, reason.as_value()); + self.emit_subscription_termination_metric(reason); + } + EndReason::Defer(reason) => { + self.creation_span + .set_span_dyn_attribute(DEFER_END_REASON_KEY, reason.as_value()); + } + } + } +} + +impl Multipart { + /// Emit a counter metric for subscription termination events that require observability. + fn emit_subscription_termination_metric(&self, reason: SubscriptionEndReason) { + match reason { + SubscriptionEndReason::ServerClose => { + let subgraph_name = self + .subgraph_name + .as_deref() + .unwrap_or("unknown") + .to_string(); + u64_counter!( + "apollo.router.operations.subscriptions.server_close", + "Subscription terminated because the subgraph gracefully closed the stream", + 1, + subgraph.service.name = subgraph_name + ); + } + SubscriptionEndReason::SubgraphError => { + let subgraph_name = self + .subgraph_name + .as_deref() + .unwrap_or("unknown") + .to_string(); + u64_counter!( + "apollo.router.operations.subscriptions.subgraph_error", + "Subscription terminated unexpectedly due to a subgraph error (e.g. 
process killed, network drop)", + 1, + subgraph.service.name = subgraph_name + ); + } + SubscriptionEndReason::ClientDisconnect => { + let client_name = self.client_name.as_deref().unwrap_or("unknown").to_string(); + u64_counter!( + "apollo.router.operations.subscriptions.client_disconnect", + "Subscription terminated because the client disconnected", + 1, + apollo.client.name = client_name + ); + } + SubscriptionEndReason::HeartbeatDeliveryFailed => { + let client_name = self.client_name.as_deref().unwrap_or("unknown").to_string(); + u64_counter!( + "apollo.router.operations.subscriptions.heartbeat_delivery_failed", + "Subscription terminated because a heartbeat could not be delivered to the client", + 1, + apollo.client.name = client_name + ); + } + SubscriptionEndReason::SchemaReload => { + u64_counter!( + "apollo.router.operations.subscriptions.schema_reload", + "Subscription terminated because the router schema was updated", + 1 + ); + } + SubscriptionEndReason::ConfigReload => { + u64_counter!( + "apollo.router.operations.subscriptions.config_reload", + "Subscription terminated because the router configuration was updated", + 1 + ); + } } } } @@ -91,6 +325,10 @@ impl Stream for Multipart { Poll::Ready(message) => match message { Some(MessageKind::Heartbeat) => { // It's the ticker for heartbeat for subscription + // Mark that we're sending a heartbeat - if the stream is dropped before + // the next poll, we know the heartbeat delivery likely failed + self.heartbeat_pending = true; + let buf = if self.is_first_chunk { self.is_first_chunk = false; Bytes::from_static( @@ -105,8 +343,15 @@ impl Stream for Multipart { Poll::Ready(Some(Ok(buf))) } Some(MessageKind::Message(mut response)) => { + // Clear heartbeat pending flag since we received a message poll + self.heartbeat_pending = false; + let is_still_open = response.has_next.unwrap_or(false) || response.subscribed.unwrap_or(false); + + // Check for reload-related termination before errors are moved + let 
maybe_end_reason = Self::detect_reload_end_reason(&response.errors); + let mut buf = if self.is_first_chunk { self.is_first_chunk = false; Vec::from(&b"\r\n--graphql\r\ncontent-type: application/json\r\n\r\n"[..]) @@ -114,6 +359,12 @@ impl Stream for Multipart { Vec::from(&b"\r\ncontent-type: application/json\r\n\r\n"[..]) }; + // Track whether this is an unexpected subgraph error (e.g. WebSocket + // WEBSOCKET_MESSAGE_ERROR or WEBSOCKET_CLOSE_ERROR from a killed process + // or network drop). This is set inside the Subscription branch below, + // before errors are consumed. + let mut has_subgraph_errors = false; + match self.mode { ProtocolMode::Subscription => { let is_transport_error = @@ -126,9 +377,18 @@ impl Stream for Multipart { && response.extensions.is_empty() { self.is_terminated = true; + self.end_reason = Some(EndReason::Subscription( + SubscriptionEndReason::ServerClose, + )); return Poll::Ready(Some(Ok(Bytes::from_static(&b"--\r\n"[..])))); } + // Capture before errors are moved: if the response has errors + // and they're not from a router-initiated transport error + // (JWT/execution), these are from the subgraph (WebSocket layer). 
+ has_subgraph_errors = + !response.errors.is_empty() && !is_transport_error; + let response = if is_transport_error { SubscriptionPayload { errors: std::mem::take(&mut response.errors), @@ -159,13 +419,24 @@ impl Stream for Multipart { buf.extend_from_slice(b"\r\n--graphql"); } else { self.is_terminated = true; + self.end_reason = Some(match self.mode { + ProtocolMode::Subscription => EndReason::Subscription( + maybe_end_reason.unwrap_or(if has_subgraph_errors { + SubscriptionEndReason::SubgraphError + } else { + SubscriptionEndReason::ServerClose + }), + ), + ProtocolMode::Defer => EndReason::Defer(DeferEndReason::Completed), + }); buf.extend_from_slice(b"\r\n--graphql--\r\n"); } Poll::Ready(Some(Ok(buf.into()))) } Some(MessageKind::Eof) => { - // If the stream ends or is empty + // If the stream ends or is empty - this is a clean termination + self.heartbeat_pending = false; let buf = if self.is_first_chunk { self.is_first_chunk = false; Bytes::from_static( @@ -177,11 +448,23 @@ impl Stream for Multipart { ) }; self.is_terminated = true; + if self.mode == ProtocolMode::Subscription { + self.end_reason = + Some(EndReason::Subscription(SubscriptionEndReason::ServerClose)); + } Poll::Ready(Some(Ok(buf))) } None => { + // Stream ended - this is a clean termination + self.heartbeat_pending = false; self.is_terminated = true; + self.end_reason = Some(match self.mode { + ProtocolMode::Subscription => { + EndReason::Subscription(SubscriptionEndReason::ServerClose) + } + ProtocolMode::Defer => EndReason::Defer(DeferEndReason::Completed), + }); Poll::Ready(None) } }, @@ -192,10 +475,525 @@ impl Stream for Multipart { #[cfg(test)] mod tests { + use std::sync::Arc; + use std::sync::Mutex; + use futures::stream; + use opentelemetry::KeyValue; use serde_json_bytes::ByteString; + use tracing_subscriber::Layer; + use tracing_subscriber::layer::Context; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::registry::LookupSpan; use super::*; + use 
crate::metrics::FutureMetricsExt; + use crate::plugins::telemetry::dynamic_attribute::DynAttributeLayer; + use crate::plugins::telemetry::otel; + use crate::plugins::telemetry::otel::OtelData; + + #[derive(Clone, Default)] + struct EndReasonCapture { + captured_reason: Arc>>, + } + + impl Layer for EndReasonCapture + where + S: tracing_core::Subscriber + for<'lookup> LookupSpan<'lookup>, + { + fn on_exit(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) { + if let Some(span) = ctx.span(id) + && let Some(data) = span.extensions().get::() + && let Some(attributes) = data.builder.attributes.as_ref() + { + *self.captured_reason.lock().unwrap() = attributes.iter().find_map(|attr| { + let key = &attr.key; + (*key == SUBSCRIPTION_END_REASON_KEY || *key == DEFER_END_REASON_KEY) + .then(|| attr.clone()) + }); + } + } + } + + /// Helper to set up tracing with DynAttributeLayer and EndReasonCapture + fn setup_tracing() -> (tracing::subscriber::DefaultGuard, EndReasonCapture) { + let layer = EndReasonCapture::default(); + let subscriber = tracing_subscriber::Registry::default() + .with(DynAttributeLayer::new()) + .with(otel::layer().force_sampling()) + .with(layer.clone()); + let guard = tracing::subscriber::set_default(subscriber); + (guard, layer) + } + + #[tokio::test] + async fn test_subscription_end_reason_server_close_empty_response() { + async { + // Test: Server closes connection successfully (empty response) + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let span_guard = span.enter(); + + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + // Empty response signals server-side close + graphql::Response::builder().build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_subgraph_name(Some("test_subgraph".to_string())); + + // 
Consume all messages + while protocol.next().await.is_some() {} + + drop(protocol); + drop(span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ServerClose.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.server_close", + 1, + "subgraph.service.name" = "test_subgraph" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_subscription_end_reason_server_close_with_final_data() { + async { + // Test: Server closes normally with final data (subscribed=false, no errors) + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("final"))) + .subscribed(false) // Server close with final data + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_subgraph_name(Some("test_subgraph".to_string())); + + // Consume all messages + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ServerClose.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.server_close", + 1, + "subgraph.service.name" = "test_subgraph" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_end_reason_server_close_via_eof() { + async { + // Test: Stream ends via EOF (empty stream) — the Eof sentinel fires, + // which sets ServerClose (same as all other 
server-side termination paths). + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses: Vec = vec![]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_subgraph_name(Some("test_subgraph".to_string())); + + // Consume all messages (will get EOF) + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ServerClose.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.server_close", + 1, + "subgraph.service.name" = "test_subgraph" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_end_reason_heartbeat_delivery_failed() { + async { + // Test: Stream dropped while heartbeat was pending + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + use tokio::time::sleep; + + let (tx, rx) = tokio::sync::mpsc::channel::(1); + let gql_responses = tokio_stream::wrappers::ReceiverStream::new(rx); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_client_name(Some("test_client".to_string())); + + // Spawn a task that never sends anything, then drops the sender + tokio::spawn(async move { + sleep(std::time::Duration::from_millis(100)).await; + drop(tx); + }); + + // Wait for a heartbeat to be sent + let heartbeat = + "\r\n--graphql\r\ncontent-type: application/json\r\n\r\n{}\r\n--graphql"; + while let Some(resp) = protocol.next().await { + let res = String::from_utf8(resp.unwrap().to_vec()).unwrap(); + if res == heartbeat + || res.starts_with("\r\ncontent-type: application/json\r\n\r\n{}") + { + // Got a heartbeat, now drop the protocol while heartbeat is 
pending + assert!(protocol.heartbeat_pending); + break; + } + } + + // Protocol is dropped here with heartbeat_pending = true + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::HeartbeatDeliveryFailed.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.heartbeat_delivery_failed", + 1, + "apollo.client.name" = "test_client" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_end_reason_client_disconnect() { + async { + // Test: Stream dropped after a message (not heartbeat) was sent + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_client_name(Some("test_client".to_string())); + + // Get the first message + let resp = protocol.next().await; + assert!(resp.is_some()); + + // Verify heartbeat_pending is false (we got a message, not heartbeat) + assert!(!protocol.heartbeat_pending); + + // Protocol is dropped here without being terminated + // and heartbeat_pending = false + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ClientDisconnect.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.client_disconnect", + 1, + "apollo.client.name" = "test_client" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_subscription_end_reason_schema_reload() { + async { + let (_guard, 
layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + graphql::Response::builder() + .error( + graphql::Error::builder() + .message("subscription has been closed due to a schema reload") + .extension_code("SUBSCRIPTION_SCHEMA_RELOAD") + .build(), + ) + .subscribed(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + // Consume all messages + while protocol.next().await.is_some() {} + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::SchemaReload.as_value() + )) + ); + + assert_counter!("apollo.router.operations.subscriptions.schema_reload", 1); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_subscription_end_reason_config_reload() { + async { + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + // Config reload error response + graphql::Response::builder() + .error( + graphql::Error::builder() + .message("subscription has been closed due to a configuration reload") + .extension_code("SUBSCRIPTION_CONFIG_RELOAD") + .build(), + ) + .subscribed(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + // Consume all messages + while protocol.next().await.is_some() {} + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = 
layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ConfigReload.as_value() + )) + ); + + assert_counter!("apollo.router.operations.subscriptions.config_reload", 1); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_defer_end_reason_completed() { + // Test: Defer completes normally with all chunks delivered (has_next=false) + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("initial"))) + .has_next(true) + .build(), + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from( + "deferred", + ))) + .has_next(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Defer); + + // Consume all messages + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + let end_reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + end_reason, + Some(KeyValue::new( + DEFER_END_REASON_KEY, + DeferEndReason::Completed.as_value() + )) + ); + } + + #[tokio::test] + async fn test_defer_end_reason_completed_single_chunk() { + // Test: Defer completes with a single chunk (has_next=false) + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .has_next(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Defer); + + // Consume all messages + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + let end_reason = 
layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + end_reason, + Some(KeyValue::new( + DEFER_END_REASON_KEY, + DeferEndReason::Completed.as_value() + )) + ); + } + + #[tokio::test] + async fn test_defer_end_reason_completed_empty_stream() { + // Test: Defer completes when the stream is empty (None case) + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses: Vec<graphql::Response> = vec![]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Defer); + + // Consume all messages + while protocol.next().await.is_some() {} + drop(protocol); + drop(_span_guard); + drop(span); + + let end_reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + end_reason, + Some(KeyValue::new( + DEFER_END_REASON_KEY, + DeferEndReason::Completed.as_value() + )) + ); + } + + #[tokio::test] + async fn test_defer_end_reason_client_disconnect() { + // Test: Client disconnects before all deferred data is delivered + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("initial"))) + .has_next(true) // More data expected + .build(), + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from( + "deferred1", + ))) + .has_next(true) // Still more data expected + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Defer); + + // Read only the first chunk, then drop (simulating client disconnect) + let resp = protocol.next().await; + assert!(resp.is_some()); + + // Stream is NOT terminated (has_next was true) + assert!(!protocol.is_terminated); + + // Drop the protocol - simulates client disconnect + drop(protocol); + drop(_span_guard); + drop(span); + + let end_reason = 
layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + end_reason, + Some(KeyValue::new( + DEFER_END_REASON_KEY, + DeferEndReason::ClientDisconnect.as_value() + )) + ); + } #[tokio::test] async fn test_heartbeat_and_boundaries() { @@ -306,4 +1104,239 @@ mod tests { } } } + + #[tokio::test] + async fn test_heartbeat_pending_flag() { + use tokio::time::sleep; + + // Create a subscription stream that will have a delay to allow heartbeats + let (tx, rx) = tokio::sync::mpsc::channel::<graphql::Response>(1); + let gql_responses = tokio_stream::wrappers::ReceiverStream::new(rx); + + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + let heartbeat = + String::from("\r\n--graphql\r\ncontent-type: application/json\r\n\r\n{}\r\n--graphql"); + + // Spawn a task to send a response after a delay (longer than heartbeat interval) + tokio::spawn(async move { + // Wait longer than the test heartbeat interval (10ms) + sleep(std::time::Duration::from_millis(30)).await; + let _ = tx + .send( + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from( + String::from("test"), + ))) + .subscribed(false) + .build(), + ) + .await; + }); + + // Read items from the stream + let mut got_heartbeat = false; + let mut got_message = false; + while let Some(resp) = protocol.next().await { + let res = String::from_utf8(resp.unwrap().to_vec()).unwrap(); + if res == heartbeat || res.starts_with("\r\ncontent-type: application/json\r\n\r\n{}") { + // After receiving a heartbeat, heartbeat_pending should be true + assert!( + protocol.heartbeat_pending, + "heartbeat_pending should be true after yielding heartbeat" + ); + got_heartbeat = true; + } else if res.contains("\"test\"") { + // After receiving a message, heartbeat_pending should be false + assert!( + !protocol.heartbeat_pending, + "heartbeat_pending should be false after receiving message" + ); + got_message = true; + break; + } + } + assert!(got_heartbeat, "should have received at least one 
heartbeat"); + assert!(got_message, "should have received the test message"); + } + + #[test] + fn test_defer_mode_drop_records_client_disconnect() { + // Defer mode should record client_disconnect on drop if not terminated + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses: Vec<graphql::Response> = vec![]; + let gql_responses = stream::iter(responses); + let protocol = Multipart::new(gql_responses, ProtocolMode::Defer); + drop(protocol); + drop(_span_guard); + drop(span); + let defer_reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + defer_reason, + Some(KeyValue::new( + DEFER_END_REASON_KEY, + DeferEndReason::ClientDisconnect.as_value() + )) + ); + } + + #[tokio::test] + async fn test_end_reason_subgraph_error() { + async { + // Test: Subscription terminated because the subgraph WebSocket connection + // was lost unexpectedly (e.g. process killed). The terminating response + // has errors (like WEBSOCKET_MESSAGE_ERROR) and subscribed=false. 
+ let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + + let responses = vec![ + graphql::Response::builder() + .error( + graphql::Error::builder() + .message("cannot read message from websocket") + .extension_code("WEBSOCKET_MESSAGE_ERROR") + .build(), + ) + .subscribed(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_subgraph_name(Some("flaky_subgraph".to_string())); + + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::SubgraphError.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.subgraph_error", + 1, + "subgraph.service.name" = "flaky_subgraph" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_end_reason_subgraph_error_with_close_code() { + async { + // Test: Subscription terminated because the subgraph WebSocket closed + // with an abnormal close code, producing a WEBSOCKET_CLOSE_ERROR. 
+ let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + + let responses = vec![graphql::Response::builder() + .error( + graphql::Error::builder() + .message( + "websocket connection has been closed with error code '1011' and reason 'internal error'", + ) + .extension_code("WEBSOCKET_CLOSE_ERROR") + .build(), + ) + .subscribed(false) + .build()]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_subgraph_name(Some("error_subgraph".to_string())); + + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::SubgraphError.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.subgraph_error", + 1, + "subgraph.service.name" = "error_subgraph" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_server_close_metric_defaults_to_unknown_subgraph() { + async { + // Test: server_close metric uses "unknown" when no subgraph name is set + let (_guard, _layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses: Vec<graphql::Response> = vec![]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + assert_counter!( + "apollo.router.operations.subscriptions.server_close", + 1, + "subgraph.service.name" = "unknown" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_client_disconnect_metric_defaults_to_unknown_client() { + async { + // Test: client_disconnect metric uses "unknown" when no client name is set + let (_guard, _layer) = 
setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + let resp = protocol.next().await; + assert!(resp.is_some()); + + drop(protocol); + drop(_span_guard); + drop(span); + + assert_counter!( + "apollo.router.operations.subscriptions.client_disconnect", + 1, + "apollo.client.name" = "unknown" + ); + } + .with_metrics() + .await; + } } diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index 7aa93e0bd8..6577f4aa36 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -51,6 +51,8 @@ use crate::layers::DEFAULT_BUFFER_SIZE; use crate::layers::ServiceBuilderExt; #[cfg(test)] use crate::plugin::test::MockSupergraphService; +use crate::plugins::subscription::SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY; +use crate::plugins::telemetry::CLIENT_NAME; use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_BODY; use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_HEADERS; use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_URI; @@ -377,13 +379,22 @@ impl RouterService { ACCEL_BUFFERING_HEADER_VALUE.clone(), ); let response = match response.subscribed { - Some(true) => http::Response::from_parts( - parts, - router::body::from_result_stream(Multipart::new( - body, - ProtocolMode::Subscription, - )), - ), + Some(true) => { + let subgraph_name: Option<String> = context + .get(SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY) + .ok() + .flatten(); + let client_name: Option<String> = + context.get(CLIENT_NAME).ok().flatten(); + http::Response::from_parts( + parts, + router::body::from_result_stream( + Multipart::new(body, 
ProtocolMode::Subscription) + .with_subgraph_name(subgraph_name) + .with_client_name(client_name), + ), + ) + } _ => http::Response::from_parts( parts, router::body::from_result_stream(Multipart::new( diff --git a/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx b/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx index 0d9c7eac83..9efce1fc5a 100644 --- a/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx +++ b/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx @@ -242,6 +242,7 @@ Similar to the initial call to Uplink, the router does not record metrics for ca - `apollo.router.opened.subscriptions` - Number of different opened subscriptions (not the number of clients with an opened subscriptions in case it's deduplicated). This metric contains `graphql.operation.name` label to know exactly which subscription is still opened. - `apollo.router.skipped.event.count` - Number of subscription events that has been skipped because too many events have been received from the subgraph but not yet sent to the client. +- `apollo.router.operations.subscriptions.rejected.limit` - Number of subscription requests rejected because Apollo Router has reached its [`max_opened_subscriptions`](/router/executing-operations/subscription-support/#limiting-the-number-of-client-connections) limit. 
## Batching diff --git a/docs/source/routing/operations/subscriptions/configuration.mdx b/docs/source/routing/operations/subscriptions/configuration.mdx index 7ab898608f..81428c7909 100644 --- a/docs/source/routing/operations/subscriptions/configuration.mdx +++ b/docs/source/routing/operations/subscriptions/configuration.mdx @@ -453,4 +453,28 @@ subscription: #highlight-end ``` -If a client attempts to execute a subscription on your router when it's already at `max_open_subscriptions`, the router rejects the client's request with an error. +If a client attempts to execute a subscription on Apollo Router when it's already at `max_opened_subscriptions`, the router rejects the client's request with an error. + +Each rejected subscription increments the `apollo.router.operations.subscriptions.rejected.limit` counter metric, which you can use to monitor how often clients hit the limit. + +### Tracking causes of ending subscriptions + +Apollo Router records an `apollo.subscription.end_reason` attribute on the `router` span for subscription operations. This attribute indicates why a subscription ended, which helps you debug and monitor subscription lifecycles using observability tools. + +The `apollo.subscription.end_reason` attribute can have these values: + +| Value | Description | +|-------|-------------| +| `server_close` | The server (subgraph) closed the connection successfully. This occurs when the subgraph ends the subscription. | +| `subgraph_error` | The subgraph closed the subscription stream unexpectedly (e.g. process killed, network drop). | +| `heartbeat_delivery_failed` | Apollo Router failed to deliver a heartbeat message to the client. This usually indicates the client disconnected unexpectedly when a keepalive heartbeat was sent. | +| `client_disconnect` | The client disconnected after receiving subscription data (not during a heartbeat). This indicates the client closed the connection mid-subscription. 
| +| `schema_reload` | The subscription was terminated because Apollo Router's supergraph schema was updated. Clients can reconnect to resume the subscription. | +| `config_reload` | The subscription was terminated because Apollo Router's configuration was updated. Clients can reconnect to resume the subscription. | + +#### Example: Monitor subscription health + +Use the `apollo.subscription.end_reason` attribute to monitor the health of your subscriptions: + +- Frequent `subgraph_error` values might indicate issues with your subgraph subscription sources closing unexpectedly. +- The `server_close` value indicates healthy subscription lifecycles where the subgraph properly signals closure.