fix: request aborted shouldn't result in a fetch error #2741
Conversation
Walkthrough
This pull request modifies client disconnection error handling across the router's request lifecycle.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~70 minutes
Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Router-nonroot image scan passed: ✅ No security vulnerabilities found in image.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #2741 +/- ##
==========================================
- Coverage 63.46% 57.77% -5.69%
==========================================
Files 251 235 -16
Lines 26767 26204 -563
==========================================
- Hits 16987 15140 -1847
- Misses 8414 9603 +1189
- Partials 1366 1461 +95
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
router/core/batch.go (1)
183-212: ⚠️ Potential issue | 🟠 Major
Return immediately for canceled batch requests.
This branch still falls through to writeRequestErrors, so a batched disconnect will try to serialize a GraphQL error after we've already classified the failure as a client-side cancellation. That can reintroduce write-side noise and mutate the observed status in the batch path.
💡 Proposed fix
 func processBatchError(w http.ResponseWriter, r *http.Request, err error, requestLogger *zap.Logger) {
-    if errors.Is(err, context.Canceled) {
-        span := trace.SpanFromContext(r.Context())
-        span.RecordError(err)
-    } else {
-        ctrace.AttachErrToSpanFromContext(r.Context(), err)
-    }
+    if errors.Is(err, context.Canceled) {
+        trace.SpanFromContext(r.Context()).RecordError(err)
+        return
+    }
+
+    ctrace.AttachErrToSpanFromContext(r.Context(), err)

     requestError := graphqlerrors.RequestError{
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router/core/batch.go` around lines 183 - 212, In processBatchError, the context.Canceled branch records the span but then falls through to writeRequestErrors; modify processBatchError so that when errors.Is(err, context.Canceled) is true you RecordError on the span (as now) and then immediately return to avoid calling writeRequestErrors and serializing a GraphQL error for client cancellations; keep the existing else branch (ctrace.AttachErrToSpanFromContext) and the subsequent handling for non-canceled errors intact.
🧹 Nitpick comments (2)
router/core/engine_loader_hooks_test.go (2)
373-392: Assert the merged attrs at the metric-store boundary too.
Right now this only checks the returned slice. The test would still pass if recordFetchError returned the merged attrs but called MeasureRequestError with the old slice.
Suggested assertion
 resultSlice, _ := hooks.recordFetchError(ctx, span, fetchErr, rc, nil, metricAddOpt, prePopulated)
 span.End()
@@
 require.True(t, hasExisting, "pre-populated attrs should be preserved")
 require.True(t, hasErrorCodes, "error codes should be appended")
+
+require.True(t, store.requestErrorCalled, "MeasureRequestError should be called")
+
+var metricHasExisting, metricHasErrorCodes bool
+for _, attr := range store.requestErrorSliceAttr {
+    if string(attr.Key) == "existing.attr" {
+        metricHasExisting = true
+    }
+    if string(attr.Key) == "graphql.error.codes" {
+        metricHasErrorCodes = true
+    }
+}
+require.True(t, metricHasExisting,
+    "MeasureRequestError should receive the pre-populated attrs")
+require.True(t, metricHasErrorCodes,
+    "MeasureRequestError should receive the appended error codes")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router/core/engine_loader_hooks_test.go` around lines 373 - 392, The test currently only asserts the returned slice from hooks.recordFetchError contains merged attributes; extend it to also verify that the metric-store call received the merged slice: capture the arguments passed to the mock MetricStore.MeasureRequestError (or the equivalent MeasureRequestError spy used by hooks), and assert that the captured attrs include both the prePopulated attribute ("existing.attr") and the "graphql.error.codes" attribute (same checks used for resultSlice). Update references to the mock/spy used by the hooks setup (e.g., the MetricStore mock instance) and reuse metricAddOpt/prePopulated/resultSlice names to ensure MeasureRequestError was invoked with the merged attrs.
107-130: Assert the exception event for wrapped cancellations too.
This case currently only proves the status/metric behavior. If the wrapped context.Canceled path stops emitting the observability event, the regression would still pass.
Suggested assertion
 require.NotEqual(t, codes.Error, spans[0].Status().Code,
     "wrapped context.Canceled should not set span status to Error")
+require.Len(t, spans[0].Events(), 1,
+    "wrapped context.Canceled should still be recorded as a span event")
+require.Equal(t, "exception", spans[0].Events()[0].Name)
 require.False(t, store.requestErrorCalled,
     "MeasureRequestError should not be called for wrapped context.Canceled")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router/core/engine_loader_hooks_test.go` around lines 107 - 130, The test "wrapped context.Canceled does not set span ERROR status" currently omits asserting that an observability exception event is still emitted for wrapped cancellations; update the test after calling hooks.OnFinished(ctx, ds, &resolve.ResponseInfo{Err: wrappedErr}) to inspect the recorded span (spans[0]) events and assert that there is an "exception" event (or an event whose attributes indicate an exception/exception.type/exception.message matching wrappedErr) so the wrapped context.Canceled path still emits the expected exception event; use the existing exporter.GetSpans().Snapshots() and spans[0].Events() APIs to locate and assert the event.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 95ba86f9-0380-4da8-8e23-5351e8b8d117
📒 Files selected for processing (9)
router-tests/telemetry/span_error_status_test.go
router/core/batch.go
router/core/engine_loader_hooks.go
router/core/engine_loader_hooks_test.go
router/core/errors.go
router/core/graphql_prehandler.go
router/core/operation_metrics_test.go
router/pkg/trace/transport.go
router/pkg/trace/transport_test.go
time.Sleep(500 * time.Millisecond)

spans := exporter.GetSpans().Snapshots()
require.NotEmpty(t, spans)
Replace the new fixed sleeps with require.Eventually.
These assertions depend on async span/log export, so time.Sleep(500 * time.Millisecond) is still racy under slower CI and can miss late spans or log entries.
As per coding guidelines, "For periodic exporters, wait for ALL expected items using require.Eventually, not just one sentinel value, to avoid race conditions with export cycles".
Also applies to: 292-295, 361-363
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@router-tests/telemetry/span_error_status_test.go` around lines 188 - 191,
Replace the fixed time.Sleep usage with require.Eventually to avoid races when
waiting for async span/log export: instead of sleeping then calling
exporter.GetSpans().Snapshots() and require.NotEmpty(t, spans), call
require.Eventually with a short polling interval and timeout and inside the poll
invoke exporter.GetSpans().Snapshots() and assert len(spans) > 0 (or use
require.NotEmpty within the closure) so the test waits until all expected spans
are exported; update the same pattern found around the other occurrences that
use time.Sleep before checking exporter.GetSpans().Snapshots() (the instances at
the other mentioned blocks should be changed similarly).
func (f *engineLoaderHooks) recordFetchError(
    ctx context.Context,
    span trace.Span,
    fetchErr error,
    reqContext *requestContext,
    metricAttrs []attribute.KeyValue,
    metricAddOpt otelmetric.AddOption,
    metricSliceAttrs []attribute.KeyValue,
) ([]attribute.KeyValue, otelmetric.MeasurementOption) {
    rtrace.SetSanitizedSpanStatus(span, codes.Error, fetchErr.Error())
    span.RecordError(fetchErr)

    // Extract downstream error codes from subgraph errors
    var errorCodesAttr []string

    if unwrapped, ok := fetchErr.(multiError); ok {
        for _, e := range unwrapped.Unwrap() {
            var subgraphError *resolve.SubgraphError
            if !errors.As(e, &subgraphError) {
                continue
            }

            for i, downstreamError := range subgraphError.DownstreamErrors {
                var errorCode string
                if downstreamError.Extensions != nil {
                    if value := downstreamError.Extensions.Get("code"); value != nil {
                        errorCode = string(value.GetStringBytes())
                    }
                }

                if errorCode == "" {
                    continue
                }

                errorCodesAttr = append(errorCodesAttr, errorCode)
                span.AddEvent(fmt.Sprintf("Downstream error %d", i+1),
                    trace.WithAttributes(
                        rotel.WgSubgraphErrorExtendedCode.String(errorCode),
                        rotel.WgSubgraphErrorMessage.String(downstreamError.Message),
                    ),
                )
            }
        }

        errorCodesAttr = unique.SliceElements(errorCodesAttr)
        // Reduce cardinality of error codes
        slices.Sort(errorCodesAttr)
    }

    // We can't add this earlier because this is done per subgraph response
    if v, ok := reqContext.telemetry.metricSetAttrs[ContextFieldGraphQLErrorCodes]; ok && len(errorCodesAttr) > 0 {
        metricSliceAttrs = append(metricSliceAttrs, attribute.StringSlice(v, errorCodesAttr))
    }

    f.metricStore.MeasureRequestError(ctx, metricSliceAttrs, metricAddOpt)

    metricAttrs = append(metricAttrs, rotel.WgRequestError.Bool(true))
    attrOpt := otelmetric.WithAttributeSet(attribute.NewSet(metricAttrs...))

    return metricSliceAttrs, attrOpt
}
Restore wg.request.error=true on real fetch failures.
This helper replaced the previous rtrace.AttachErrToSpan path, but it no longer sets the span attribute that marks non-canceled request failures. The metric attrs still get wg.request.error=true, so traces and metrics will drift apart for the same fetch error.
💡 Proposed fix
func (f *engineLoaderHooks) recordFetchError(
ctx context.Context,
span trace.Span,
fetchErr error,
reqContext *requestContext,
metricAttrs []attribute.KeyValue,
metricAddOpt otelmetric.AddOption,
metricSliceAttrs []attribute.KeyValue,
) ([]attribute.KeyValue, otelmetric.MeasurementOption) {
rtrace.SetSanitizedSpanStatus(span, codes.Error, fetchErr.Error())
+ span.SetAttributes(rotel.WgRequestError.Bool(true))
  span.RecordError(fetchErr)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
func (f *engineLoaderHooks) recordFetchError(
    ctx context.Context,
    span trace.Span,
    fetchErr error,
    reqContext *requestContext,
    metricAttrs []attribute.KeyValue,
    metricAddOpt otelmetric.AddOption,
    metricSliceAttrs []attribute.KeyValue,
) ([]attribute.KeyValue, otelmetric.MeasurementOption) {
    rtrace.SetSanitizedSpanStatus(span, codes.Error, fetchErr.Error())
    span.SetAttributes(rotel.WgRequestError.Bool(true))
    span.RecordError(fetchErr)

    // Extract downstream error codes from subgraph errors
    var errorCodesAttr []string

    if unwrapped, ok := fetchErr.(multiError); ok {
        for _, e := range unwrapped.Unwrap() {
            var subgraphError *resolve.SubgraphError
            if !errors.As(e, &subgraphError) {
                continue
            }

            for i, downstreamError := range subgraphError.DownstreamErrors {
                var errorCode string
                if downstreamError.Extensions != nil {
                    if value := downstreamError.Extensions.Get("code"); value != nil {
                        errorCode = string(value.GetStringBytes())
                    }
                }

                if errorCode == "" {
                    continue
                }

                errorCodesAttr = append(errorCodesAttr, errorCode)
                span.AddEvent(fmt.Sprintf("Downstream error %d", i+1),
                    trace.WithAttributes(
                        rotel.WgSubgraphErrorExtendedCode.String(errorCode),
                        rotel.WgSubgraphErrorMessage.String(downstreamError.Message),
                    ),
                )
            }
        }

        errorCodesAttr = unique.SliceElements(errorCodesAttr)
        // Reduce cardinality of error codes
        slices.Sort(errorCodesAttr)
    }

    // We can't add this earlier because this is done per subgraph response
    if v, ok := reqContext.telemetry.metricSetAttrs[ContextFieldGraphQLErrorCodes]; ok && len(errorCodesAttr) > 0 {
        metricSliceAttrs = append(metricSliceAttrs, attribute.StringSlice(v, errorCodesAttr))
    }

    f.metricStore.MeasureRequestError(ctx, metricSliceAttrs, metricAddOpt)

    metricAttrs = append(metricAttrs, rotel.WgRequestError.Bool(true))
    attrOpt := otelmetric.WithAttributeSet(attribute.NewSet(metricAttrs...))

    return metricSliceAttrs, attrOpt
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@router/core/engine_loader_hooks.go` around lines 274 - 331, The
recordFetchError helper currently records the error and metrics but never sets
the span attribute that marks non-canceled request failures; update
recordFetchError to set the span attribute rotel.WgRequestError.Bool(true) for
real fetch failures (i.e., when fetchErr is not context.Canceled and not
context.DeadlineExceeded or equivalent cancellation checks) before returning so
traces and metrics stay in sync; reference the recordFetchError function and
rotel.WgRequestError to locate where to add span.SetAttributes(...) alongside
the existing metricAttrs append and otelmetric.WithAttributeSet creation.
This PR fixes places where we mark spans as errors for client disconnects.
Summary by CodeRabbit
Release Notes
Bug Fixes
Tests
Checklist
Open Source AI Manifesto
This project follows the principles of the Open Source AI Manifesto. Please ensure your contribution aligns with its principles.