Skip to content

Commit 04bb420

Browse files
committed
fix(transport): track cumulative inflight keepalive bytes and count
1 parent a73c4d1 commit 04bb420

File tree

4 files changed

+255
-4
lines changed

4 files changed

+255
-4
lines changed
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
import * as chai from 'chai';
2+
import sinonChai from 'sinon-chai';
3+
import {
4+
fakeFetchGetKeepalive,
5+
fakeFetchInstall,
6+
fakeFetchRespondWith,
7+
fakeFetchRestore,
8+
} from '../../../tests/utils/index.ts';
9+
import { _resetKeepaliveTracking, FetchTransport } from './FetchTransport.ts';
10+
11+
chai.use(sinonChai);
12+
const { expect } = chai;
13+
14+
const makeTransport = (
15+
overrides?: Partial<ConstructorParameters<typeof FetchTransport>[0]>,
16+
) =>
17+
new FetchTransport({
18+
url: 'http://example.com',
19+
headers: {},
20+
compression: 'none',
21+
...overrides,
22+
});
23+
24+
const smallPayload = new TextEncoder().encode('{"small": true}');
25+
const largePayload = new Uint8Array(49153); // 1 byte over the 48KiB budget
26+
27+
interface Deferred {
28+
promise: Promise<Response>;
29+
resolve: (value: Response) => void;
30+
reject: (reason: Error) => void;
31+
}
32+
33+
function createDeferred(): Deferred {
34+
let resolve!: (value: Response) => void;
35+
let reject!: (reason: Error) => void;
36+
const promise = new Promise<Response>((res, rej) => {
37+
resolve = res;
38+
reject = rej;
39+
});
40+
return { promise, resolve, reject };
41+
}
42+
43+
describe('FetchTransport', () => {
44+
beforeEach(() => {
45+
_resetKeepaliveTracking();
46+
fakeFetchInstall();
47+
});
48+
49+
afterEach(() => {
50+
fakeFetchRestore();
51+
});
52+
53+
describe('keepalive budget', () => {
54+
it('should set keepalive to true for payloads within the budget', async () => {
55+
fakeFetchRespondWith('ok', { status: 200 });
56+
const transport = makeTransport();
57+
58+
await transport.send(smallPayload, 1000);
59+
60+
expect(fakeFetchGetKeepalive()).to.equal(true);
61+
});
62+
63+
it('should set keepalive to false for payloads exceeding the budget', async () => {
64+
fakeFetchRespondWith('ok', { status: 200 });
65+
const transport = makeTransport();
66+
67+
await transport.send(largePayload, 1000);
68+
69+
expect(fakeFetchGetKeepalive()).to.equal(false);
70+
});
71+
});
72+
73+
describe('keepalive cumulative tracking', () => {
74+
it('should disable keepalive when cumulative bytes exceed the budget', async () => {
75+
const deferred = createDeferred();
76+
fakeFetchRestore();
77+
const stub = fakeFetchInstall();
78+
stub.onCall(0).returns(deferred.promise);
79+
stub.onCall(1).resolves(new Response('ok', { status: 200 }));
80+
81+
const transport = makeTransport();
82+
83+
// 30KiB payload — fits individually but two exceed the 48KiB budget
84+
const payload30k = new Uint8Array(30 * 1024);
85+
86+
// Start first request (stays inflight)
87+
const first = transport.send(payload30k, 5000);
88+
// Allow microtask to run so fetch is called
89+
await Promise.resolve();
90+
91+
expect(fakeFetchGetKeepalive(0)).to.equal(true);
92+
93+
// Second request should exceed cumulative budget
94+
await transport.send(payload30k, 5000);
95+
96+
expect(fakeFetchGetKeepalive(1)).to.equal(false);
97+
98+
// Clean up
99+
deferred.resolve(new Response('ok', { status: 200 }));
100+
await first;
101+
});
102+
103+
it('should disable keepalive when concurrent count reaches 9', async () => {
104+
const deferreds: Deferred[] = [];
105+
fakeFetchRestore();
106+
const stub = fakeFetchInstall();
107+
108+
for (let i = 0; i < 9; i++) {
109+
const d = createDeferred();
110+
deferreds.push(d);
111+
stub.onCall(i).returns(d.promise);
112+
}
113+
stub.onCall(9).resolves(new Response('ok', { status: 200 }));
114+
115+
const transport = makeTransport();
116+
const tinyPayload = new Uint8Array(1);
117+
118+
// Start 9 concurrent requests
119+
const pending = [];
120+
for (let i = 0; i < 9; i++) {
121+
pending.push(transport.send(tinyPayload, 5000));
122+
await Promise.resolve();
123+
}
124+
125+
for (let i = 0; i < 9; i++) {
126+
expect(fakeFetchGetKeepalive(i)).to.equal(true);
127+
}
128+
129+
// 10th request should exceed concurrent limit
130+
await transport.send(tinyPayload, 5000);
131+
132+
expect(fakeFetchGetKeepalive(9)).to.equal(false);
133+
134+
// Clean up
135+
for (const d of deferreds) {
136+
d.resolve(new Response('ok', { status: 200 }));
137+
}
138+
await Promise.all(pending);
139+
});
140+
141+
it('should decrement counters after request completes', async () => {
142+
const deferred = createDeferred();
143+
fakeFetchRestore();
144+
const stub = fakeFetchInstall();
145+
stub.onCall(0).returns(deferred.promise);
146+
stub.onCall(1).resolves(new Response('ok', { status: 200 }));
147+
stub.onCall(2).resolves(new Response('ok', { status: 200 }));
148+
149+
const transport = makeTransport();
150+
const payload30k = new Uint8Array(30 * 1024);
151+
152+
// First request stays inflight
153+
const first = transport.send(payload30k, 5000);
154+
await Promise.resolve();
155+
156+
// Second request exceeds cumulative budget
157+
await transport.send(payload30k, 5000);
158+
159+
expect(fakeFetchGetKeepalive(1)).to.equal(false);
160+
161+
// Resolve first request to free up budget
162+
deferred.resolve(new Response('ok', { status: 200 }));
163+
await first;
164+
165+
// Third request should fit now
166+
await transport.send(payload30k, 5000);
167+
168+
expect(fakeFetchGetKeepalive(2)).to.equal(true);
169+
});
170+
171+
it('should decrement counters after request fails', async () => {
172+
const deferred = createDeferred();
173+
fakeFetchRestore();
174+
const stub = fakeFetchInstall();
175+
stub.onCall(0).returns(deferred.promise);
176+
stub.onCall(1).resolves(new Response('ok', { status: 200 }));
177+
stub.onCall(2).resolves(new Response('ok', { status: 200 }));
178+
179+
const transport = makeTransport();
180+
const payload30k = new Uint8Array(30 * 1024);
181+
182+
// First request stays inflight
183+
const first = transport.send(payload30k, 5000);
184+
await Promise.resolve();
185+
186+
// Second request exceeds cumulative budget
187+
await transport.send(payload30k, 5000);
188+
189+
expect(fakeFetchGetKeepalive(1)).to.equal(false);
190+
191+
// Reject first request to free up budget
192+
deferred.reject(new TypeError('Failed to fetch'));
193+
await first;
194+
195+
// Third request should fit now
196+
await transport.send(payload30k, 5000);
197+
198+
expect(fakeFetchGetKeepalive(2)).to.equal(true);
199+
});
200+
});
201+
202+
describe('network errors', () => {
203+
it('should return failure when fetch throws', async () => {
204+
// Restore then re-install to reconfigure the stub to reject
205+
fakeFetchRestore();
206+
fakeFetchInstall().rejects(new TypeError('Failed to fetch'));
207+
208+
const transport = makeTransport();
209+
const result = await transport.send(smallPayload, 1000);
210+
211+
expect(result).to.deep.equal({
212+
status: 'failure',
213+
error: new TypeError('Failed to fetch'),
214+
});
215+
});
216+
});
217+
});

packages/web-sdk/src/transport/FetchTransport/FetchTransport.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,28 @@ import type {
44
} from '@opentelemetry/otlp-exporter-base';
55
import type { FetchRequestParameters } from './types.ts';
66

7+
// The fetch spec enforces a 64KiB limit on the sum of all inflight keepalive
8+
// request bodies per fetch group. We use a conservative budget to leave room
9+
// for third-party scripts that may also be using keepalive or sendBeacon.
10+
// https://github.com/whatwg/fetch/issues/679
11+
const KEEPALIVE_BYTE_BUDGET = 49152; // 48KiB
12+
13+
// Chrome enforces a 9-request concurrent keepalive limit per renderer process.
14+
const MAX_KEEPALIVE_REQUESTS = 9;
15+
16+
let inflightKeepaliveBytes = 0;
17+
let inflightKeepaliveCount = 0;
18+
19+
/** @internal for testing only */
20+
export function _resetKeepaliveTracking(): void {
21+
inflightKeepaliveBytes = 0;
22+
inflightKeepaliveCount = 0;
23+
}
24+
725
export class FetchTransport implements IExporterTransport {
826
public constructor(private readonly _config: FetchRequestParameters) {}
927

10-
// _compressRequest compresses the data using the gzip algorithm.
11-
// Embrace Data endpoints require the data to be compressed.
28+
// Embrace endpoints require data to be encoded with gzip
1229
private static async _compressRequest(
1330
data: Uint8Array<ArrayBuffer>,
1431
): Promise<Uint8Array<ArrayBuffer>> {
@@ -69,7 +86,6 @@ export class FetchTransport implements IExporterTransport {
6986

7087
if (this._config.compression === 'gzip') {
7188
request = await FetchTransport._compressRequest(data);
72-
7389
headers['Content-Encoding'] = 'gzip';
7490
headers['Content-Length'] = request.length.toString();
7591
}
@@ -89,10 +105,20 @@ export class FetchTransport implements IExporterTransport {
89105
}, timeoutMillis);
90106
}
91107

108+
const wouldExceedBytes =
109+
inflightKeepaliveBytes + request.byteLength > KEEPALIVE_BYTE_BUDGET;
110+
const wouldExceedCount = inflightKeepaliveCount >= MAX_KEEPALIVE_REQUESTS;
111+
const keepalive = !wouldExceedBytes && !wouldExceedCount;
112+
113+
if (keepalive) {
114+
inflightKeepaliveBytes += request.byteLength;
115+
inflightKeepaliveCount++;
116+
}
117+
92118
try {
93119
const response = await fetch(this._config.url, {
94120
method: 'POST',
95-
keepalive: true,
121+
keepalive,
96122
headers,
97123
body: request,
98124
signal,
@@ -115,6 +141,10 @@ export class FetchTransport implements IExporterTransport {
115141
if (timeoutId !== undefined) {
116142
clearTimeout(timeoutId);
117143
}
144+
if (keepalive) {
145+
inflightKeepaliveBytes -= request.byteLength;
146+
inflightKeepaliveCount--;
147+
}
118148
}
119149
}
120150
}

packages/web-sdk/tests/utils/fakeFetch.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,8 @@ export const fakeFetchRespondWith = (
7373
options?: ResponseInit,
7474
) => fetchStub?.callsFake(() => Promise.resolve(new Response(data, options)));
7575

76+
export const fakeFetchGetKeepalive = (callNumber = 0) =>
77+
fakeFetchGetOptions(callNumber)?.keepalive;
78+
7679
export const fakeFetchWasCalled = (callNumber = 0) =>
7780
!!fetchStub?.getCall(callNumber);

packages/web-sdk/tests/utils/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export { FakeLogRecordProcessor } from './FakeLogRecordProcessor.ts';
99
export { FakeSpanProcessor } from './FakeSpanProcessor.ts';
1010
export {
1111
fakeFetchGetBody,
12+
fakeFetchGetKeepalive,
1213
fakeFetchGetMethod,
1314
fakeFetchGetOptions,
1415
fakeFetchGetRequestHeaders,

0 commit comments

Comments
 (0)