Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 59 additions & 20 deletions packages/cloudflare/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import {
parseStringToURLObject,
SEMANTIC_ATTRIBUTE_SENTRY_OP,
setHttpStatus,
startSpan,
startSpanManual,
winterCGHeadersToDict,
withIsolationScope,
} from '@sentry/core';
import type { CloudflareOptions } from './client';
import { addCloudResourceContext, addCultureContext, addRequest } from './scope-utils';
import { init } from './sdk';
import { classifyResponseStreaming } from './utils/streaming';

interface RequestHandlerWrapperOptions {
options: CloudflareOptions;
Expand Down Expand Up @@ -98,26 +99,64 @@ export function wrapRequestHandler(
// Note: This span will not have a duration unless I/O happens in the handler. This is
// because of how the cloudflare workers runtime works.
// See: https://developers.cloudflare.com/workers/runtime-apis/performance/
return startSpan(
{
name,
attributes,
},
async span => {
try {
const res = await handler();
setHttpStatus(span, res.status);
return res;
} catch (e) {
if (captureErrors) {
captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
}
throw e;
} finally {
waitUntil?.(flush(2000));

// Use startSpanManual to control when span ends (needed for streaming responses)
return startSpanManual({ name, attributes }, async span => {
let res: Response;

try {
res = await handler();
setHttpStatus(span, res.status);
} catch (e) {
span.end();
if (captureErrors) {
captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
}
},
);
waitUntil?.(flush(2000));
throw e;
}

// Classify response to detect actual streaming
const classification = classifyResponseStreaming(res);

if (classification.isStreaming && classification.response.body) {
// Streaming response detected - monitor consumption to keep span alive
const [clientStream, monitorStream] = classification.response.body.tee();

// Monitor stream consumption and end span when complete
const streamMonitor = (async () => {
const reader = monitorStream.getReader();

try {
let done = false;
while (!done) {
const result = await reader.read();
done = result.done;
}
} catch {
// Stream error or cancellation - will end span in finally
} finally {
reader.releaseLock();
span.end();
waitUntil?.(flush(2000));
}
})();
Comment on lines +133 to +143
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Missing error handling for response.body.tee() call.
Severity: MEDIUM | Confidence: Low

🔍 Detailed Analysis

The call to classification.response.body.tee() is not wrapped in error handling. If tee() throws (e.g., if the body stream is locked or in an invalid state), this will result in an uncaught exception. This bypasses existing error handling, preventing proper span termination and error capture by Sentry's exception handler.

💡 Suggested Fix

Wrap the classification.response.body.tee() call in a try-catch block to handle potential TypeError exceptions gracefully.

🤖 Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: packages/cloudflare/src/request.ts#L127-L143

Potential issue: The call to `classification.response.body.tee()` is not wrapped in
error handling. If `tee()` throws (e.g., if the body stream is locked or in an invalid
state), this will result in an uncaught exception. This bypasses existing error
handling, preventing proper span termination and error capture by Sentry's exception
handler.

Did we get this right? 👍 / 👎 to inform future reviews.
Reference ID: 2881816


waitUntil?.(streamMonitor);

// Return response with client stream
return new Response(clientStream, {
status: classification.response.status,
statusText: classification.response.statusText,
headers: classification.response.headers,
});
}

// Non-streaming response - end span immediately and return original
span.end();
waitUntil?.(flush(2000));
return classification.response;
});
},
);
});
Expand Down
42 changes: 42 additions & 0 deletions packages/cloudflare/src/utils/streaming.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
export type StreamingGuess = {
  response: Response;
  isStreaming: boolean;
};

/**
 * Classifies a Response as streaming or non-streaming.
 *
 * Heuristics:
 * - No body → not streaming
 * - Known streaming Content-Types → streaming (SSE, NDJSON, JSON streaming)
 * - text/plain without Content-Length → streaming (some AI APIs)
 * - Otherwise → not streaming (conservative default, including HTML/SSR)
 *
 * We avoid probing the stream to prevent blocking on transform streams (like injectTraceMetaTags)
 * or SSR streams that may not have data ready immediately.
 */
export function classifyResponseStreaming(res: Response): StreamingGuess {
  // A body-less response can never stream.
  if (!res.body) {
    return { response: res, isStreaming: false };
  }

  const mediaType = res.headers.get('content-type') ?? '';
  const declaredLength = res.headers.get('content-length');

  // Media types that always indicate streaming:
  // - text/event-stream: Server-Sent Events (Vercel AI SDK, real-time APIs)
  // - application/x-ndjson, application/ndjson: Newline-delimited JSON
  // - application/stream+json: JSON streaming
  const alwaysStreaming = [
    /^text\/event-stream\b/i,
    /^application\/(x-)?ndjson\b/i,
    /^application\/stream\+json\b/i,
  ];
  const knownStreamingType = alwaysStreaming.some(pattern => pattern.test(mediaType));

  // text/plain with no declared Content-Length: some AI APIs stream plain text this way.
  const plainTextStream = /^text\/plain\b/i.test(mediaType) && !declaredLength;

  if (knownStreamingType || plainTextStream) {
    return { response: res, isStreaming: true };
  }

  // Conservative default: everything else (HTML/SSR included) is treated as buffered.
  return { response: res, isStreaming: false };
}
34 changes: 29 additions & 5 deletions packages/cloudflare/test/durableobject.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,18 @@ describe('instrumentDurableObjectWithSentry', () => {
});

it('flush performs after all waitUntil promises are finished', async () => {
// Spy on Client.prototype.flush and mock it to resolve immediately to avoid timeout issues with fake timers
const flush = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
vi.useFakeTimers();
onTestFinished(() => {
vi.useRealTimers();
});
const flush = vi.spyOn(SentryCore.Client.prototype, 'flush');

// Measure delta instead of absolute call count to avoid interference from parallel tests.
// Since we spy on the prototype, other tests running in parallel may also call flush.
// By measuring before/after, we only verify that THIS test triggered exactly one flush call.
const before = flush.mock.calls.length;

const waitUntil = vi.fn();
const testClass = vi.fn(context => ({
fetch: () => {
Expand All @@ -133,12 +140,29 @@ describe('instrumentDurableObjectWithSentry', () => {
waitUntil,
} as unknown as ExecutionContext;
const dObject: any = Reflect.construct(instrumented, [context, {} as any]);
expect(() => dObject.fetch(new Request('https://example.com'))).not.toThrow();
expect(flush).not.toBeCalled();
expect(waitUntil).toHaveBeenCalledOnce();

// Call fetch (don't await yet)
const responsePromise = dObject.fetch(new Request('https://example.com'));

// Advance past classification timeout and get response
vi.advanceTimersByTime(30);
const response = await responsePromise;

// Consume response (triggers span end for buffered responses)
await response.text();

// The flush should now be queued in waitUntil
expect(waitUntil).toHaveBeenCalled();

// Advance to trigger the setTimeout in the handler's waitUntil
vi.advanceTimersToNextTimer();
await Promise.all(waitUntil.mock.calls.map(([p]) => p));
expect(flush).toBeCalled();

const after = flush.mock.calls.length;
const delta = after - before;

// Verify that exactly one flush call was made during this test
expect(delta).toBe(1);
});

describe('instrumentPrototypeMethods option', () => {
Expand Down
6 changes: 5 additions & 1 deletion packages/cloudflare/test/handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ describe('withSentry', () => {
createMockExecutionContext(),
);

expect(result).toBe(response);
// Response may be wrapped for streaming detection, verify content
expect(result?.status).toBe(response.status);
if (result) {
expect(await result.text()).toBe('test');
}
});

test('merges options from env and callback', async () => {
Expand Down
4 changes: 3 additions & 1 deletion packages/cloudflare/test/pages-plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ describe('sentryPagesPlugin', () => {
pluginArgs: MOCK_OPTIONS,
});

expect(result).toBe(response);
// Response may be wrapped for streaming detection, verify content
expect(result.status).toBe(response.status);
expect(await result.text()).toBe('test');
});
});
50 changes: 44 additions & 6 deletions packages/cloudflare/test/request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ describe('withSentry', () => {
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() },
() => response,
);
expect(result).toBe(response);
// Response may be wrapped for streaming detection, verify content matches
expect(result.status).toBe(response.status);
expect(await result.text()).toBe('test');
});

test('flushes the event after the handler is done using the cloudflare context.waitUntil', async () => {
Expand All @@ -48,6 +50,25 @@ describe('withSentry', () => {
expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise));
});

test('handles streaming responses correctly', async () => {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('chunk1'));
controller.enqueue(new TextEncoder().encode('chunk2'));
controller.close();
},
});
const streamingResponse = new Response(stream);

const result = await wrapRequestHandler(
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() },
() => streamingResponse,
);

const text = await result.text();
expect(text).toBe('chunk1chunk2');
});

test("doesn't error if context is undefined", () => {
expect(() =>
wrapRequestHandler(
Expand All @@ -69,11 +90,18 @@ describe('withSentry', () => {
});

test('flush must be called when all waitUntil are done', async () => {
const flush = vi.spyOn(SentryCore.Client.prototype, 'flush');
// Spy on Client.prototype.flush and mock it to resolve immediately to avoid timeout issues with fake timers
const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
vi.useFakeTimers();
onTestFinished(() => {
vi.useRealTimers();
});

// Measure delta instead of absolute call count to avoid interference from parallel tests.
// Since we spy on the prototype, other tests running in parallel may also call flush.
// By measuring before/after, we only verify that THIS test triggered exactly one flush call.
const before = flushSpy.mock.calls.length;

const waits: Promise<unknown>[] = [];
const waitUntil = vi.fn(promise => waits.push(promise));

Expand All @@ -83,13 +111,20 @@ describe('withSentry', () => {

await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => {
addDelayedWaitUntil(context);
return new Response('test');
const response = new Response('test');
// Add Content-Length to skip probing
response.headers.set('content-length', '4');
return response;
});
expect(flush).not.toBeCalled();
expect(waitUntil).toBeCalled();
vi.advanceTimersToNextTimerAsync().then(() => vi.runAllTimers());
vi.advanceTimersToNextTimer().runAllTimers();
await Promise.all(waits);
expect(flush).toHaveBeenCalledOnce();

const after = flushSpy.mock.calls.length;
const delta = after - before;

// Verify that exactly one flush call was made during this test
expect(delta).toBe(1);
});

describe('scope instrumentation', () => {
Expand Down Expand Up @@ -303,6 +338,9 @@ describe('withSentry', () => {
},
);

// Wait for async span end and transaction capture
await new Promise(resolve => setTimeout(resolve, 50));

expect(sentryEvent.transaction).toEqual('GET /');
expect(sentryEvent.spans).toHaveLength(0);
expect(sentryEvent.contexts?.trace).toEqual({
Expand Down
Loading