From 271e4fe95f89956ebdf435e6428c18e88ff0544c Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Fri, 30 Jan 2026 17:31:43 +0000 Subject: [PATCH 01/15] Implement subscription end reason attribute --- .../src/plugins/telemetry/span_factory.rs | 6 +- apollo-router/src/protocols/multipart.rs | 388 +++++++++++++++++- 2 files changed, 391 insertions(+), 3 deletions(-) diff --git a/apollo-router/src/plugins/telemetry/span_factory.rs b/apollo-router/src/plugins/telemetry/span_factory.rs index b7a0f044a5..b7ecff75fd 100644 --- a/apollo-router/src/plugins/telemetry/span_factory.rs +++ b/apollo-router/src/plugins/telemetry/span_factory.rs @@ -93,7 +93,8 @@ impl SpanMode { "otel.status_code" = ::tracing::field::Empty, "apollo_private.duration_ns" = ::tracing::field::Empty, "apollo_private.http.request_headers" = ::tracing::field::Empty, - "apollo_private.http.response_headers" = ::tracing::field::Empty + "apollo_private.http.response_headers" = ::tracing::field::Empty, + "apollo.subscription.end_reason" = ::tracing::field::Empty, ); span } @@ -110,6 +111,7 @@ impl SpanMode { "apollo_private.http.request_headers" = ::tracing::field::Empty, "apollo_private.http.response_headers" = ::tracing::field::Empty, "apollo_private.request" = true, + "apollo.subscription.end_reason" = ::tracing::field::Empty, ) } } @@ -163,7 +165,7 @@ impl SpanMode { apollo_private.graphql.variables = Telemetry::filter_variables_values( &request.supergraph_request.body().variables, &send_variable_values, - ) + ), ) } } diff --git a/apollo-router/src/protocols/multipart.rs b/apollo-router/src/protocols/multipart.rs index 5e191ffbb9..3fe0ba0e97 100644 --- a/apollo-router/src/protocols/multipart.rs +++ b/apollo-router/src/protocols/multipart.rs @@ -10,6 +10,7 @@ use serde::Serialize; use serde_json_bytes::Value; use tokio_stream::once; use tokio_stream::wrappers::IntervalStream; +use tracing::Span; use crate::graphql; use 
crate::plugins::subscription::SUBSCRIPTION_ERROR_EXTENSION_KEY; @@ -50,6 +51,11 @@ pub(crate) struct Multipart { is_first_chunk: bool, is_terminated: bool, mode: ProtocolMode, + /// Tracks whether a heartbeat was sent but not yet followed by another poll. + /// Used to detect if a heartbeat was the last thing sent before connection closed. + heartbeat_pending: bool, + /// The span captured at creation time, used to record attributes on connection close. + span: Span, } impl Multipart { @@ -73,6 +79,55 @@ impl Multipart { is_first_chunk: true, is_terminated: false, mode, + heartbeat_pending: false, + // Capture the current span so we can record attributes later + span: Span::current(), + } + } +} + +/// Reasons why a subscription ended +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum SubscriptionEndReason { + /// Subscription completed normally after receiving all data + Complete, + /// Server closed the connection gracefully + ServerClose, + /// Stream source ended (e.g., subgraph closed the connection) + StreamEnd, + /// Heartbeat could not be delivered - client likely disconnected + HeartbeatDeliveryFailed, + /// Client disconnected unexpectedly (after a message was sent) + ClientDisconnect, +} + +impl SubscriptionEndReason { + /// Returns the string representation of the end reason + pub(crate) fn as_str(&self) -> &'static str { + match self { + Self::Complete => "complete", + Self::ServerClose => "server_close", + Self::StreamEnd => "stream_end", + Self::HeartbeatDeliveryFailed => "heartbeat_delivery_failed", + Self::ClientDisconnect => "client_disconnect", + } + } +} + +impl Drop for Multipart { + fn drop(&mut self) { + // Only handle subscription mode + if self.mode == ProtocolMode::Subscription && !self.is_terminated { + // Stream wasn't terminated properly - determine the reason + let reason = if self.heartbeat_pending { + // Heartbeat was the last thing sent - likely failed to deliver + SubscriptionEndReason::HeartbeatDeliveryFailed + } else 
{ + // Connection closed after a message was sent + SubscriptionEndReason::ClientDisconnect + }; + self.span + .record("apollo.subscription.end_reason", reason.as_str()); } } } @@ -91,6 +146,10 @@ impl Stream for Multipart { Poll::Ready(message) => match message { Some(MessageKind::Heartbeat) => { // It's the ticker for heartbeat for subscription + // Mark that we're sending a heartbeat - if the stream is dropped before + // the next poll, we know the heartbeat delivery likely failed + self.heartbeat_pending = true; + let buf = if self.is_first_chunk { self.is_first_chunk = false; Bytes::from_static( @@ -105,6 +164,9 @@ impl Stream for Multipart { Poll::Ready(Some(Ok(buf))) } Some(MessageKind::Message(mut response)) => { + // Clear heartbeat pending flag since we received a message poll + self.heartbeat_pending = false; + let is_still_open = response.has_next.unwrap_or(false) || response.subscribed.unwrap_or(false); let mut buf = if self.is_first_chunk { @@ -126,6 +188,10 @@ impl Stream for Multipart { && response.extensions.is_empty() { self.is_terminated = true; + self.span.record( + "apollo.subscription.end_reason", + SubscriptionEndReason::ServerClose.as_str(), + ); return Poll::Ready(Some(Ok(Bytes::from_static(&b"--\r\n"[..])))); } @@ -159,13 +225,20 @@ impl Stream for Multipart { buf.extend_from_slice(b"\r\n--graphql"); } else { self.is_terminated = true; + if self.mode == ProtocolMode::Subscription { + self.span.record( + "apollo.subscription.end_reason", + SubscriptionEndReason::Complete.as_str(), + ); + } buf.extend_from_slice(b"\r\n--graphql--\r\n"); } Poll::Ready(Some(Ok(buf.into()))) } Some(MessageKind::Eof) => { - // If the stream ends or is empty + // If the stream ends or is empty - this is a clean termination + self.heartbeat_pending = false; let buf = if self.is_first_chunk { self.is_first_chunk = false; Bytes::from_static( @@ -177,11 +250,21 @@ impl Stream for Multipart { ) }; self.is_terminated = true; + self.span.record( + 
"apollo.subscription.end_reason", + SubscriptionEndReason::StreamEnd.as_str(), + ); Poll::Ready(Some(Ok(buf))) } None => { + // Stream ended - this is a clean termination + self.heartbeat_pending = false; self.is_terminated = true; + self.span.record( + "apollo.subscription.end_reason", + SubscriptionEndReason::StreamEnd.as_str(), + ); Poll::Ready(None) } }, @@ -192,11 +275,249 @@ impl Stream for Multipart { #[cfg(test)] mod tests { + use std::sync::Arc; + use futures::stream; use serde_json_bytes::ByteString; + use tracing::Id; + use tracing::Subscriber; + use tracing::field::Visit; + use tracing::span::Record; + use tracing_subscriber::Layer; + use tracing_subscriber::layer::Context; + use tracing_subscriber::layer::SubscriberExt; use super::*; + /// A test layer that captures the recorded `apollo.subscription.end_reason` value + #[derive(Clone)] + struct EndReasonCapture { + captured_reason: Arc>>, + } + + struct EndReasonVisitor<'a> { + captured_reason: &'a Arc>>, + } + + impl Visit for EndReasonVisitor<'_> { + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() == "apollo.subscription.end_reason" { + *self.captured_reason.lock().unwrap() = Some(value.to_string()); + } + } + + fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {} + } + + impl Layer for EndReasonCapture { + fn on_record(&self, _span: &Id, values: &Record<'_>, _ctx: Context<'_, S>) { + let mut visitor = EndReasonVisitor { + captured_reason: &self.captured_reason, + }; + values.record(&mut visitor); + } + } + + /// Helper to run a test with a span that has the end_reason field + fn setup_tracing() -> (tracing::subscriber::DefaultGuard, EndReasonCapture) { + let layer = EndReasonCapture { + captured_reason: Arc::new(std::sync::Mutex::new(None)), + }; + let subscriber = tracing_subscriber::registry().with(layer.clone()); + let guard = tracing::subscriber::set_default(subscriber); + (guard, layer) + } + + #[tokio::test] + 
async fn test_end_reason_complete() { + // Test: Subscription completes normally with subscribed=false + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!( + "test_span", + "apollo.subscription.end_reason" = tracing::field::Empty + ); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("final"))) + .subscribed(false) // This marks completion + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + // Consume all messages + while protocol.next().await.is_some() {} + + let reason = layer.captured_reason.lock().unwrap().clone(); + + assert_eq!( + reason, + Some(SubscriptionEndReason::Complete.as_str().to_string()) + ); + } + + #[tokio::test] + async fn test_end_reason_server_close() { + // Test: Server closes connection gracefully (empty response) + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!( + "test_span", + "apollo.subscription.end_reason" = tracing::field::Empty + ); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + // Empty response signals server-side graceful close + graphql::Response::builder().build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + // Consume all messages + while protocol.next().await.is_some() {} + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(SubscriptionEndReason::ServerClose.as_str().to_string()) + ); + } + + #[tokio::test] + async fn test_end_reason_stream_end() { + // Test: Stream ends via EOF (empty stream) + 
let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!( + "test_span", + "apollo.subscription.end_reason" = tracing::field::Empty + ); + let _span_guard = span.enter(); + let responses: Vec = vec![]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + // Consume all messages (will get EOF) + while protocol.next().await.is_some() {} + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(SubscriptionEndReason::StreamEnd.as_str().to_string()) + ); + } + + #[tokio::test] + async fn test_end_reason_heartbeat_delivery_failed() { + // Test: Stream dropped while heartbeat was pending + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!( + "test_span", + "apollo.subscription.end_reason" = tracing::field::Empty + ); + let _span_guard = span.enter(); + use tokio::time::sleep; + + let (tx, rx) = tokio::sync::mpsc::channel::(1); + let gql_responses = tokio_stream::wrappers::ReceiverStream::new(rx); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + // Spawn a task that never sends anything, then drops the sender + tokio::spawn(async move { + sleep(std::time::Duration::from_millis(100)).await; + drop(tx); + }); + + // Wait for a heartbeat to be sent + let heartbeat = "\r\n--graphql\r\ncontent-type: application/json\r\n\r\n{}\r\n--graphql"; + while let Some(resp) = protocol.next().await { + let res = String::from_utf8(resp.unwrap().to_vec()).unwrap(); + if res == heartbeat || res.starts_with("\r\ncontent-type: application/json\r\n\r\n{}") { + // Got a heartbeat, now drop the protocol while heartbeat is pending + assert!(protocol.heartbeat_pending); + break; + } + } + // Protocol is dropped here with heartbeat_pending = true + drop(protocol); + let reason = layer.captured_reason.lock().unwrap().clone(); + + assert_eq!( + reason, + Some( + SubscriptionEndReason::HeartbeatDeliveryFailed + .as_str() 
+ .to_string() + ) + ); + } + + #[tokio::test] + async fn test_end_reason_client_disconnect() { + // Test: Stream dropped after a message (not heartbeat) was sent + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!( + "test_span", + "apollo.subscription.end_reason" = tracing::field::Empty + ); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + // Get the first message + let resp = protocol.next().await; + assert!(resp.is_some()); + + // Verify heartbeat_pending is false (we got a message, not heartbeat) + assert!(!protocol.heartbeat_pending); + + // Protocol is dropped here without being terminated + // and heartbeat_pending = false + drop(protocol); + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(SubscriptionEndReason::ClientDisconnect.as_str().to_string()) + ); + } + + #[tokio::test] + async fn test_end_reason_not_set_for_defer_mode() { + // Test: Defer mode should not record any end reason + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!( + "test_span", + "apollo.subscription.end_reason" = tracing::field::Empty + ); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .has_next(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Defer); + + // Consume all messages + while protocol.next().await.is_some() {} + + let reason = layer.captured_reason.lock().unwrap().clone(); + // Defer mode doesn't set end_reason + assert_eq!(reason, None); + } + #[tokio::test] async fn test_heartbeat_and_boundaries() { let responses = vec![ 
@@ -306,4 +627,69 @@ mod tests { } } } + + #[tokio::test] + async fn test_heartbeat_pending_flag() { + use tokio::time::sleep; + + // Create a subscription stream that will have a delay to allow heartbeats + let (tx, rx) = tokio::sync::mpsc::channel::(1); + let gql_responses = tokio_stream::wrappers::ReceiverStream::new(rx); + + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + let heartbeat = + String::from("\r\n--graphql\r\ncontent-type: application/json\r\n\r\n{}\r\n--graphql"); + + // Spawn a task to send a response after a delay (longer than heartbeat interval) + tokio::spawn(async move { + // Wait longer than the test heartbeat interval (10ms) + sleep(std::time::Duration::from_millis(30)).await; + let _ = tx + .send( + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from( + String::from("test"), + ))) + .subscribed(false) + .build(), + ) + .await; + }); + + // Read items from the stream + let mut got_heartbeat = false; + let mut got_message = false; + while let Some(resp) = protocol.next().await { + let res = String::from_utf8(resp.unwrap().to_vec()).unwrap(); + if res == heartbeat || res.starts_with("\r\ncontent-type: application/json\r\n\r\n{}") { + // After receiving a heartbeat, heartbeat_pending should be true + assert!( + protocol.heartbeat_pending, + "heartbeat_pending should be true after yielding heartbeat" + ); + got_heartbeat = true; + } else if res.contains("\"test\"") { + // After receiving a message, heartbeat_pending should be false + assert!( + !protocol.heartbeat_pending, + "heartbeat_pending should be false after receiving message" + ); + got_message = true; + break; + } + } + assert!(got_heartbeat, "should have received at least one heartbeat"); + assert!(got_message, "should have received the test message"); + } + + #[test] + fn test_defer_mode_no_drop_logging() { + // Defer mode should not trigger any logging on drop + // This test just verifies it doesn't panic + let 
responses: Vec = vec![]; + let gql_responses = stream::iter(responses); + let protocol = Multipart::new(gql_responses, ProtocolMode::Defer); + drop(protocol); + // No panic = success (defer mode doesn't log on drop) + } } From fa9c6532382fadc3b4a4dcf264178e0146ccb144 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Mon, 2 Feb 2026 17:49:43 +0000 Subject: [PATCH 02/15] Add docs --- .../subscriptions/configuration.mdx | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/docs/source/routing/operations/subscriptions/configuration.mdx b/docs/source/routing/operations/subscriptions/configuration.mdx index 7ab898608f..9a66dd4bb1 100644 --- a/docs/source/routing/operations/subscriptions/configuration.mdx +++ b/docs/source/routing/operations/subscriptions/configuration.mdx @@ -454,3 +454,31 @@ subscription: ``` If a client attempts to execute a subscription on your router when it's already at `max_open_subscriptions`, the router rejects the client's request with an error. + +### Subscription end reason telemetry + +The router records an `apollo.subscription.end_reason` attribute on the `router` span for subscription operations. This attribute indicates why a subscription ended, which is useful for debugging and monitoring subscription lifecycle in your observability tools. + +The possible values for `apollo.subscription.end_reason` are: + +| Value | Description | +|-------|-------------| +| `complete` | The subscription completed normally. The subgraph sent a final response indicating the subscription has completed. | +| `server_close` | The server (subgraph) closed the connection gracefully. This occurs when the subgraph sends an empty response to signal intentional closure. | +| `stream_end` | The underlying data stream ended. This typically happens when the subgraph closes the WebSocket connection or the stream source terminates. 
| +| `heartbeat_delivery_failed` | A heartbeat message could not be delivered to the client. This usually indicates the client disconnected unexpectedly while the router was sending a keepalive heartbeat. | +| `client_disconnect` | The client disconnected after receiving subscription data (not during a heartbeat). This indicates the client closed the connection mid-subscription. | + +<Note> + +The `apollo.subscription.end_reason` attribute only appears on subscription operations using the [HTTP multipart protocol](/router/executing-operations/subscription-multipart-protocol/). WebSocket passthrough subscriptions to subgraphs have their own connection lifecycle managed by the WebSocket protocol. + +</Note> + +#### Example: Monitoring subscription health + +You can use the `apollo.subscription.end_reason` attribute to monitor the health of your subscriptions: + +- A high rate of `heartbeat_delivery_failed` or `client_disconnect` may indicate network instability between clients and your router. +- Frequent `stream_end` values without corresponding `complete` values may indicate issues with your subgraph subscription sources. +- The `complete` value indicates healthy subscription lifecycles where the subgraph properly signals completion. 
From bbcb1c1bbbe47f0d7db5f334cda357384b715b4d Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Tue, 3 Feb 2026 17:44:11 +0000 Subject: [PATCH 03/15] Remove "complete" reason --- apollo-router/src/protocols/multipart.rs | 29 ++++++++----------- .../subscriptions/configuration.mdx | 8 ++--- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/apollo-router/src/protocols/multipart.rs b/apollo-router/src/protocols/multipart.rs index 3fe0ba0e97..932f429a74 100644 --- a/apollo-router/src/protocols/multipart.rs +++ b/apollo-router/src/protocols/multipart.rs @@ -89,8 +89,6 @@ impl Multipart { /// Reasons why a subscription ended #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum SubscriptionEndReason { - /// Subscription completed normally after receiving all data - Complete, /// Server closed the connection gracefully ServerClose, /// Stream source ended (e.g., subgraph closed the connection) @@ -105,7 +103,6 @@ impl SubscriptionEndReason { /// Returns the string representation of the end reason pub(crate) fn as_str(&self) -> &'static str { match self { - Self::Complete => "complete", Self::ServerClose => "server_close", Self::StreamEnd => "stream_end", Self::HeartbeatDeliveryFailed => "heartbeat_delivery_failed", @@ -228,7 +225,7 @@ impl Stream for Multipart { if self.mode == ProtocolMode::Subscription { self.span.record( "apollo.subscription.end_reason", - SubscriptionEndReason::Complete.as_str(), + SubscriptionEndReason::ServerClose.as_str(), ); } buf.extend_from_slice(b"\r\n--graphql--\r\n"); @@ -329,8 +326,8 @@ mod tests { } #[tokio::test] - async fn test_end_reason_complete() { - // Test: Subscription completes normally with subscribed=false + async fn test_end_reason_server_close_empty_response() { + // Test: Server closes connection gracefully (empty response) let (_guard, layer) = setup_tracing(); let span = tracing::info_span!( "test_span", @@ -342,28 +339,24 @@ mod tests { 
.data(serde_json_bytes::Value::String(ByteString::from("data"))) .subscribed(true) .build(), - graphql::Response::builder() - .data(serde_json_bytes::Value::String(ByteString::from("final"))) - .subscribed(false) // This marks completion - .build(), + // Empty response signals server-side graceful close + graphql::Response::builder().build(), ]; let gql_responses = stream::iter(responses); let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); // Consume all messages while protocol.next().await.is_some() {} - let reason = layer.captured_reason.lock().unwrap().clone(); - assert_eq!( reason, - Some(SubscriptionEndReason::Complete.as_str().to_string()) + Some(SubscriptionEndReason::ServerClose.as_str().to_string()) ); } #[tokio::test] - async fn test_end_reason_server_close() { - // Test: Server closes connection gracefully (empty response) + async fn test_end_reason_server_close_with_final_data() { + // Test: Server closes normally with final data (subscribed=false, no errors) let (_guard, layer) = setup_tracing(); let span = tracing::info_span!( "test_span", @@ -375,8 +368,10 @@ mod tests { .data(serde_json_bytes::Value::String(ByteString::from("data"))) .subscribed(true) .build(), - // Empty response signals server-side graceful close - graphql::Response::builder().build(), + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("final"))) + .subscribed(false) // Graceful close with final data + .build(), ]; let gql_responses = stream::iter(responses); let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); diff --git a/docs/source/routing/operations/subscriptions/configuration.mdx b/docs/source/routing/operations/subscriptions/configuration.mdx index 9a66dd4bb1..9eec8334fe 100644 --- a/docs/source/routing/operations/subscriptions/configuration.mdx +++ b/docs/source/routing/operations/subscriptions/configuration.mdx @@ -463,8 +463,7 @@ The possible values for 
`apollo.subscription.end_reason` are: | Value | Description | |-------|-------------| -| `complete` | The subscription completed normally. The subgraph sent a final response indicating the subscription has completed. | -| `server_close` | The server (subgraph) closed the connection gracefully. This occurs when the subgraph sends an empty response to signal intentional closure. | +| `server_close` | The server (subgraph) closed the connection gracefully. This occurs when the subgraph ends the subscription. | | `stream_end` | The underlying data stream ended. This typically happens when the subgraph closes the WebSocket connection or the stream source terminates. | | `heartbeat_delivery_failed` | A heartbeat message could not be delivered to the client. This usually indicates the client disconnected unexpectedly while the router was sending a keepalive heartbeat. | | `client_disconnect` | The client disconnected after receiving subscription data (not during a heartbeat). This indicates the client closed the connection mid-subscription. | @@ -479,6 +478,5 @@ The `apollo.subscription.end_reason` attribute only appears on subscription oper You can use the `apollo.subscription.end_reason` attribute to monitor the health of your subscriptions: -- A high rate of `heartbeat_delivery_failed` or `client_disconnect` may indicate network instability between clients and your router. -- Frequent `stream_end` values without corresponding `complete` values may indicate issues with your subgraph subscription sources. -- The `complete` value indicates healthy subscription lifecycles where the subgraph properly signals completion. +- Frequent `stream_end` values may indicate issues with your subgraph subscription sources closing unexpectedly. +- The `server_close` value indicates healthy subscription lifecycles where the subgraph properly signals closure. 
From b9ce87b39cea1f7d2cedfeb4f671734420e32f09 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Wed, 4 Feb 2026 15:16:40 +0000 Subject: [PATCH 04/15] Add end reasons for schema reloads and config reloads --- .../src/plugins/subscription/execution.rs | 4 +- apollo-router/src/plugins/subscription/mod.rs | 2 + apollo-router/src/protocols/multipart.rs | 111 +++++++++++++++++- .../subscriptions/configuration.mdx | 8 +- 4 files changed, 112 insertions(+), 13 deletions(-) diff --git a/apollo-router/src/plugins/subscription/execution.rs b/apollo-router/src/plugins/subscription/execution.rs index 45594e7ee9..856b06025a 100644 --- a/apollo-router/src/plugins/subscription/execution.rs +++ b/apollo-router/src/plugins/subscription/execution.rs @@ -35,8 +35,8 @@ use crate::services::execution; use crate::services::execution::QueryPlan; use crate::services::subgraph::BoxGqlStream; -const SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_CONFIG_RELOAD"; -const SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_SCHEMA_RELOAD"; +pub(crate) const SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_CONFIG_RELOAD"; +pub(crate) const SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_SCHEMA_RELOAD"; const SUBSCRIPTION_JWT_EXPIRED_EXTENSION_CODE: &str = "SUBSCRIPTION_JWT_EXPIRED"; const SUBSCRIPTION_EXECUTION_ERROR_EXTENSION_CODE: &str = "SUBSCRIPTION_EXECUTION_ERROR"; diff --git a/apollo-router/src/plugins/subscription/mod.rs b/apollo-router/src/plugins/subscription/mod.rs index 7ccecf1799..81160f877b 100644 --- a/apollo-router/src/plugins/subscription/mod.rs +++ b/apollo-router/src/plugins/subscription/mod.rs @@ -36,6 +36,8 @@ pub(crate) mod notification; pub(crate) mod subgraph; pub(crate) use callback::SUBSCRIPTION_CALLBACK_HMAC_KEY; +pub(crate) use execution::SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE; +pub(crate) use execution::SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE; pub(crate) use 
execution::SubscriptionExecutionLayer; pub(crate) use execution::SubscriptionTaskParams; pub(crate) use fetch::fetch_service_handle_subscription; diff --git a/apollo-router/src/protocols/multipart.rs b/apollo-router/src/protocols/multipart.rs index 932f429a74..e81e968b0e 100644 --- a/apollo-router/src/protocols/multipart.rs +++ b/apollo-router/src/protocols/multipart.rs @@ -13,7 +13,9 @@ use tokio_stream::wrappers::IntervalStream; use tracing::Span; use crate::graphql; +use crate::plugins::subscription::SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE; use crate::plugins::subscription::SUBSCRIPTION_ERROR_EXTENSION_KEY; +use crate::plugins::subscription::SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE; #[cfg(test)] const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(10); @@ -84,6 +86,22 @@ impl Multipart { span: Span::current(), } } + + /// Checks if the errors indicate a reload-related termination and returns the appropriate end reason + fn detect_reload_end_reason(errors: &[graphql::Error]) -> SubscriptionEndReason { + for error in errors { + match error.extensions.get("code").and_then(|v| v.as_str()) { + Some(code) if code == SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE => { + return SubscriptionEndReason::SchemaReload; + } + Some(code) if code == SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE => { + return SubscriptionEndReason::ConfigReload; + } + _ => {} + } + } + SubscriptionEndReason::ServerClose + } } /// Reasons why a subscription ended @@ -97,16 +115,21 @@ pub(crate) enum SubscriptionEndReason { HeartbeatDeliveryFailed, /// Client disconnected unexpectedly (after a message was sent) ClientDisconnect, + /// Subscription terminated due to router schema reload + SchemaReload, + /// Subscription terminated due to router configuration reload + ConfigReload, } impl SubscriptionEndReason { - /// Returns the string representation of the end reason pub(crate) fn as_str(&self) -> &'static str { match self { Self::ServerClose => "server_close", Self::StreamEnd => "stream_end", 
Self::HeartbeatDeliveryFailed => "heartbeat_delivery_failed", Self::ClientDisconnect => "client_disconnect", + Self::SchemaReload => "schema_reload", + Self::ConfigReload => "config_reload", } } } @@ -166,6 +189,10 @@ impl Stream for Multipart { let is_still_open = response.has_next.unwrap_or(false) || response.subscribed.unwrap_or(false); + + // Check for reload-related termination before errors are moved + let end_reason = Self::detect_reload_end_reason(&response.errors); + let mut buf = if self.is_first_chunk { self.is_first_chunk = false; Vec::from(&b"\r\n--graphql\r\ncontent-type: application/json\r\n\r\n"[..]) @@ -223,10 +250,8 @@ impl Stream for Multipart { } else { self.is_terminated = true; if self.mode == ProtocolMode::Subscription { - self.span.record( - "apollo.subscription.end_reason", - SubscriptionEndReason::ServerClose.as_str(), - ); + self.span + .record("apollo.subscription.end_reason", end_reason.as_str()); } buf.extend_from_slice(b"\r\n--graphql--\r\n"); } @@ -487,6 +512,82 @@ mod tests { ); } + #[tokio::test] + async fn test_end_reason_schema_reload() { + // Test: Subscription terminated due to schema reload + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!( + "test_span", + "apollo.subscription.end_reason" = tracing::field::Empty + ); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + // Schema reload error response + graphql::Response::builder() + .error( + graphql::Error::builder() + .message("subscription has been closed due to a schema reload") + .extension_code("SUBSCRIPTION_SCHEMA_RELOAD") + .build(), + ) + .subscribed(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + // Consume all messages + while protocol.next().await.is_some() {} + + let reason = 
layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(SubscriptionEndReason::SchemaReload.as_str().to_string()) + ); + } + + #[tokio::test] + async fn test_end_reason_config_reload() { + // Test: Subscription terminated due to config reload + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!( + "test_span", + "apollo.subscription.end_reason" = tracing::field::Empty + ); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + // Config reload error response + graphql::Response::builder() + .error( + graphql::Error::builder() + .message("subscription has been closed due to a configuration reload") + .extension_code("SUBSCRIPTION_CONFIG_RELOAD") + .build(), + ) + .subscribed(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + // Consume all messages + while protocol.next().await.is_some() {} + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(SubscriptionEndReason::ConfigReload.as_str().to_string()) + ); + } + #[tokio::test] async fn test_end_reason_not_set_for_defer_mode() { // Test: Defer mode should not record any end reason diff --git a/docs/source/routing/operations/subscriptions/configuration.mdx b/docs/source/routing/operations/subscriptions/configuration.mdx index 9eec8334fe..29b661cbcc 100644 --- a/docs/source/routing/operations/subscriptions/configuration.mdx +++ b/docs/source/routing/operations/subscriptions/configuration.mdx @@ -467,12 +467,8 @@ The possible values for `apollo.subscription.end_reason` are: | `stream_end` | The underlying data stream ended. This typically happens when the subgraph closes the WebSocket connection or the stream source terminates. 
| | `heartbeat_delivery_failed` | A heartbeat message could not be delivered to the client. This usually indicates the client disconnected unexpectedly while the router was sending a keepalive heartbeat. | | `client_disconnect` | The client disconnected after receiving subscription data (not during a heartbeat). This indicates the client closed the connection mid-subscription. | - - - -The `apollo.subscription.end_reason` attribute only appears on subscription operations using the [HTTP multipart protocol](/router/executing-operations/subscription-multipart-protocol/). WebSocket passthrough subscriptions to subgraphs have their own connection lifecycle managed by the WebSocket protocol. - - +| `schema_reload` | The subscription was terminated because the router's supergraph schema was updated. Clients can reconnect to resume the subscription. | +| `config_reload` | The subscription was terminated because the router's configuration was updated. Clients can reconnect to resume the subscription. 
| #### Example: Monitoring subscription health From d3bb6a4e062a661c797b41c06f0a78845eb7fdf8 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Fri, 6 Feb 2026 17:56:07 +0000 Subject: [PATCH 05/15] Add attribute for defer.end_reason --- .../src/plugins/telemetry/span_factory.rs | 2 - apollo-router/src/protocols/multipart.rs | 440 +++++++++++++----- 2 files changed, 323 insertions(+), 119 deletions(-) diff --git a/apollo-router/src/plugins/telemetry/span_factory.rs b/apollo-router/src/plugins/telemetry/span_factory.rs index b7ecff75fd..cecacde346 100644 --- a/apollo-router/src/plugins/telemetry/span_factory.rs +++ b/apollo-router/src/plugins/telemetry/span_factory.rs @@ -94,7 +94,6 @@ impl SpanMode { "apollo_private.duration_ns" = ::tracing::field::Empty, "apollo_private.http.request_headers" = ::tracing::field::Empty, "apollo_private.http.response_headers" = ::tracing::field::Empty, - "apollo.subscription.end_reason" = ::tracing::field::Empty, ); span } @@ -111,7 +110,6 @@ impl SpanMode { "apollo_private.http.request_headers" = ::tracing::field::Empty, "apollo_private.http.response_headers" = ::tracing::field::Empty, "apollo_private.request" = true, - "apollo.subscription.end_reason" = ::tracing::field::Empty, ) } } diff --git a/apollo-router/src/protocols/multipart.rs b/apollo-router/src/protocols/multipart.rs index e81e968b0e..a6e7bec508 100644 --- a/apollo-router/src/protocols/multipart.rs +++ b/apollo-router/src/protocols/multipart.rs @@ -16,12 +16,18 @@ use crate::graphql; use crate::plugins::subscription::SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE; use crate::plugins::subscription::SUBSCRIPTION_ERROR_EXTENSION_KEY; use crate::plugins::subscription::SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE; +use crate::plugins::telemetry::dynamic_attribute::SpanDynAttribute; #[cfg(test)] const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(10); #[cfg(not(test))] const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +const 
SUBSCRIPTION_END_REASON_KEY: opentelemetry::Key = + opentelemetry::Key::from_static_str("apollo.subscription.end_reason"); +const DEFER_END_REASON_KEY: opentelemetry::Key = + opentelemetry::Key::from_static_str("apollo.defer.end_reason"); + #[derive(thiserror::Error, Debug)] pub(crate) enum Error { #[error("serialization error")] @@ -132,22 +138,58 @@ impl SubscriptionEndReason { Self::ConfigReload => "config_reload", } } + + pub(crate) fn as_value(&self) -> opentelemetry::Value { + opentelemetry::Value::String(self.as_str().into()) + } +} + +/// Reasons why a defer request ended +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum DeferEndReason { + /// All deferred chunks were delivered successfully + Completed, + /// Client disconnected before all deferred data was delivered + ClientDisconnect, +} + +impl DeferEndReason { + pub(crate) fn as_str(&self) -> &'static str { + match self { + Self::Completed => "completed", + Self::ClientDisconnect => "client_disconnect", + } + } + + pub(crate) fn as_value(&self) -> opentelemetry::Value { + opentelemetry::Value::String(self.as_str().into()) + } } impl Drop for Multipart { fn drop(&mut self) { - // Only handle subscription mode - if self.mode == ProtocolMode::Subscription && !self.is_terminated { - // Stream wasn't terminated properly - determine the reason - let reason = if self.heartbeat_pending { - // Heartbeat was the last thing sent - likely failed to deliver - SubscriptionEndReason::HeartbeatDeliveryFailed - } else { - // Connection closed after a message was sent - SubscriptionEndReason::ClientDisconnect - }; - self.span - .record("apollo.subscription.end_reason", reason.as_str()); + if !self.is_terminated { + match self.mode { + ProtocolMode::Subscription => { + // Stream wasn't terminated properly - determine the reason + let reason = if self.heartbeat_pending { + // Heartbeat was the last thing sent - likely failed to deliver + SubscriptionEndReason::HeartbeatDeliveryFailed + } else { + // Connection 
closed after a message was sent + SubscriptionEndReason::ClientDisconnect + }; + self.span + .set_span_dyn_attribute(SUBSCRIPTION_END_REASON_KEY, reason.as_value()); + } + ProtocolMode::Defer => { + // Defer stream wasn't terminated properly - client disconnected + self.span.set_span_dyn_attribute( + DEFER_END_REASON_KEY, + DeferEndReason::ClientDisconnect.as_value(), + ); + } + } } } } @@ -212,9 +254,9 @@ impl Stream for Multipart { && response.extensions.is_empty() { self.is_terminated = true; - self.span.record( - "apollo.subscription.end_reason", - SubscriptionEndReason::ServerClose.as_str(), + self.span.set_span_dyn_attribute( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ServerClose.as_value(), ); return Poll::Ready(Some(Ok(Bytes::from_static(&b"--\r\n"[..])))); } @@ -249,9 +291,19 @@ impl Stream for Multipart { buf.extend_from_slice(b"\r\n--graphql"); } else { self.is_terminated = true; - if self.mode == ProtocolMode::Subscription { - self.span - .record("apollo.subscription.end_reason", end_reason.as_str()); + match self.mode { + ProtocolMode::Subscription => { + self.span.set_span_dyn_attribute( + SUBSCRIPTION_END_REASON_KEY, + end_reason.as_value(), + ); + } + ProtocolMode::Defer => { + self.span.set_span_dyn_attribute( + DEFER_END_REASON_KEY, + DeferEndReason::Completed.as_value(), + ); + } } buf.extend_from_slice(b"\r\n--graphql--\r\n"); } @@ -272,10 +324,12 @@ impl Stream for Multipart { ) }; self.is_terminated = true; - self.span.record( - "apollo.subscription.end_reason", - SubscriptionEndReason::StreamEnd.as_str(), - ); + if self.mode == ProtocolMode::Subscription { + self.span.set_span_dyn_attribute( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::StreamEnd.as_value(), + ); + } Poll::Ready(Some(Ok(buf))) } @@ -283,10 +337,20 @@ impl Stream for Multipart { // Stream ended - this is a clean termination self.heartbeat_pending = false; self.is_terminated = true; - self.span.record( - "apollo.subscription.end_reason", - 
SubscriptionEndReason::StreamEnd.as_str(), - ); + match self.mode { + ProtocolMode::Subscription => { + self.span.set_span_dyn_attribute( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::StreamEnd.as_value(), + ); + } + ProtocolMode::Defer => { + self.span.set_span_dyn_attribute( + DEFER_END_REASON_KEY, + DeferEndReason::Completed.as_value(), + ); + } + } Poll::Ready(None) } }, @@ -298,67 +362,62 @@ impl Stream for Multipart { #[cfg(test)] mod tests { use std::sync::Arc; + use std::sync::Mutex; use futures::stream; + use opentelemetry::KeyValue; use serde_json_bytes::ByteString; - use tracing::Id; - use tracing::Subscriber; - use tracing::field::Visit; - use tracing::span::Record; use tracing_subscriber::Layer; use tracing_subscriber::layer::Context; use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::registry::LookupSpan; use super::*; + use crate::plugins::telemetry::dynamic_attribute::DynAttributeLayer; + use crate::plugins::telemetry::otel; + use crate::plugins::telemetry::otel::OtelData; - /// A test layer that captures the recorded `apollo.subscription.end_reason` value - #[derive(Clone)] + #[derive(Clone, Default)] struct EndReasonCapture { - captured_reason: Arc>>, - } - - struct EndReasonVisitor<'a> { - captured_reason: &'a Arc>>, + captured_reason: Arc>>, } - impl Visit for EndReasonVisitor<'_> { - fn record_str(&mut self, field: &tracing::field::Field, value: &str) { - if field.name() == "apollo.subscription.end_reason" { - *self.captured_reason.lock().unwrap() = Some(value.to_string()); + impl Layer for EndReasonCapture + where + S: tracing_core::Subscriber + for<'lookup> LookupSpan<'lookup>, + { + fn on_exit(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) { + if let Some(span) = ctx.span(id) + && let Some(data) = span.extensions().get::() + && let Some(attributes) = data.builder.attributes.as_ref() + { + *self.captured_reason.lock().unwrap() = attributes.iter().find_map(|attr| { + let key = &attr.key; + (*key == 
SUBSCRIPTION_END_REASON_KEY || *key == DEFER_END_REASON_KEY) + .then(|| attr.clone()) + }); } } - - fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {} - } - - impl Layer for EndReasonCapture { - fn on_record(&self, _span: &Id, values: &Record<'_>, _ctx: Context<'_, S>) { - let mut visitor = EndReasonVisitor { - captured_reason: &self.captured_reason, - }; - values.record(&mut visitor); - } } - /// Helper to run a test with a span that has the end_reason field + /// Helper to set up tracing with DynAttributeLayer and EndReasonCapture fn setup_tracing() -> (tracing::subscriber::DefaultGuard, EndReasonCapture) { - let layer = EndReasonCapture { - captured_reason: Arc::new(std::sync::Mutex::new(None)), - }; - let subscriber = tracing_subscriber::registry().with(layer.clone()); + let layer = EndReasonCapture::default(); + let subscriber = tracing_subscriber::Registry::default() + .with(DynAttributeLayer::new()) + .with(otel::layer().force_sampling()) + .with(layer.clone()); let guard = tracing::subscriber::set_default(subscriber); (guard, layer) } #[tokio::test] - async fn test_end_reason_server_close_empty_response() { + async fn test_subscription_end_reason_server_close_empty_response() { // Test: Server closes connection gracefully (empty response) let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!( - "test_span", - "apollo.subscription.end_reason" = tracing::field::Empty - ); - let _span_guard = span.enter(); + let span = tracing::info_span!("test_span"); + let span_guard = span.enter(); + let responses = vec![ graphql::Response::builder() .data(serde_json_bytes::Value::String(ByteString::from("data"))) @@ -372,21 +431,26 @@ mod tests { // Consume all messages while protocol.next().await.is_some() {} + + drop(protocol); + drop(span_guard); + drop(span); + let reason = layer.captured_reason.lock().unwrap().clone(); assert_eq!( reason, - Some(SubscriptionEndReason::ServerClose.as_str().to_string()) + 
Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ServerClose.as_value() + )) ); } #[tokio::test] - async fn test_end_reason_server_close_with_final_data() { + async fn test_subscription_end_reason_server_close_with_final_data() { // Test: Server closes normally with final data (subscribed=false, no errors) let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!( - "test_span", - "apollo.subscription.end_reason" = tracing::field::Empty - ); + let span = tracing::info_span!("test_span"); let _span_guard = span.enter(); let responses = vec![ graphql::Response::builder() @@ -403,10 +467,18 @@ mod tests { // Consume all messages while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + let reason = layer.captured_reason.lock().unwrap().clone(); assert_eq!( reason, - Some(SubscriptionEndReason::ServerClose.as_str().to_string()) + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ServerClose.as_value() + )) ); } @@ -414,10 +486,7 @@ mod tests { async fn test_end_reason_stream_end() { // Test: Stream ends via EOF (empty stream) let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!( - "test_span", - "apollo.subscription.end_reason" = tracing::field::Empty - ); + let span = tracing::info_span!("test_span"); let _span_guard = span.enter(); let responses: Vec = vec![]; let gql_responses = stream::iter(responses); @@ -425,10 +494,18 @@ mod tests { // Consume all messages (will get EOF) while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + let reason = layer.captured_reason.lock().unwrap().clone(); assert_eq!( reason, - Some(SubscriptionEndReason::StreamEnd.as_str().to_string()) + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::StreamEnd.as_value() + )) ); } @@ -436,10 +513,7 @@ mod tests { async fn test_end_reason_heartbeat_delivery_failed() { // Test: Stream dropped while heartbeat 
was pending let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!( - "test_span", - "apollo.subscription.end_reason" = tracing::field::Empty - ); + let span = tracing::info_span!("test_span"); let _span_guard = span.enter(); use tokio::time::sleep; @@ -463,17 +537,19 @@ mod tests { break; } } + // Protocol is dropped here with heartbeat_pending = true drop(protocol); - let reason = layer.captured_reason.lock().unwrap().clone(); + drop(_span_guard); + drop(span); + let reason = layer.captured_reason.lock().unwrap().clone(); assert_eq!( reason, - Some( - SubscriptionEndReason::HeartbeatDeliveryFailed - .as_str() - .to_string() - ) + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::HeartbeatDeliveryFailed.as_value() + )) ); } @@ -481,10 +557,7 @@ mod tests { async fn test_end_reason_client_disconnect() { // Test: Stream dropped after a message (not heartbeat) was sent let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!( - "test_span", - "apollo.subscription.end_reason" = tracing::field::Empty - ); + let span = tracing::info_span!("test_span"); let _span_guard = span.enter(); let responses = vec![ graphql::Response::builder() @@ -505,28 +578,29 @@ mod tests { // Protocol is dropped here without being terminated // and heartbeat_pending = false drop(protocol); + drop(_span_guard); + drop(span); + let reason = layer.captured_reason.lock().unwrap().clone(); assert_eq!( reason, - Some(SubscriptionEndReason::ClientDisconnect.as_str().to_string()) + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ClientDisconnect.as_value() + )) ); } #[tokio::test] - async fn test_end_reason_schema_reload() { - // Test: Subscription terminated due to schema reload + async fn test_subscription_end_reason_schema_reload() { let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!( - "test_span", - "apollo.subscription.end_reason" = tracing::field::Empty - ); + let span = 
tracing::info_span!("test_span"); let _span_guard = span.enter(); let responses = vec![ graphql::Response::builder() .data(serde_json_bytes::Value::String(ByteString::from("data"))) .subscribed(true) .build(), - // Schema reload error response graphql::Response::builder() .error( graphql::Error::builder() @@ -542,22 +616,24 @@ mod tests { // Consume all messages while protocol.next().await.is_some() {} + drop(protocol); + drop(_span_guard); + drop(span); let reason = layer.captured_reason.lock().unwrap().clone(); assert_eq!( reason, - Some(SubscriptionEndReason::SchemaReload.as_str().to_string()) + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::SchemaReload.as_value() + )) ); } #[tokio::test] - async fn test_end_reason_config_reload() { - // Test: Subscription terminated due to config reload + async fn test_subscription_end_reason_config_reload() { let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!( - "test_span", - "apollo.subscription.end_reason" = tracing::field::Empty - ); + let span = tracing::info_span!("test_span"); let _span_guard = span.enter(); let responses = vec![ graphql::Response::builder() @@ -580,22 +656,63 @@ mod tests { // Consume all messages while protocol.next().await.is_some() {} + drop(protocol); + drop(_span_guard); + drop(span); let reason = layer.captured_reason.lock().unwrap().clone(); assert_eq!( reason, - Some(SubscriptionEndReason::ConfigReload.as_str().to_string()) + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ConfigReload.as_value() + )) ); } #[tokio::test] - async fn test_end_reason_not_set_for_defer_mode() { - // Test: Defer mode should not record any end reason + async fn test_defer_end_reason_completed() { + // Test: Defer completes normally with all chunks delivered (has_next=false) let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!( - "test_span", - "apollo.subscription.end_reason" = tracing::field::Empty + let span = 
tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("initial"))) + .has_next(true) + .build(), + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from( + "deferred", + ))) + .has_next(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Defer); + + // Consume all messages + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + let end_reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + end_reason, + Some(KeyValue::new( + DEFER_END_REASON_KEY, + DeferEndReason::Completed.as_value() + )) ); + } + + #[tokio::test] + async fn test_defer_end_reason_completed_single_chunk() { + // Test: Defer completes with a single chunk (has_next=false) + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); let _span_guard = span.enter(); let responses = vec![ graphql::Response::builder() @@ -609,9 +726,87 @@ mod tests { // Consume all messages while protocol.next().await.is_some() {} - let reason = layer.captured_reason.lock().unwrap().clone(); - // Defer mode doesn't set end_reason - assert_eq!(reason, None); + drop(protocol); + drop(_span_guard); + drop(span); + + let end_reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + end_reason, + Some(KeyValue::new( + DEFER_END_REASON_KEY, + DeferEndReason::Completed.as_value() + )) + ); + } + + #[tokio::test] + async fn test_defer_end_reason_completed_empty_stream() { + // Test: Defer completes when the stream is empty (None case) + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses: Vec = vec![]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, 
ProtocolMode::Defer); + + // Consume all messages + while protocol.next().await.is_some() {} + drop(protocol); + drop(_span_guard); + drop(span); + + let end_reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + end_reason, + Some(KeyValue::new( + DEFER_END_REASON_KEY, + DeferEndReason::Completed.as_value() + )) + ); + } + + #[tokio::test] + async fn test_defer_end_reason_client_disconnect() { + // Test: Client disconnects before all deferred data is delivered + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("initial"))) + .has_next(true) // More data expected + .build(), + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from( + "deferred1", + ))) + .has_next(true) // Still more data expected + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Defer); + + // Read only the first chunk, then drop (simulating client disconnect) + let resp = protocol.next().await; + assert!(resp.is_some()); + + // Stream is NOT terminated (has_next was true) + assert!(!protocol.is_terminated); + + // Drop the protocol - simulates client disconnect + drop(protocol); + drop(_span_guard); + drop(span); + + let end_reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + end_reason, + Some(KeyValue::new( + DEFER_END_REASON_KEY, + DeferEndReason::ClientDisconnect.as_value() + )) + ); } #[tokio::test] @@ -779,13 +974,24 @@ mod tests { } #[test] - fn test_defer_mode_no_drop_logging() { - // Defer mode should not trigger any logging on drop - // This test just verifies it doesn't panic + fn test_defer_mode_drop_records_client_disconnect() { + // Defer mode should record client_disconnect on drop if not terminated + let (_guard, layer) = setup_tracing(); + let span = 
tracing::info_span!("test_span"); + let _span_guard = span.enter(); let responses: Vec = vec![]; let gql_responses = stream::iter(responses); let protocol = Multipart::new(gql_responses, ProtocolMode::Defer); drop(protocol); - // No panic = success (defer mode doesn't log on drop) + drop(_span_guard); + drop(span); + let defer_reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + defer_reason, + Some(KeyValue::new( + DEFER_END_REASON_KEY, + DeferEndReason::ClientDisconnect.as_value() + )) + ); } } From 21ca6591047accc6dbf6709ab90dc3f4bd0689e8 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Mon, 9 Feb 2026 14:09:13 +0000 Subject: [PATCH 06/15] Add changeset --- ...scription_observability_heartbeat_payload_errors.md | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 .changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md diff --git a/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md b/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md new file mode 100644 index 0000000000..792aad298e --- /dev/null +++ b/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md @@ -0,0 +1,10 @@ +### Add `apollo.subscription.end_reason` and `apollo.defer.end_reason` attributes to router spans ([PR #8858](https://github.com/apollographql/router/pull/8858)) + +Adds two new span attributes that indicate why a streaming response (subscription or defer) ended: + +- **`apollo.subscription.end_reason`**: Records the reason a subscription was terminated. Possible values are `server_close`, `stream_end`, `heartbeat_delivery_failed`, `client_disconnect`, `schema_reload`, and `config_reload`. +- **`apollo.defer.end_reason`**: Records the reason a deferred query ended. 
Possible values are `completed` (all deferred chunks were delivered successfully) and `client_disconnect` (the client disconnected before all deferred data was delivered). + +Both attributes are added dynamically to router spans only when relevant (i.e., only on requests that actually use subscriptions or `@defer`), rather than being present on every router span. + +By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/8858 \ No newline at end of file From cfd39543e3e41cf459aef91b098205f3c49cdef2 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Wed, 11 Feb 2026 12:44:49 +0000 Subject: [PATCH 07/15] Store end reason in Multipart struct, set attribute in Drop impl --- apollo-router/src/protocols/multipart.rs | 108 +++++++++++------------ 1 file changed, 53 insertions(+), 55 deletions(-) diff --git a/apollo-router/src/protocols/multipart.rs b/apollo-router/src/protocols/multipart.rs index a6e7bec508..a0f475203f 100644 --- a/apollo-router/src/protocols/multipart.rs +++ b/apollo-router/src/protocols/multipart.rs @@ -64,6 +64,9 @@ pub(crate) struct Multipart { heartbeat_pending: bool, /// The span captured at creation time, used to record attributes on connection close. span: Span, + /// The end reason determined during polling, written to the span on Drop. + /// If `None` when dropped and `!is_terminated`, an abnormal reason is inferred. + end_reason: Option, } impl Multipart { @@ -90,6 +93,7 @@ impl Multipart { heartbeat_pending: false, // Capture the current span so we can record attributes later span: Span::current(), + end_reason: None, } } @@ -110,6 +114,14 @@ impl Multipart { } } +/// Unified end reason for both subscription and defer modes, +/// stored in the Multipart struct and written to the span on Drop. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum EndReason { + Subscription(SubscriptionEndReason), + Defer(DeferEndReason), +} + /// Reasons why a subscription ended #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum SubscriptionEndReason { @@ -168,27 +180,34 @@ impl DeferEndReason { impl Drop for Multipart { fn drop(&mut self) { - if !self.is_terminated { - match self.mode { - ProtocolMode::Subscription => { - // Stream wasn't terminated properly - determine the reason - let reason = if self.heartbeat_pending { - // Heartbeat was the last thing sent - likely failed to deliver - SubscriptionEndReason::HeartbeatDeliveryFailed - } else { - // Connection closed after a message was sent - SubscriptionEndReason::ClientDisconnect - }; - self.span - .set_span_dyn_attribute(SUBSCRIPTION_END_REASON_KEY, reason.as_value()); - } - ProtocolMode::Defer => { - // Defer stream wasn't terminated properly - client disconnected - self.span.set_span_dyn_attribute( - DEFER_END_REASON_KEY, - DeferEndReason::ClientDisconnect.as_value(), - ); - } + // Determine the end reason: use the one recorded during polling if available, + // otherwise infer an abnormal termination reason. 
+ let end_reason = self.end_reason.take().unwrap_or_else(|| match self.mode { + ProtocolMode::Subscription => { + // Stream wasn't terminated properly - determine the reason + let reason = if self.heartbeat_pending { + // Heartbeat was the last thing sent - likely failed to deliver + SubscriptionEndReason::HeartbeatDeliveryFailed + } else { + // Connection closed after a message was sent + SubscriptionEndReason::ClientDisconnect + }; + EndReason::Subscription(reason) + } + ProtocolMode::Defer => { + // Defer stream wasn't terminated properly - client disconnected + EndReason::Defer(DeferEndReason::ClientDisconnect) + } + }); + + match end_reason { + EndReason::Subscription(reason) => { + self.span + .set_span_dyn_attribute(SUBSCRIPTION_END_REASON_KEY, reason.as_value()); + } + EndReason::Defer(reason) => { + self.span + .set_span_dyn_attribute(DEFER_END_REASON_KEY, reason.as_value()); } } } @@ -254,10 +273,9 @@ impl Stream for Multipart { && response.extensions.is_empty() { self.is_terminated = true; - self.span.set_span_dyn_attribute( - SUBSCRIPTION_END_REASON_KEY, - SubscriptionEndReason::ServerClose.as_value(), - ); + self.end_reason = Some(EndReason::Subscription( + SubscriptionEndReason::ServerClose, + )); return Poll::Ready(Some(Ok(Bytes::from_static(&b"--\r\n"[..])))); } @@ -291,20 +309,10 @@ impl Stream for Multipart { buf.extend_from_slice(b"\r\n--graphql"); } else { self.is_terminated = true; - match self.mode { - ProtocolMode::Subscription => { - self.span.set_span_dyn_attribute( - SUBSCRIPTION_END_REASON_KEY, - end_reason.as_value(), - ); - } - ProtocolMode::Defer => { - self.span.set_span_dyn_attribute( - DEFER_END_REASON_KEY, - DeferEndReason::Completed.as_value(), - ); - } - } + self.end_reason = Some(match self.mode { + ProtocolMode::Subscription => EndReason::Subscription(end_reason), + ProtocolMode::Defer => EndReason::Defer(DeferEndReason::Completed), + }); buf.extend_from_slice(b"\r\n--graphql--\r\n"); } @@ -325,10 +333,8 @@ impl Stream for 
Multipart { }; self.is_terminated = true; if self.mode == ProtocolMode::Subscription { - self.span.set_span_dyn_attribute( - SUBSCRIPTION_END_REASON_KEY, - SubscriptionEndReason::StreamEnd.as_value(), - ); + self.end_reason = + Some(EndReason::Subscription(SubscriptionEndReason::StreamEnd)); } Poll::Ready(Some(Ok(buf))) @@ -337,20 +343,12 @@ impl Stream for Multipart { // Stream ended - this is a clean termination self.heartbeat_pending = false; self.is_terminated = true; - match self.mode { + self.end_reason = Some(match self.mode { ProtocolMode::Subscription => { - self.span.set_span_dyn_attribute( - SUBSCRIPTION_END_REASON_KEY, - SubscriptionEndReason::StreamEnd.as_value(), - ); + EndReason::Subscription(SubscriptionEndReason::StreamEnd) } - ProtocolMode::Defer => { - self.span.set_span_dyn_attribute( - DEFER_END_REASON_KEY, - DeferEndReason::Completed.as_value(), - ); - } - } + ProtocolMode::Defer => EndReason::Defer(DeferEndReason::Completed), + }); Poll::Ready(None) } }, From b5da3056fea55716388a80b87bbe423524dccf77 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Wed, 11 Feb 2026 17:34:02 +0000 Subject: [PATCH 08/15] Make usage of detect_reload_end_reason clearer --- apollo-router/src/protocols/multipart.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/apollo-router/src/protocols/multipart.rs b/apollo-router/src/protocols/multipart.rs index a0f475203f..b9abe9c9a9 100644 --- a/apollo-router/src/protocols/multipart.rs +++ b/apollo-router/src/protocols/multipart.rs @@ -98,19 +98,19 @@ impl Multipart { } /// Checks if the errors indicate a reload-related termination and returns the appropriate end reason - fn detect_reload_end_reason(errors: &[graphql::Error]) -> SubscriptionEndReason { + fn detect_reload_end_reason(errors: &[graphql::Error]) -> Option { for error in errors { match error.extensions.get("code").and_then(|v| v.as_str()) { Some(code) if code == 
SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE => { - return SubscriptionEndReason::SchemaReload; + return Some(SubscriptionEndReason::SchemaReload); } Some(code) if code == SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE => { - return SubscriptionEndReason::ConfigReload; + return Some(SubscriptionEndReason::ConfigReload); } _ => {} } } - SubscriptionEndReason::ServerClose + None } } @@ -252,7 +252,7 @@ impl Stream for Multipart { response.has_next.unwrap_or(false) || response.subscribed.unwrap_or(false); // Check for reload-related termination before errors are moved - let end_reason = Self::detect_reload_end_reason(&response.errors); + let maybe_end_reason = Self::detect_reload_end_reason(&response.errors); let mut buf = if self.is_first_chunk { self.is_first_chunk = false; @@ -310,7 +310,9 @@ impl Stream for Multipart { } else { self.is_terminated = true; self.end_reason = Some(match self.mode { - ProtocolMode::Subscription => EndReason::Subscription(end_reason), + ProtocolMode::Subscription => EndReason::Subscription( + maybe_end_reason.unwrap_or(SubscriptionEndReason::ServerClose), + ), ProtocolMode::Defer => EndReason::Defer(DeferEndReason::Completed), }); buf.extend_from_slice(b"\r\n--graphql--\r\n"); From 6aadae35946e4eb2ff144307b6fa21c00257339d Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 12 Feb 2026 17:24:01 +0000 Subject: [PATCH 09/15] Apply suggestions from code review Co-authored-by: Parker --- .../subscriptions/configuration.mdx | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/source/routing/operations/subscriptions/configuration.mdx b/docs/source/routing/operations/subscriptions/configuration.mdx index 29b661cbcc..63611961b2 100644 --- a/docs/source/routing/operations/subscriptions/configuration.mdx +++ b/docs/source/routing/operations/subscriptions/configuration.mdx @@ -455,24 +455,24 @@ subscription: If a client attempts to execute a subscription on your router 
when it's already at `max_open_subscriptions`, the router rejects the client's request with an error. -### Subscription end reason telemetry +### Tracking causes of ending subscriptions -The router records an `apollo.subscription.end_reason` attribute on the `router` span for subscription operations. This attribute indicates why a subscription ended, which is useful for debugging and monitoring subscription lifecycle in your observability tools. +Apollo Router records an `apollo.subscription.end_reason` attribute on the `router` span for subscription operations. This attribute indicates why a subscription ended, which helps you debug and monitor subscription lifecycles using observability tools. -The possible values for `apollo.subscription.end_reason` are: +The `apollo.subscription.end_reason` attribute can have these values: | Value | Description | |-------|-------------| | `server_close` | The server (subgraph) closed the connection gracefully. This occurs when the subgraph ends the subscription. | -| `stream_end` | The underlying data stream ended. This typically happens when the subgraph closes the WebSocket connection or the stream source terminates. | -| `heartbeat_delivery_failed` | A heartbeat message could not be delivered to the client. This usually indicates the client disconnected unexpectedly while the router was sending a keepalive heartbeat. | +| `stream_end` | The underlying data stream ended. This typically occurs when the subgraph closes the WebSocket connection or the stream source terminates. | +| `heartbeat_delivery_failed` | Apollo Router failed to deliver a heartbeat message to the client. This usually indicates the client disconnected unexpectedly when a keepalive heartbeat was sent. | | `client_disconnect` | The client disconnected after receiving subscription data (not during a heartbeat). This indicates the client closed the connection mid-subscription. 
| -| `schema_reload` | The subscription was terminated because the router's supergraph schema was updated. Clients can reconnect to resume the subscription. | -| `config_reload` | The subscription was terminated because the router's configuration was updated. Clients can reconnect to resume the subscription. | +| `schema_reload` | The subscription was terminated because Apollo Router's supergraph schema was updated. Clients can reconnect to resume the subscription. | +| `config_reload` | The subscription was terminated because Apollo Router's configuration was updated. Clients can reconnect to resume the subscription. | -#### Example: Monitoring subscription health +#### Example: Monitor subscription health -You can use the `apollo.subscription.end_reason` attribute to monitor the health of your subscriptions: +Use the `apollo.subscription.end_reason` attribute to monitor the health of your subscriptions: -- Frequent `stream_end` values may indicate issues with your subgraph subscription sources closing unexpectedly. +- Frequent `stream_end` values might indicate issues with your subgraph subscription sources closing unexpectedly. - The `server_close` value indicates healthy subscription lifecycles where the subgraph properly signals closure. 
From a0c301075022e6fa3583f8418767b111566939ed Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 12 Feb 2026 17:25:44 +0000 Subject: [PATCH 10/15] Change gracefully -> successfully in docs + comments --- apollo-router/src/protocols/multipart.rs | 8 ++++---- .../routing/operations/subscriptions/configuration.mdx | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/apollo-router/src/protocols/multipart.rs b/apollo-router/src/protocols/multipart.rs index b9abe9c9a9..242d1a8656 100644 --- a/apollo-router/src/protocols/multipart.rs +++ b/apollo-router/src/protocols/multipart.rs @@ -125,7 +125,7 @@ enum EndReason { /// Reasons why a subscription ended #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum SubscriptionEndReason { - /// Server closed the connection gracefully + /// Server closed the connection successfully ServerClose, /// Stream source ended (e.g., subgraph closed the connection) StreamEnd, @@ -413,7 +413,7 @@ mod tests { #[tokio::test] async fn test_subscription_end_reason_server_close_empty_response() { - // Test: Server closes connection gracefully (empty response) + // Test: Server closes connection successfully (empty response) let (_guard, layer) = setup_tracing(); let span = tracing::info_span!("test_span"); let span_guard = span.enter(); @@ -423,7 +423,7 @@ mod tests { .data(serde_json_bytes::Value::String(ByteString::from("data"))) .subscribed(true) .build(), - // Empty response signals server-side graceful close + // Empty response signals server-side close graphql::Response::builder().build(), ]; let gql_responses = stream::iter(responses); @@ -459,7 +459,7 @@ mod tests { .build(), graphql::Response::builder() .data(serde_json_bytes::Value::String(ByteString::from("final"))) - .subscribed(false) // Graceful close with final data + .subscribed(false) // Server close with final data .build(), ]; let gql_responses = stream::iter(responses); diff --git 
a/docs/source/routing/operations/subscriptions/configuration.mdx b/docs/source/routing/operations/subscriptions/configuration.mdx index 63611961b2..f7efef8877 100644 --- a/docs/source/routing/operations/subscriptions/configuration.mdx +++ b/docs/source/routing/operations/subscriptions/configuration.mdx @@ -463,7 +463,7 @@ The `apollo.subscription.end_reason` attribute can have these values: | Value | Description | |-------|-------------| -| `server_close` | The server (subgraph) closed the connection gracefully. This occurs when the subgraph ends the subscription. | +| `server_close` | The server (subgraph) closed the connection successfully. This occurs when the subgraph ends the subscription. | | `stream_end` | The underlying data stream ended. This typically occurs when the subgraph closes the WebSocket connection or the stream source terminates. | | `heartbeat_delivery_failed` | Apollo Router failed to deliver a heartbeat message to the client. This usually indicates the client disconnected unexpectedly when a keepalive heartbeat was sent. | | `client_disconnect` | The client disconnected after receiving subscription data (not during a heartbeat). This indicates the client closed the connection mid-subscription. 
| From 4975ff9d71994e104d915a70de40ccf1adabcf03 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Wed, 18 Feb 2026 15:24:19 +0000 Subject: [PATCH 11/15] Add counters for subscription end reasons, rename stream_end to subgraph_error --- ..._observability_heartbeat_payload_errors.md | 17 +- .../src/plugins/subscription/fetch.rs | 7 + apollo-router/src/plugins/subscription/mod.rs | 2 + apollo-router/src/protocols/multipart.rs | 863 ++++++++++++------ apollo-router/src/services/router/service.rs | 25 +- .../subscriptions/configuration.mdx | 2 +- 6 files changed, 648 insertions(+), 268 deletions(-) diff --git a/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md b/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md index 792aad298e..a9cbef1e88 100644 --- a/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md +++ b/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md @@ -1,10 +1,23 @@ -### Add `apollo.subscription.end_reason` and `apollo.defer.end_reason` attributes to router spans ([PR #8858](https://github.com/apollographql/router/pull/8858)) +### Add subscription and defer observability: end reason span attributes and termination metrics ([PR #8858](https://github.com/apollographql/router/pull/8858)) -Adds two new span attributes that indicate why a streaming response (subscription or defer) ended: +Adds new span attributes and metrics to improve observability of streaming responses. + +**Span attributes:** - **`apollo.subscription.end_reason`**: Records the reason a subscription was terminated. Possible values are `server_close`, `stream_end`, `heartbeat_delivery_failed`, `client_disconnect`, `schema_reload`, and `config_reload`. - **`apollo.defer.end_reason`**: Records the reason a deferred query ended. 
Possible values are `completed` (all deferred chunks were delivered successfully) and `client_disconnect` (the client disconnected before all deferred data was delivered). Both attributes are added dynamically to router spans only when relevant (i.e., only on requests that actually use subscriptions or `@defer`), rather than being present on every router span. +**Metrics:** + +The following counters are emitted when a subscription terminates: + +- **`apollo.router.operations.subscriptions.stream_end`** (attributes: `subgraph.service.name`): The subgraph gracefully closed the stream. +- **`apollo.router.operations.subscriptions.subgraph_error`** (attributes: `subgraph.service.name`): The subscription terminated unexpectedly due to a subgraph error (e.g. process killed, network drop). +- **`apollo.router.operations.subscriptions.client_disconnect`** (attributes: `apollo.client.name`): The client disconnected before the subscription ended. +- **`apollo.router.operations.subscriptions.heartbeat_delivery_failed`** (attributes: `apollo.client.name`): A heartbeat could not be delivered to the client. +- **`apollo.router.operations.subscriptions.schema_reload`**: The subscription was terminated because the router schema was updated. +- **`apollo.router.operations.subscriptions.config_reload`**: The subscription was terminated because the router configuration was updated. 
+ By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/8858 \ No newline at end of file diff --git a/apollo-router/src/plugins/subscription/fetch.rs b/apollo-router/src/plugins/subscription/fetch.rs index 737cdfeafb..db86d1bb9a 100644 --- a/apollo-router/src/plugins/subscription/fetch.rs +++ b/apollo-router/src/plugins/subscription/fetch.rs @@ -14,6 +14,7 @@ use tracing::instrument::Instrumented; use crate::error::Error; use crate::http_ext; +use crate::plugins::subscription::SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY; use crate::plugins::subscription::SubscriptionTaskParams; use crate::query_planner::OperationKind; use crate::query_planner::SUBSCRIBE_SPAN_NAME; @@ -50,6 +51,12 @@ pub(crate) fn fetch_service_handle_subscription( let service_name = service_name.clone(); let fetch_time_offset = context.created_at.elapsed().as_nanos() as i64; + // Store the subgraph name in context so it's available for metrics at the router layer + let _ = context.insert( + SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY, + service_name.to_string(), + ); + // Subscriptions are not supported for connectors, so they always go to the subgraph service subscription_with_subgraph_service(schema, subgraph_service_factory, request).instrument( tracing::info_span!( diff --git a/apollo-router/src/plugins/subscription/mod.rs b/apollo-router/src/plugins/subscription/mod.rs index 81160f877b..5297c16eb3 100644 --- a/apollo-router/src/plugins/subscription/mod.rs +++ b/apollo-router/src/plugins/subscription/mod.rs @@ -47,6 +47,8 @@ pub(crate) const APOLLO_SUBSCRIPTION_PLUGIN_NAME: &str = "subscription"; pub(crate) const SUBSCRIPTION_ERROR_EXTENSION_KEY: &str = "apollo::subscriptions::fatal_error"; pub(crate) const SUBSCRIPTION_WS_CUSTOM_CONNECTION_PARAMS: &str = "apollo.subscription.custom_connection_params"; +pub(crate) const SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY: &str = + "apollo::subscription::subgraph_name"; #[derive(Debug, Clone)] pub(crate) struct Subscription { 
diff --git a/apollo-router/src/protocols/multipart.rs b/apollo-router/src/protocols/multipart.rs index 242d1a8656..035c634704 100644 --- a/apollo-router/src/protocols/multipart.rs +++ b/apollo-router/src/protocols/multipart.rs @@ -63,10 +63,14 @@ pub(crate) struct Multipart { /// Used to detect if a heartbeat was the last thing sent before connection closed. heartbeat_pending: bool, /// The span captured at creation time, used to record attributes on connection close. - span: Span, + creation_span: Span, /// The end reason determined during polling, written to the span on Drop. /// If `None` when dropped and `!is_terminated`, an abnormal reason is inferred. end_reason: Option<EndReason>, + /// The subgraph name for subscription streams, used in stream_end and subgraph_error metrics. + subgraph_name: Option<String>, + /// The client name for subscription streams, used in client_disconnect and heartbeat_delivery_failed metrics. + client_name: Option<String>, } impl Multipart { @@ -92,11 +96,25 @@ impl Multipart { mode, heartbeat_pending: false, // Capture the current span so we can record attributes later - span: Span::current(), + creation_span: Span::current(), end_reason: None, + subgraph_name: None, + client_name: None, } } + /// Set the subgraph name for stream_end and subgraph_error metrics attribution. + pub(crate) fn with_subgraph_name(mut self, name: Option<String>) -> Self { + self.subgraph_name = name; + self + } + + /// Set the client name for client_disconnect and heartbeat_delivery_failed metrics attribution. + pub(crate) fn with_client_name(mut self, name: Option<String>) -> Self { + self.client_name = name; + self + } + /// Checks if the errors indicate a reload-related termination and returns the appropriate end reason fn detect_reload_end_reason(errors: &[graphql::Error]) -> Option<SubscriptionEndReason> { for error in errors { @@ -112,6 +130,27 @@ impl Multipart { } None } + + /// Infer the end reason for a stream (subscription or defer) that was not terminated properly.
+ fn infer_abnormal_end_reason(&self) -> EndReason { + match self.mode { + ProtocolMode::Subscription => { + // Stream wasn't terminated properly + let reason = if self.heartbeat_pending { + // Heartbeat was the last thing sent - likely failed to deliver + SubscriptionEndReason::HeartbeatDeliveryFailed + } else { + // Connection closed after a message was sent + SubscriptionEndReason::ClientDisconnect + }; + EndReason::Subscription(reason) + } + ProtocolMode::Defer => { + // Defer stream wasn't terminated properly - client disconnected + EndReason::Defer(DeferEndReason::ClientDisconnect) + } + } + } } /// Unified end reason for both subscription and defer modes, @@ -125,10 +164,16 @@ enum EndReason { /// Reasons why a subscription ended #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum SubscriptionEndReason { - /// Server closed the connection successfully + /// The subgraph gracefully closed the subscription stream. This fires when + /// the terminating response is the "magic empty response" (no data, no errors, + /// no extensions) that `filter_stream` injects after the subgraph sends a + /// WebSocket `complete` message. ServerClose, - /// Stream source ended (e.g., subgraph closed the connection) - StreamEnd, + /// The subgraph closed the subscription stream unexpectedly (e.g. process + /// killed, network drop). This fires when the terminating response contains + /// errors (such as `WEBSOCKET_MESSAGE_ERROR` or `WEBSOCKET_CLOSE_ERROR`) + /// indicating the connection was lost rather than cleanly completed. 
+ SubgraphError, /// Heartbeat could not be delivered - client likely disconnected HeartbeatDeliveryFailed, /// Client disconnected unexpectedly (after a message was sent) @@ -143,7 +188,7 @@ impl SubscriptionEndReason { pub(crate) fn as_str(&self) -> &'static str { match self { Self::ServerClose => "server_close", - Self::StreamEnd => "stream_end", + Self::SubgraphError => "subgraph_error", Self::HeartbeatDeliveryFailed => "heartbeat_delivery_failed", Self::ClientDisconnect => "client_disconnect", Self::SchemaReload => "schema_reload", @@ -182,37 +227,90 @@ impl Drop for Multipart { fn drop(&mut self) { // Determine the end reason: use the one recorded during polling if available, // otherwise infer an abnormal termination reason. - let end_reason = self.end_reason.take().unwrap_or_else(|| match self.mode { - ProtocolMode::Subscription => { - // Stream wasn't terminated properly - determine the reason - let reason = if self.heartbeat_pending { - // Heartbeat was the last thing sent - likely failed to deliver - SubscriptionEndReason::HeartbeatDeliveryFailed - } else { - // Connection closed after a message was sent - SubscriptionEndReason::ClientDisconnect - }; - EndReason::Subscription(reason) - } - ProtocolMode::Defer => { - // Defer stream wasn't terminated properly - client disconnected - EndReason::Defer(DeferEndReason::ClientDisconnect) - } - }); + let end_reason = self + .end_reason + .unwrap_or_else(|| self.infer_abnormal_end_reason()); match end_reason { EndReason::Subscription(reason) => { - self.span + self.creation_span .set_span_dyn_attribute(SUBSCRIPTION_END_REASON_KEY, reason.as_value()); + self.emit_subscription_termination_metric(reason); } EndReason::Defer(reason) => { - self.span + self.creation_span .set_span_dyn_attribute(DEFER_END_REASON_KEY, reason.as_value()); } } } } +impl Multipart { + /// Emit a counter metric for subscription termination events that require observability. 
+ fn emit_subscription_termination_metric(&self, reason: SubscriptionEndReason) { + match reason { + SubscriptionEndReason::ServerClose => { + let subgraph_name = self + .subgraph_name + .as_deref() + .unwrap_or("unknown") + .to_string(); + u64_counter!( + "apollo.router.operations.subscriptions.stream_end", + "Subscription terminated because the subgraph gracefully closed the stream", + 1, + subgraph.service.name = subgraph_name + ); + } + SubscriptionEndReason::SubgraphError => { + let subgraph_name = self + .subgraph_name + .as_deref() + .unwrap_or("unknown") + .to_string(); + u64_counter!( + "apollo.router.operations.subscriptions.subgraph_error", + "Subscription terminated unexpectedly due to a subgraph error (e.g. process killed, network drop)", + 1, + subgraph.service.name = subgraph_name + ); + } + SubscriptionEndReason::ClientDisconnect => { + let client_name = self.client_name.as_deref().unwrap_or("unknown").to_string(); + u64_counter!( + "apollo.router.operations.subscriptions.client_disconnect", + "Subscription terminated because the client disconnected", + 1, + apollo.client.name = client_name + ); + } + SubscriptionEndReason::HeartbeatDeliveryFailed => { + let client_name = self.client_name.as_deref().unwrap_or("unknown").to_string(); + u64_counter!( + "apollo.router.operations.subscriptions.heartbeat_delivery_failed", + "Subscription terminated because a heartbeat could not be delivered to the client", + 1, + apollo.client.name = client_name + ); + } + SubscriptionEndReason::SchemaReload => { + u64_counter!( + "apollo.router.operations.subscriptions.schema_reload", + "Subscription terminated because the router schema was updated", + 1 + ); + } + SubscriptionEndReason::ConfigReload => { + u64_counter!( + "apollo.router.operations.subscriptions.config_reload", + "Subscription terminated because the router configuration was updated", + 1 + ); + } + } + } +} + impl Stream for Multipart { type Item = Result; @@ -261,6 +359,12 @@ impl Stream for Multipart 
{ Vec::from(&b"\r\ncontent-type: application/json\r\n\r\n"[..]) }; + // Track whether this is an unexpected subgraph error (e.g. WebSocket + // WEBSOCKET_MESSAGE_ERROR or WEBSOCKET_CLOSE_ERROR from a killed process + // or network drop). This is set inside the Subscription branch below, + // before errors are consumed. + let mut has_subgraph_errors = false; + match self.mode { ProtocolMode::Subscription => { let is_transport_error = @@ -279,6 +383,12 @@ impl Stream for Multipart { return Poll::Ready(Some(Ok(Bytes::from_static(&b"--\r\n"[..])))); } + // Capture before errors are moved: if the response has errors + // and they're not from a router-initiated transport error + // (JWT/execution), these are from the subgraph (WebSocket layer). + has_subgraph_errors = + !response.errors.is_empty() && !is_transport_error; + let response = if is_transport_error { SubscriptionPayload { errors: std::mem::take(&mut response.errors), @@ -311,7 +421,11 @@ impl Stream for Multipart { self.is_terminated = true; self.end_reason = Some(match self.mode { ProtocolMode::Subscription => EndReason::Subscription( - maybe_end_reason.unwrap_or(SubscriptionEndReason::ServerClose), + maybe_end_reason.unwrap_or(if has_subgraph_errors { + SubscriptionEndReason::SubgraphError + } else { + SubscriptionEndReason::ServerClose + }), ), ProtocolMode::Defer => EndReason::Defer(DeferEndReason::Completed), }); @@ -336,7 +450,7 @@ impl Stream for Multipart { self.is_terminated = true; if self.mode == ProtocolMode::Subscription { self.end_reason = - Some(EndReason::Subscription(SubscriptionEndReason::StreamEnd)); + Some(EndReason::Subscription(SubscriptionEndReason::ServerClose)); } Poll::Ready(Some(Ok(buf))) @@ -347,7 +461,7 @@ impl Stream for Multipart { self.is_terminated = true; self.end_reason = Some(match self.mode { ProtocolMode::Subscription => { - EndReason::Subscription(SubscriptionEndReason::StreamEnd) + EndReason::Subscription(SubscriptionEndReason::ServerClose) } ProtocolMode::Defer => 
EndReason::Defer(DeferEndReason::Completed), }); @@ -373,6 +487,7 @@ mod tests { use tracing_subscriber::registry::LookupSpan; use super::*; + use crate::metrics::FutureMetricsExt; use crate::plugins::telemetry::dynamic_attribute::DynAttributeLayer; use crate::plugins::telemetry::otel; use crate::plugins::telemetry::otel::OtelData; @@ -413,261 +528,334 @@ mod tests { #[tokio::test] async fn test_subscription_end_reason_server_close_empty_response() { - // Test: Server closes connection successfully (empty response) - let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!("test_span"); - let span_guard = span.enter(); - - let responses = vec![ - graphql::Response::builder() - .data(serde_json_bytes::Value::String(ByteString::from("data"))) - .subscribed(true) - .build(), - // Empty response signals server-side close - graphql::Response::builder().build(), - ]; - let gql_responses = stream::iter(responses); - let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); - - // Consume all messages - while protocol.next().await.is_some() {} - - drop(protocol); - drop(span_guard); - drop(span); - - let reason = layer.captured_reason.lock().unwrap().clone(); - assert_eq!( - reason, - Some(KeyValue::new( - SUBSCRIPTION_END_REASON_KEY, - SubscriptionEndReason::ServerClose.as_value() - )) - ); + async { + // Test: Server closes connection successfully (empty response) + // ServerClose also emits the stream_end metric since in production all + // server-side terminations go through filter_stream → ServerClose. 
+ let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let span_guard = span.enter(); + + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + // Empty response signals server-side close + graphql::Response::builder().build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_subgraph_name(Some("test_subgraph".to_string())); + + // Consume all messages + while protocol.next().await.is_some() {} + + drop(protocol); + drop(span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ServerClose.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.stream_end", + 1, + "subgraph.service.name" = "test_subgraph" + ); + } + .with_metrics() + .await; } #[tokio::test] async fn test_subscription_end_reason_server_close_with_final_data() { - // Test: Server closes normally with final data (subscribed=false, no errors) - let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!("test_span"); - let _span_guard = span.enter(); - let responses = vec![ - graphql::Response::builder() - .data(serde_json_bytes::Value::String(ByteString::from("data"))) - .subscribed(true) - .build(), - graphql::Response::builder() - .data(serde_json_bytes::Value::String(ByteString::from("final"))) - .subscribed(false) // Server close with final data - .build(), - ]; - let gql_responses = stream::iter(responses); - let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); - - // Consume all messages - while protocol.next().await.is_some() {} - - drop(protocol); - drop(_span_guard); - drop(span); - - let reason = layer.captured_reason.lock().unwrap().clone(); - assert_eq!( - reason, - 
Some(KeyValue::new( - SUBSCRIPTION_END_REASON_KEY, - SubscriptionEndReason::ServerClose.as_value() - )) - ); + async { + // Test: Server closes normally with final data (subscribed=false, no errors) + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("final"))) + .subscribed(false) // Server close with final data + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_subgraph_name(Some("test_subgraph".to_string())); + + // Consume all messages + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ServerClose.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.stream_end", + 1, + "subgraph.service.name" = "test_subgraph" + ); + } + .with_metrics() + .await; } #[tokio::test] - async fn test_end_reason_stream_end() { - // Test: Stream ends via EOF (empty stream) - let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!("test_span"); - let _span_guard = span.enter(); - let responses: Vec = vec![]; - let gql_responses = stream::iter(responses); - let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); - - // Consume all messages (will get EOF) - while protocol.next().await.is_some() {} - - drop(protocol); - drop(_span_guard); - drop(span); - - let reason = layer.captured_reason.lock().unwrap().clone(); - assert_eq!( - reason, - Some(KeyValue::new( - SUBSCRIPTION_END_REASON_KEY, - 
SubscriptionEndReason::StreamEnd.as_value() - )) - ); + async fn test_end_reason_server_close_via_eof() { + async { + // Test: Stream ends via EOF (empty stream) — the Eof sentinel fires, + // which sets ServerClose (same as all other server-side termination paths). + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses: Vec = vec![]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_subgraph_name(Some("test_subgraph".to_string())); + + // Consume all messages (will get EOF) + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ServerClose.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.stream_end", + 1, + "subgraph.service.name" = "test_subgraph" + ); + } + .with_metrics() + .await; } #[tokio::test] async fn test_end_reason_heartbeat_delivery_failed() { - // Test: Stream dropped while heartbeat was pending - let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!("test_span"); - let _span_guard = span.enter(); - use tokio::time::sleep; - - let (tx, rx) = tokio::sync::mpsc::channel::(1); - let gql_responses = tokio_stream::wrappers::ReceiverStream::new(rx); - let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); - - // Spawn a task that never sends anything, then drops the sender - tokio::spawn(async move { - sleep(std::time::Duration::from_millis(100)).await; - drop(tx); - }); - - // Wait for a heartbeat to be sent - let heartbeat = "\r\n--graphql\r\ncontent-type: application/json\r\n\r\n{}\r\n--graphql"; - while let Some(resp) = protocol.next().await { - let res = String::from_utf8(resp.unwrap().to_vec()).unwrap(); - 
if res == heartbeat || res.starts_with("\r\ncontent-type: application/json\r\n\r\n{}") { - // Got a heartbeat, now drop the protocol while heartbeat is pending - assert!(protocol.heartbeat_pending); - break; + async { + // Test: Stream dropped while heartbeat was pending + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + use tokio::time::sleep; + + let (tx, rx) = tokio::sync::mpsc::channel::(1); + let gql_responses = tokio_stream::wrappers::ReceiverStream::new(rx); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_client_name(Some("test_client".to_string())); + + // Spawn a task that never sends anything, then drops the sender + tokio::spawn(async move { + sleep(std::time::Duration::from_millis(100)).await; + drop(tx); + }); + + // Wait for a heartbeat to be sent + let heartbeat = + "\r\n--graphql\r\ncontent-type: application/json\r\n\r\n{}\r\n--graphql"; + while let Some(resp) = protocol.next().await { + let res = String::from_utf8(resp.unwrap().to_vec()).unwrap(); + if res == heartbeat + || res.starts_with("\r\ncontent-type: application/json\r\n\r\n{}") + { + // Got a heartbeat, now drop the protocol while heartbeat is pending + assert!(protocol.heartbeat_pending); + break; + } } - } - - // Protocol is dropped here with heartbeat_pending = true - drop(protocol); - drop(_span_guard); - drop(span); - let reason = layer.captured_reason.lock().unwrap().clone(); - assert_eq!( - reason, - Some(KeyValue::new( - SUBSCRIPTION_END_REASON_KEY, - SubscriptionEndReason::HeartbeatDeliveryFailed.as_value() - )) - ); + // Protocol is dropped here with heartbeat_pending = true + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::HeartbeatDeliveryFailed.as_value() + )) + ); + + assert_counter!( + 
"apollo.router.operations.subscriptions.heartbeat_delivery_failed", + 1, + "apollo.client.name" = "test_client" + ); + } + .with_metrics() + .await; } #[tokio::test] async fn test_end_reason_client_disconnect() { - // Test: Stream dropped after a message (not heartbeat) was sent - let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!("test_span"); - let _span_guard = span.enter(); - let responses = vec![ - graphql::Response::builder() - .data(serde_json_bytes::Value::String(ByteString::from("data"))) - .subscribed(true) - .build(), - ]; - let gql_responses = stream::iter(responses); - let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); - - // Get the first message - let resp = protocol.next().await; - assert!(resp.is_some()); - - // Verify heartbeat_pending is false (we got a message, not heartbeat) - assert!(!protocol.heartbeat_pending); - - // Protocol is dropped here without being terminated - // and heartbeat_pending = false - drop(protocol); - drop(_span_guard); - drop(span); - - let reason = layer.captured_reason.lock().unwrap().clone(); - assert_eq!( - reason, - Some(KeyValue::new( - SUBSCRIPTION_END_REASON_KEY, - SubscriptionEndReason::ClientDisconnect.as_value() - )) - ); + async { + // Test: Stream dropped after a message (not heartbeat) was sent + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_client_name(Some("test_client".to_string())); + + // Get the first message + let resp = protocol.next().await; + assert!(resp.is_some()); + + // Verify heartbeat_pending is false (we got a message, not heartbeat) + assert!(!protocol.heartbeat_pending); + + // 
Protocol is dropped here without being terminated + // and heartbeat_pending = false + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ClientDisconnect.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.client_disconnect", + 1, + "apollo.client.name" = "test_client" + ); + } + .with_metrics() + .await; } #[tokio::test] async fn test_subscription_end_reason_schema_reload() { - let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!("test_span"); - let _span_guard = span.enter(); - let responses = vec![ - graphql::Response::builder() - .data(serde_json_bytes::Value::String(ByteString::from("data"))) - .subscribed(true) - .build(), - graphql::Response::builder() - .error( - graphql::Error::builder() - .message("subscription has been closed due to a schema reload") - .extension_code("SUBSCRIPTION_SCHEMA_RELOAD") - .build(), - ) - .subscribed(false) - .build(), - ]; - let gql_responses = stream::iter(responses); - let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); - - // Consume all messages - while protocol.next().await.is_some() {} - drop(protocol); - drop(_span_guard); - drop(span); - - let reason = layer.captured_reason.lock().unwrap().clone(); - assert_eq!( - reason, - Some(KeyValue::new( - SUBSCRIPTION_END_REASON_KEY, - SubscriptionEndReason::SchemaReload.as_value() - )) - ); + async { + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + graphql::Response::builder() + .error( + graphql::Error::builder() + .message("subscription has been closed due to a schema reload") + 
.extension_code("SUBSCRIPTION_SCHEMA_RELOAD") + .build(), + ) + .subscribed(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + // Consume all messages + while protocol.next().await.is_some() {} + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::SchemaReload.as_value() + )) + ); + + assert_counter!("apollo.router.operations.subscriptions.schema_reload", 1); + } + .with_metrics() + .await; } #[tokio::test] async fn test_subscription_end_reason_config_reload() { - let (_guard, layer) = setup_tracing(); - let span = tracing::info_span!("test_span"); - let _span_guard = span.enter(); - let responses = vec![ - graphql::Response::builder() - .data(serde_json_bytes::Value::String(ByteString::from("data"))) - .subscribed(true) - .build(), - // Config reload error response - graphql::Response::builder() - .error( - graphql::Error::builder() - .message("subscription has been closed due to a configuration reload") - .extension_code("SUBSCRIPTION_CONFIG_RELOAD") - .build(), - ) - .subscribed(false) - .build(), - ]; - let gql_responses = stream::iter(responses); - let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); - - // Consume all messages - while protocol.next().await.is_some() {} - drop(protocol); - drop(_span_guard); - drop(span); - - let reason = layer.captured_reason.lock().unwrap().clone(); - assert_eq!( - reason, - Some(KeyValue::new( - SUBSCRIPTION_END_REASON_KEY, - SubscriptionEndReason::ConfigReload.as_value() - )) - ); + async { + let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + 
.subscribed(true) + .build(), + // Config reload error response + graphql::Response::builder() + .error( + graphql::Error::builder() + .message("subscription has been closed due to a configuration reload") + .extension_code("SUBSCRIPTION_CONFIG_RELOAD") + .build(), + ) + .subscribed(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + // Consume all messages + while protocol.next().await.is_some() {} + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::ConfigReload.as_value() + )) + ); + + assert_counter!("apollo.router.operations.subscriptions.config_reload", 1); + } + .with_metrics() + .await; } #[tokio::test] @@ -994,4 +1182,163 @@ mod tests { )) ); } + + #[tokio::test] + async fn test_end_reason_subgraph_error() { + async { + // Test: Subscription terminated because the subgraph WebSocket connection + // was lost unexpectedly (e.g. process killed). The terminating response + // has errors (like WEBSOCKET_MESSAGE_ERROR) and subscribed=false. 
+ let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + + let responses = vec![ + graphql::Response::builder() + .error( + graphql::Error::builder() + .message("cannot read message from websocket") + .extension_code("WEBSOCKET_MESSAGE_ERROR") + .build(), + ) + .subscribed(false) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_subgraph_name(Some("flaky_subgraph".to_string())); + + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::SubgraphError.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.subgraph_error", + 1, + "subgraph.service.name" = "flaky_subgraph" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_end_reason_subgraph_error_with_close_code() { + async { + // Test: Subscription terminated because the subgraph WebSocket closed + // with an abnormal close code, producing a WEBSOCKET_CLOSE_ERROR. 
+ let (_guard, layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + + let responses = vec![graphql::Response::builder() + .error( + graphql::Error::builder() + .message( + "websocket connection has been closed with error code '1011' and reason 'internal error'", + ) + .extension_code("WEBSOCKET_CLOSE_ERROR") + .build(), + ) + .subscribed(false) + .build()]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription) + .with_subgraph_name(Some("error_subgraph".to_string())); + + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + let reason = layer.captured_reason.lock().unwrap().clone(); + assert_eq!( + reason, + Some(KeyValue::new( + SUBSCRIPTION_END_REASON_KEY, + SubscriptionEndReason::SubgraphError.as_value() + )) + ); + + assert_counter!( + "apollo.router.operations.subscriptions.subgraph_error", + 1, + "subgraph.service.name" = "error_subgraph" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_stream_end_metric_defaults_to_unknown_subgraph() { + async { + // Test: stream_end metric uses "unknown" when no subgraph name is set + let (_guard, _layer) = setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses: Vec<graphql::Response> = vec![]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + while protocol.next().await.is_some() {} + + drop(protocol); + drop(_span_guard); + drop(span); + + assert_counter!( + "apollo.router.operations.subscriptions.stream_end", + 1, + "subgraph.service.name" = "unknown" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_client_disconnect_metric_defaults_to_unknown_client() { + async { + // Test: client_disconnect metric uses "unknown" when no client name is set + let (_guard, _layer) = 
setup_tracing(); + let span = tracing::info_span!("test_span"); + let _span_guard = span.enter(); + let responses = vec![ + graphql::Response::builder() + .data(serde_json_bytes::Value::String(ByteString::from("data"))) + .subscribed(true) + .build(), + ]; + let gql_responses = stream::iter(responses); + let mut protocol = Multipart::new(gql_responses, ProtocolMode::Subscription); + + let resp = protocol.next().await; + assert!(resp.is_some()); + + drop(protocol); + drop(_span_guard); + drop(span); + + assert_counter!( + "apollo.router.operations.subscriptions.client_disconnect", + 1, + "apollo.client.name" = "unknown" + ); + } + .with_metrics() + .await; + } } diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index 7aa93e0bd8..6577f4aa36 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -51,6 +51,8 @@ use crate::layers::DEFAULT_BUFFER_SIZE; use crate::layers::ServiceBuilderExt; #[cfg(test)] use crate::plugin::test::MockSupergraphService; +use crate::plugins::subscription::SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY; +use crate::plugins::telemetry::CLIENT_NAME; use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_BODY; use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_HEADERS; use crate::plugins::telemetry::config_new::attributes::HTTP_REQUEST_URI; @@ -377,13 +379,22 @@ impl RouterService { ACCEL_BUFFERING_HEADER_VALUE.clone(), ); let response = match response.subscribed { - Some(true) => http::Response::from_parts( - parts, - router::body::from_result_stream(Multipart::new( - body, - ProtocolMode::Subscription, - )), - ), + Some(true) => { + let subgraph_name: Option<String> = context + .get(SUBSCRIPTION_SUBGRAPH_NAME_CONTEXT_KEY) + .ok() + .flatten(); + let client_name: Option<String> = + context.get(CLIENT_NAME).ok().flatten(); + http::Response::from_parts( + parts, + router::body::from_result_stream( + Multipart::new(body, 
ProtocolMode::Subscription) + .with_subgraph_name(subgraph_name) + .with_client_name(client_name), + ), + ) + } _ => http::Response::from_parts( parts, router::body::from_result_stream(Multipart::new( diff --git a/docs/source/routing/operations/subscriptions/configuration.mdx b/docs/source/routing/operations/subscriptions/configuration.mdx index f7efef8877..3c3a345912 100644 --- a/docs/source/routing/operations/subscriptions/configuration.mdx +++ b/docs/source/routing/operations/subscriptions/configuration.mdx @@ -464,7 +464,7 @@ The `apollo.subscription.end_reason` attribute can have these values: | Value | Description | |-------|-------------| | `server_close` | The server (subgraph) closed the connection successfully. This occurs when the subgraph ends the subscription. | -| `stream_end` | The underlying data stream ended. This typically occurs when the subgraph closes the WebSocket connection or the stream source terminates. | +| `subgraph_error` | The subgraph closed the subscription stream unexpectedly (e.g. process killed, network drop). | | `heartbeat_delivery_failed` | Apollo Router failed to deliver a heartbeat message to the client. This usually indicates the client disconnected unexpectedly when a keepalive heartbeat was sent. | | `client_disconnect` | The client disconnected after receiving subscription data (not during a heartbeat). This indicates the client closed the connection mid-subscription. | | `schema_reload` | The subscription was terminated because Apollo Router's supergraph schema was updated. Clients can reconnect to resume the subscription. 
| From 9a2fae0c1e5f815133cb5b4d0731cf3f36ebd362 Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 19 Feb 2026 12:18:54 +0000 Subject: [PATCH 12/15] Add metric for when a subscription request is rejected as the `max_opened_subscriptions` limit has been reached --- ..._observability_heartbeat_payload_errors.md | 4 + .../src/plugins/subscription/fetch.rs | 100 ++++++++++++++++++ .../standard-instruments.mdx | 1 + .../subscriptions/configuration.mdx | 4 +- 4 files changed, 108 insertions(+), 1 deletion(-) diff --git a/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md b/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md index a9cbef1e88..d5e4bdc5e7 100644 --- a/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md +++ b/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md @@ -20,4 +20,8 @@ The following counters are emitted when a subscription terminates: - **`apollo.router.operations.subscriptions.schema_reload`**: The subscription was terminated because the router schema was updated. - **`apollo.router.operations.subscriptions.config_reload`**: The subscription was terminated because the router configuration was updated. +The following counter is emitted when a subscription request is rejected: + +- **`apollo.router.operations.subscriptions.rejected.limit`**: A new subscription request was rejected because the router has reached its `max_opened_subscriptions` limit. 
+ By [@rohan-b99](https://github.com/rohan-b99) in https://github.com/apollographql/router/pull/8858 \ No newline at end of file diff --git a/apollo-router/src/plugins/subscription/fetch.rs b/apollo-router/src/plugins/subscription/fetch.rs index db86d1bb9a..d5858f7aea 100644 --- a/apollo-router/src/plugins/subscription/fetch.rs +++ b/apollo-router/src/plugins/subscription/fetch.rs @@ -98,6 +98,11 @@ fn subscription_with_subgraph_service( .and_then(|s| s.max_opened_subscriptions) && OPENED_SUBSCRIPTIONS.load(Ordering::Relaxed) >= max_opened_subscriptions { + u64_counter!( + "apollo.router.operations.subscriptions.rejected.limit", + "Number of subscription requests rejected because the maximum opened subscriptions limit was reached", + 1 + ); return Box::pin(async { Ok(( Value::default(), @@ -234,3 +239,98 @@ fn subscription_with_subgraph_service( Ok((Value::default(), response)) }) } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + use std::sync::atomic::Ordering; + + use crate::query_planner::OperationKind; + use apollo_federation::query_plan::serializable_document::SerializableDocument; + use serde_json_bytes::Value; + use tokio::sync::mpsc; + + use crate::Context; + use crate::json_ext::Path; + use crate::metrics::FutureMetricsExt; + use crate::plugins::subscription::SubscriptionConfig; + use crate::query_planner::fetch::Variables; + use crate::query_planner::subscription::OPENED_SUBSCRIPTIONS; + use crate::query_planner::subscription::SubscriptionNode; + use crate::services::SubgraphServiceFactory; + use crate::services::fetch::SubscriptionRequest; + + use super::subscription_with_subgraph_service; + + #[tokio::test] + async fn test_subscription_limit_reached_emits_metric() { + async { + let original_count = OPENED_SUBSCRIPTIONS.swap(1, Ordering::Relaxed); + + let subscription_config = SubscriptionConfig { + max_opened_subscriptions: Some(1), + ..Default::default() + }; + + let subscription_node = SubscriptionNode { + 
service_name: Arc::from("subgraph-a"), + variable_usages: Vec::new(), + operation: SerializableDocument::from_string("subscription { onEvent { id } }"), + operation_name: None, + operation_kind: OperationKind::Subscription, + input_rewrites: None, + output_rewrites: None, + }; + + let (sender, _receiver) = mpsc::channel(1); + let supergraph_request = Arc::new( + http::Request::builder() + .body(crate::graphql::Request::builder().build()) + .unwrap(), + ); + + let schema = Arc::new( + crate::spec::Schema::parse( + include_str!("../../testdata/minimal_supergraph.graphql"), + &Default::default(), + ) + .expect("could not parse schema"), + ); + + let factory = Arc::new(SubgraphServiceFactory { + services: Arc::new(HashMap::new()), + }); + + let request = SubscriptionRequest::builder() + .context(Context::new()) + .subscription_node(subscription_node) + .supergraph_request(supergraph_request) + .variables(Variables::default()) + .current_dir(Path(Vec::new())) + .sender(sender) + .subscription_config(subscription_config) + .build(); + + let (data, errors) = subscription_with_subgraph_service(schema, factory, request) + .await + .expect("call should not fail"); + + assert_eq!(data, Value::default()); + assert_eq!(errors.len(), 1); + assert_eq!( + errors[0].message, + "can't open new subscription, limit reached" + ); + assert_eq!( + errors[0].extensions.get("code").and_then(|v| v.as_str()), + Some("SUBSCRIPTION_MAX_LIMIT") + ); + assert_counter!("apollo.router.operations.subscriptions.rejected.limit", 1); + + OPENED_SUBSCRIPTIONS.store(original_count, Ordering::Relaxed); + } + .with_metrics() + .await; + } +} diff --git a/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx b/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx index 478f135528..b6b5d71e23 100644 --- a/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx +++ 
b/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx @@ -242,6 +242,7 @@ Similar to the initial call to Uplink, the router does not record metrics for ca - `apollo.router.opened.subscriptions` - Number of different opened subscriptions (not the number of clients with an opened subscriptions in case it's deduplicated). This metric contains `graphql.operation.name` label to know exactly which subscription is still opened. - `apollo.router.skipped.event.count` - Number of subscription events that has been skipped because too many events have been received from the subgraph but not yet sent to the client. +- `apollo.router.operations.subscriptions.rejected.limit` - Number of subscription requests rejected because the router has reached its [`max_opened_subscriptions`](/router/executing-operations/subscription-support/#limiting-the-number-of-client-connections) limit. ## Batching diff --git a/docs/source/routing/operations/subscriptions/configuration.mdx b/docs/source/routing/operations/subscriptions/configuration.mdx index 3c3a345912..a5e1f46fbf 100644 --- a/docs/source/routing/operations/subscriptions/configuration.mdx +++ b/docs/source/routing/operations/subscriptions/configuration.mdx @@ -453,7 +453,9 @@ subscription: #highlight-end ``` -If a client attempts to execute a subscription on your router when it's already at `max_open_subscriptions`, the router rejects the client's request with an error. +If a client attempts to execute a subscription on your router when it's already at `max_opened_subscriptions`, the router rejects the client's request with an error. + +Each rejected subscription increments the `apollo.router.operations.subscriptions.rejected.limit` counter metric, which you can use to monitor how often clients hit the limit. 
### Tracking causes of ending subscriptions From cf8e42ee7776a32dc920e0eeb331ce7b8ff8562e Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 19 Feb 2026 13:38:04 +0000 Subject: [PATCH 13/15] Format --- apollo-router/src/plugins/subscription/fetch.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apollo-router/src/plugins/subscription/fetch.rs b/apollo-router/src/plugins/subscription/fetch.rs index d5858f7aea..b06bed120c 100644 --- a/apollo-router/src/plugins/subscription/fetch.rs +++ b/apollo-router/src/plugins/subscription/fetch.rs @@ -246,23 +246,22 @@ mod tests { use std::sync::Arc; use std::sync::atomic::Ordering; - use crate::query_planner::OperationKind; use apollo_federation::query_plan::serializable_document::SerializableDocument; use serde_json_bytes::Value; use tokio::sync::mpsc; + use super::subscription_with_subgraph_service; use crate::Context; use crate::json_ext::Path; use crate::metrics::FutureMetricsExt; use crate::plugins::subscription::SubscriptionConfig; + use crate::query_planner::OperationKind; use crate::query_planner::fetch::Variables; use crate::query_planner::subscription::OPENED_SUBSCRIPTIONS; use crate::query_planner::subscription::SubscriptionNode; use crate::services::SubgraphServiceFactory; use crate::services::fetch::SubscriptionRequest; - use super::subscription_with_subgraph_service; - #[tokio::test] async fn test_subscription_limit_reached_emits_metric() { async { From 2deda87390e6333e40b9802baf348ada5e84f4bb Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 19 Feb 2026 14:17:42 +0000 Subject: [PATCH 14/15] Apply style guide suggestions --- .../enabling-telemetry/standard-instruments.mdx | 2 +- docs/source/routing/operations/subscriptions/configuration.mdx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git 
a/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx b/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx index 256d35df9a..9efce1fc5a 100644 --- a/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx +++ b/docs/source/routing/observability/router-telemetry-otel/enabling-telemetry/standard-instruments.mdx @@ -242,7 +242,7 @@ Similar to the initial call to Uplink, the router does not record metrics for ca - `apollo.router.opened.subscriptions` - Number of different opened subscriptions (not the number of clients with an opened subscriptions in case it's deduplicated). This metric contains `graphql.operation.name` label to know exactly which subscription is still opened. - `apollo.router.skipped.event.count` - Number of subscription events that has been skipped because too many events have been received from the subgraph but not yet sent to the client. -- `apollo.router.operations.subscriptions.rejected.limit` - Number of subscription requests rejected because the router has reached its [`max_opened_subscriptions`](/router/executing-operations/subscription-support/#limiting-the-number-of-client-connections) limit. +- `apollo.router.operations.subscriptions.rejected.limit` - Number of subscription requests rejected because Apollo Router has reached its [`max_opened_subscriptions`](/router/executing-operations/subscription-support/#limiting-the-number-of-client-connections) limit. 
## Batching diff --git a/docs/source/routing/operations/subscriptions/configuration.mdx b/docs/source/routing/operations/subscriptions/configuration.mdx index a5e1f46fbf..fed9a37a04 100644 --- a/docs/source/routing/operations/subscriptions/configuration.mdx +++ b/docs/source/routing/operations/subscriptions/configuration.mdx @@ -453,7 +453,7 @@ subscription: #highlight-end ``` -If a client attempts to execute a subscription on your router when it's already at `max_opened_subscriptions`, the router rejects the client's request with an error. +If a client attempts to execute a subscription on Apollo Router when it's already at `max_opened_subscriptions`, the router rejects the client's request with an error. Each rejected subscription increments the `apollo.router.operations.subscriptions.rejected.limit` counter metric, which you can use to monitor how often clients hit the limit. From b7d34ff04af77d7539d16addde4db9e2cff94e5c Mon Sep 17 00:00:00 2001 From: rohan-b99 <43239788+rohan-b99@users.noreply.github.com> Date: Thu, 19 Feb 2026 15:55:24 +0000 Subject: [PATCH 15/15] Remove stream_end naming --- ..._observability_heartbeat_payload_errors.md | 4 ++-- apollo-router/src/protocols/multipart.rs | 20 +++++++++---------- .../subscriptions/configuration.mdx | 2 +- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md b/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md index d5e4bdc5e7..181f0f6712 100644 --- a/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md +++ b/.changesets/feat_rohan_b99_subscription_observability_heartbeat_payload_errors.md @@ -4,7 +4,7 @@ Adds new span attributes and metrics to improve observability of streaming respo **Span attributes:** -- **`apollo.subscription.end_reason`**: Records the reason a subscription was terminated. 
Possible values are `server_close`, `stream_end`, `heartbeat_delivery_failed`, `client_disconnect`, `schema_reload`, and `config_reload`. +- **`apollo.subscription.end_reason`**: Records the reason a subscription was terminated. Possible values are `server_close`, `subgraph_error`, `heartbeat_delivery_failed`, `client_disconnect`, `schema_reload`, and `config_reload`. - **`apollo.defer.end_reason`**: Records the reason a deferred query ended. Possible values are `completed` (all deferred chunks were delivered successfully) and `client_disconnect` (the client disconnected before all deferred data was delivered). Both attributes are added dynamically to router spans only when relevant (i.e., only on requests that actually use subscriptions or `@defer`), rather than being present on every router span. @@ -13,7 +13,7 @@ Both attributes are added dynamically to router spans only when relevant (i.e., The following counters are emitted when a subscription terminates: -- **`apollo.router.operations.subscriptions.stream_end`** (attributes: `subgraph.service.name`): The subgraph gracefully closed the stream. +- **`apollo.router.operations.subscriptions.server_close`** (attributes: `subgraph.service.name`): The subgraph gracefully closed the stream. - **`apollo.router.operations.subscriptions.subgraph_error`** (attributes: `subgraph.service.name`): The subscription terminated unexpectedly due to a subgraph error (e.g. process killed, network drop). - **`apollo.router.operations.subscriptions.client_disconnect`** (attributes: `apollo.client.name`): The client disconnected before the subscription ended. - **`apollo.router.operations.subscriptions.heartbeat_delivery_failed`** (attributes: `apollo.client.name`): A heartbeat could not be delivered to the client. 
diff --git a/apollo-router/src/protocols/multipart.rs b/apollo-router/src/protocols/multipart.rs index 035c634704..8c844c6639 100644 --- a/apollo-router/src/protocols/multipart.rs +++ b/apollo-router/src/protocols/multipart.rs @@ -67,7 +67,7 @@ pub(crate) struct Multipart { /// The end reason determined during polling, written to the span on Drop. /// If `None` when dropped and `!is_terminated`, an abnormal reason is inferred. end_reason: Option, - /// The subgraph name for subscription streams, used in stream_end metrics. + /// The subgraph name for subscription streams, used in server_close and subgraph_error metrics. subgraph_name: Option, /// The client name for subscription streams, used in client_disconnect and heartbeat_delivery_failed metrics. client_name: Option, @@ -103,7 +103,7 @@ impl Multipart { } } - /// Set the subgraph name for stream_end metrics attribution. + /// Set the subgraph name for server_close and subgraph_error metrics attribution. pub(crate) fn with_subgraph_name(mut self, name: Option) -> Self { self.subgraph_name = name; self @@ -256,7 +256,7 @@ impl Multipart { .unwrap_or("unknown") .to_string(); u64_counter!( - "apollo.router.operations.subscriptions.stream_end", + "apollo.router.operations.subscriptions.server_close", "Subscription terminated because the subgraph gracefully closed the stream", 1, subgraph.service.name = subgraph_name @@ -530,8 +530,6 @@ mod tests { async fn test_subscription_end_reason_server_close_empty_response() { async { // Test: Server closes connection successfully (empty response) - // ServerClose also emits the stream_end metric since in production all - // server-side terminations go through filter_stream → ServerClose. 
let (_guard, layer) = setup_tracing(); let span = tracing::info_span!("test_span"); let span_guard = span.enter(); @@ -565,7 +563,7 @@ mod tests { ); assert_counter!( - "apollo.router.operations.subscriptions.stream_end", + "apollo.router.operations.subscriptions.server_close", 1, "subgraph.service.name" = "test_subgraph" ); @@ -612,7 +610,7 @@ mod tests { ); assert_counter!( - "apollo.router.operations.subscriptions.stream_end", + "apollo.router.operations.subscriptions.server_close", 1, "subgraph.service.name" = "test_subgraph" ); @@ -651,7 +649,7 @@ mod tests { ); assert_counter!( - "apollo.router.operations.subscriptions.stream_end", + "apollo.router.operations.subscriptions.server_close", 1, "subgraph.service.name" = "test_subgraph" ); @@ -1283,9 +1281,9 @@ mod tests { } #[tokio::test] - async fn test_stream_end_metric_defaults_to_unknown_subgraph() { + async fn test_server_close_metric_defaults_to_unknown_subgraph() { async { - // Test: stream_end metric uses "unknown" when no subgraph name is set + // Test: server_close metric uses "unknown" when no subgraph name is set let (_guard, _layer) = setup_tracing(); let span = tracing::info_span!("test_span"); let _span_guard = span.enter(); @@ -1300,7 +1298,7 @@ mod tests { drop(span); assert_counter!( - "apollo.router.operations.subscriptions.stream_end", + "apollo.router.operations.subscriptions.server_close", 1, "subgraph.service.name" = "unknown" ); diff --git a/docs/source/routing/operations/subscriptions/configuration.mdx b/docs/source/routing/operations/subscriptions/configuration.mdx index fed9a37a04..81428c7909 100644 --- a/docs/source/routing/operations/subscriptions/configuration.mdx +++ b/docs/source/routing/operations/subscriptions/configuration.mdx @@ -476,5 +476,5 @@ The `apollo.subscription.end_reason` attribute can have these values: Use the `apollo.subscription.end_reason` attribute to monitor the health of your subscriptions: -- Frequent `stream_end` values might indicate issues with your 
subgraph subscription sources closing unexpectedly. +- Frequent `subgraph_error` values might indicate issues with your subgraph subscription sources closing unexpectedly. - The `server_close` value indicates healthy subscription lifecycles where the subgraph properly signals closure.