Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
ed91aaf
fix(fetch): ensure proper cancellation of child streams when parent s…
Lei-k Oct 12, 2024
f878695
style(fetch): fix formatting
Lei-k Oct 12, 2024
d0282f4
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 14, 2024
2c6d34e
fix: resolve multiple ESLint issues
Lei-k Oct 14, 2024
4382ce0
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 14, 2024
c7c943f
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 15, 2024
643a342
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 15, 2024
f3aaa5d
Immediate stream cancellation after timeout in `_tryGetResponseText`
Lei-k Oct 15, 2024
7f92ea3
Merge branch 'fix/fetch-not-release' of https://github.com/Lei-k/sent…
Lei-k Oct 15, 2024
9b83669
fix conflicts
Lei-k Oct 15, 2024
1d29564
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 16, 2024
80aa60d
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 16, 2024
0cf8645
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 18, 2024
6d66814
fix file formatting
Lei-k Oct 18, 2024
0d10106
Update test cases to handle new logic in fetchUtils
Lei-k Oct 18, 2024
536cc02
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 18, 2024
78ac956
feat: define whatwg's stream types
Lei-k Oct 19, 2024
7fd0ffe
fix type error for tests
Lei-k Oct 19, 2024
78b3ed3
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 23, 2024
8c99aee
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 27, 2024
5ec50f6
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 29, 2024
3923ecb
Merge branch 'getsentry:develop' into fix/fetch-not-release
Lei-k Oct 29, 2024
f13e8c2
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 30, 2024
4be816e
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 30, 2024
207ee69
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 31, 2024
94c0e2e
Merge branch 'develop' into fix/fetch-not-release
Lei-k Nov 2, 2024
3ba1c20
Merge branch 'develop' into fix/fetch-not-release
Lei-k Nov 4, 2024
189848d
Merge branch 'develop' into fix/fetch-not-release
Lei-k Nov 8, 2024
8139565
Merge branch 'develop' into fix/fetch-not-release
Lei-k Nov 15, 2024
8780cda
Merge branch 'develop' into fix/fetch-not-release
Lei-k Nov 18, 2024
9ce2d8e
ref: Resolve merge conflict between develop and fix/fetch-not-release
Lei-k Dec 17, 2024
5c6ce1d
ref: fix typo
Lei-k Dec 17, 2024
2ed739d
ref: prettify file format
Lei-k Dec 17, 2024
02c2448
ref: fix file format
Lei-k Dec 17, 2024
b0b096a
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 17, 2024
8181799
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 17, 2024
dc69ad3
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 17, 2024
dd4505e
chore: resolve conflict from PR #14745
Lei-k Dec 18, 2024
e506037
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 18, 2024
423bd67
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 19, 2024
7f0c21b
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 19, 2024
78d4453
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 23, 2024
8137807
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 23, 2024
617fcc3
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 25, 2024
09a0fda
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 35 additions & 16 deletions packages/replay-internal/src/coreHandlers/util/fetchUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,21 +293,40 @@ function _tryCloneResponse(response: Response): Response | void {
* Fetch can return a streaming body, that may not resolve (or not for a long time).
* If that happens, we rather abort after a short time than keep waiting for this.
*/
function _tryGetResponseText(response: Response): Promise<string | undefined> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Timeout while trying to read response body')), 500);

_getResponseText(response)
.then(
txt => resolve(txt),
reason => reject(reason),
)
.finally(() => clearTimeout(timeout));
});
}
async function _tryGetResponseText(response: Response): Promise<string | undefined> {
if (!response.body) {
throw new Error('Response has no body');
}

const reader = response.body.getReader();

const decoder = new TextDecoder();
let result = '';
let running = true;

const timeout = setTimeout(() => {
if (running) {
reader.cancel('Timeout while trying to read response body').catch(_ => {
// This block is only triggered when stream has already been released,
// so it's safe to ignore.
});
}
}, 500);

async function _getResponseText(response: Response): Promise<string> {
// Force this to be a promise, just to be safe
// eslint-disable-next-line no-return-await
return await response.text();
try {
while (running) {
const { value, done } = await reader.read();

running = !done;

if (value) {
result += decoder.decode(value, { stream: !done });
}
}
} finally {
clearTimeout(timeout);
reader.releaseLock();
}

return result;
}
121 changes: 78 additions & 43 deletions packages/utils/src/instrument/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,55 +115,91 @@ function instrumentFetch(onFetchResolved?: (response: Response) => void, skipNat
});
}

async function resolveResponse(res: Response | undefined, onFinishedResolving: () => void): Promise<void> {
if (res && res.body) {
const body = res.body;
const responseReader = body.getReader();

// Define a maximum duration after which we just cancel
const maxFetchDurationTimeout = setTimeout(
() => {
body.cancel().then(null, () => {
// noop
});
},
90 * 1000, // 90s
);

let readingActive = true;
while (readingActive) {
let chunkTimeout;
try {
// abort reading if read op takes more than 5s
chunkTimeout = setTimeout(() => {
body.cancel().then(null, () => {
// noop on error
});
}, 5000);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think removing the 5-second chunkTimeout is fine, but if there is any reason to keep it, please let me know. Thanks!

async function resloveReader(reader: ReadableStreamDefaultReader, onFinishedResolving: () => void): Promise<void> {
let running = true;
while (running) {
try {
// This .read() call will reject/throw when `reader.cancel()`
const { done } = await reader.read();

// This .read() call will reject/throw when we abort due to timeouts through `body.cancel()`
const { done } = await responseReader.read();
running = !done;

clearTimeout(chunkTimeout);

if (done) {
onFinishedResolving();
readingActive = false;
}
} catch (error) {
readingActive = false;
} finally {
clearTimeout(chunkTimeout);
if (done) {
onFinishedResolving();
}
} catch (_) {
running = false;
}
}
}

/**
* Resolves the body stream of a `Response` object and links its cancellation to a parent `Response` body.
*
* This function attaches a custom `cancel` behavior to both the parent `Response` body and its `getReader()` method.
* When the parent stream or its reader is canceled, it triggers the cancellation of the child stream as well.
* The function also monitors the resolution of the child's body stream using `resloveReader` and performs cleanup.
*
* @param {Response} res - The `Response` object whose body stream will be resolved.
* @param {Response} parentRes - The parent `Response` object whose body stream is linked to the cancellation of `res`.
* @param {() => void} onFinishedResolving - A callback function to be invoked when the body stream of `res` is fully resolved.
*/
export function resolveResponse(res: Response, parentRes: Response, onFinishedResolving: () => void): void {
if (!res.body || !parentRes.body) {
if (res.body) {
res.body.cancel().catch(_ => {
// noop on error
});
}

return;
}

const body = res.body;
const parentBody = parentRes.body;
const responseReader = body.getReader();

clearTimeout(maxFetchDurationTimeout);
const originalCancel = parentBody.cancel.bind(parentBody) as (reason?: any) => Promise<any>;

responseReader.releaseLock();
body.cancel().then(null, () => {
// Override cancel method on parent response's body
parentBody.cancel = async (reason?: any) => {
responseReader.cancel('Cancelled by parent stream').catch(_ => {
// noop on error
});
}

await originalCancel(reason);
};

const originalGetReader = parentRes.body.getReader.bind(parentBody) as
(options: ReadableStreamGetReaderOptions) => ReadableStreamDefaultReader;

// Override getReader on parent response's body
parentBody.getReader = ((opts?: any) => {
const reader = originalGetReader(opts) as ReadableStreamDefaultReader;

const originalReaderCancel = reader.cancel.bind(reader) as (reason?: any) => Promise<any>;

reader.cancel = async (reason?: any) => {
responseReader.cancel('Cancelled by parent reader').catch(_ => {
// noop on error
});

await originalReaderCancel(reason);
};

return reader;
}) as any;

resloveReader(responseReader, onFinishedResolving).finally(() => {
try {
responseReader.releaseLock();
body.cancel().catch(() => {
// noop on error
});
} catch (_) {
// noop on error
}
});
}

function streamHandler(response: Response): void {
Expand All @@ -175,8 +211,7 @@ function streamHandler(response: Response): void {
return;
}

// eslint-disable-next-line @typescript-eslint/no-floating-promises
resolveResponse(clonedResponseForResolving, () => {
resolveResponse(clonedResponseForResolving, response, () => {
triggerHandlers('fetch-body-resolved', {
endTimestamp: timestampInSeconds() * 1000,
response,
Expand Down
122 changes: 121 additions & 1 deletion packages/utils/test/instrument/fetch.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
import { parseFetchArgs } from '../../src/instrument/fetch';
import { parseFetchArgs, resolveResponse } from '../../src/instrument/fetch';

async function delay(ms: number) {
await new Promise(res => {
setTimeout(() => {
res(true);
}, ms);
});
}

describe('instrument > parseFetchArgs', () => {
it.each([
Expand Down Expand Up @@ -27,3 +35,115 @@ describe('instrument > parseFetchArgs', () => {
expect(actual).toEqual(expected);
});
});

describe('instrument > fetch > resolveResponse', () => {
let mockReader: jest.Mocked<ReadableStreamDefaultReader<any>>;
let mockResponse: jest.Mocked<Response>;
let mockParentResponse: jest.Mocked<Response>;
let mockParentReader: jest.Mocked<ReadableStreamDefaultReader<any>>;
let onFinishedResolving: jest.Mock;

beforeEach(() => {
mockReader = {
read: jest.fn(),
cancel: jest.fn(async (reason?: any) => {
// Set read to reject on next call after cancel
mockReader.read.mockRejectedValueOnce(new Error(reason));
}),
releaseLock: jest.fn(),
} as unknown as jest.Mocked<ReadableStreamDefaultReader<any>>;

mockResponse = {
body: {
getReader: jest.fn(() => mockReader),
cancel: jest.fn(),
} as unknown as ReadableStream<any>,
} as jest.Mocked<Response>;

mockParentReader = {
read: jest.fn(),
cancel: jest.fn().mockResolvedValue(undefined),
releaseLock: jest.fn(),
} as unknown as jest.Mocked<ReadableStreamDefaultReader<any>>;

mockParentResponse = {
body: {
cancel: jest.fn().mockResolvedValue(undefined),
getReader: jest.fn(() => mockParentReader),
} as unknown as ReadableStream<any>,
} as jest.Mocked<Response>;

onFinishedResolving = jest.fn();
});

test('should call onFinishedResolving when the stream is fully read', async () => {
mockReader.read
.mockResolvedValueOnce({ done: false, value: 'chunk' })
.mockResolvedValueOnce({ done: true, value: null });

resolveResponse(mockResponse, mockParentResponse, onFinishedResolving);

// wait 100ms so all promise can be resolved/rejected
await delay(100);

// eslint-disable-next-line @typescript-eslint/unbound-method
expect(mockReader.read).toHaveBeenCalledTimes(2);
expect(onFinishedResolving).toHaveBeenCalled();
});

test('should handle read errors gracefully', async () => {
mockReader.read.mockRejectedValue(new Error('Read error'));

resolveResponse(mockResponse, mockParentResponse, onFinishedResolving);

await delay(100);

expect(onFinishedResolving).not.toHaveBeenCalled();
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(mockReader.releaseLock).toHaveBeenCalled();
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(mockResponse.body?.cancel).toHaveBeenCalled();
});

test('should cancel reader and gracefully exit when parent response is cancelled', async () => {
mockReader.read
.mockResolvedValueOnce({ done: false, value: 'chunk1' })
.mockResolvedValueOnce({ done: false, value: 'chunk2' });

resolveResponse(mockResponse, mockParentResponse, onFinishedResolving);

await Promise.resolve();
await mockParentResponse.body?.cancel();
await delay(100);

expect(onFinishedResolving).not.toHaveBeenCalled();
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(mockReader.releaseLock).toHaveBeenCalled();
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(mockReader.cancel).toHaveBeenCalled();
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(mockResponse.body?.cancel).toHaveBeenCalled();
});

test('should cancel reader and gracefully exit when parent reader is cancelled', async () => {
mockReader.read
.mockResolvedValueOnce({ done: false, value: 'chunk1' })
.mockResolvedValueOnce({ done: false, value: 'chunk2' });

resolveResponse(mockResponse, mockParentResponse, onFinishedResolving);

const parentReader = mockParentResponse.body!.getReader();
await Promise.resolve();

await parentReader.cancel();
await delay(100);

expect(onFinishedResolving).not.toHaveBeenCalled();
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(mockReader.releaseLock).toHaveBeenCalled();
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(mockReader.cancel).toHaveBeenCalled();
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(mockResponse.body?.cancel).toHaveBeenCalled();
});
});
Loading