Skip to content

Commit 7a70988

Browse files
committed
fix(transport): track cumulative inflight keepalive bytes and count
1 parent a73c4d1 commit 7a70988

File tree

5 files changed

+368
-19
lines changed

5 files changed

+368
-19
lines changed
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
import { DiagLogLevel, diag } from '@opentelemetry/api';
2+
import * as chai from 'chai';
3+
import {
4+
fakeFetchGetKeepalive,
5+
fakeFetchInstall,
6+
fakeFetchRespondWith,
7+
fakeFetchRestore,
8+
InMemoryDiagLogger,
9+
} from '../../../tests/utils/index.ts';
10+
import { _resetKeepaliveTracking, FetchTransport } from './FetchTransport.ts';
11+
12+
const { expect } = chai;
13+
14+
const makeTransport = (
15+
overrides?: Partial<ConstructorParameters<typeof FetchTransport>[0]>,
16+
) =>
17+
new FetchTransport({
18+
url: 'http://example.com',
19+
headers: {},
20+
compression: 'none',
21+
...overrides,
22+
});
23+
24+
const smallPayload = new TextEncoder().encode('{"small": true}');
25+
const largePayload = new Uint8Array(49153); // 1 byte over the 48KiB budget
26+
27+
interface Deferred {
28+
promise: Promise<Response>;
29+
resolve: (value: Response) => void;
30+
reject: (reason: Error) => void;
31+
}
32+
33+
function createDeferred(): Deferred {
34+
let resolve!: (value: Response) => void;
35+
let reject!: (reason: Error) => void;
36+
const promise = new Promise<Response>((res, rej) => {
37+
resolve = res;
38+
reject = rej;
39+
});
40+
return { promise, resolve, reject };
41+
}
42+
43+
describe('FetchTransport', () => {
44+
let diagLogger: InMemoryDiagLogger;
45+
46+
beforeEach(() => {
47+
_resetKeepaliveTracking();
48+
fakeFetchInstall();
49+
diagLogger = new InMemoryDiagLogger();
50+
diag.setLogger(diagLogger, DiagLogLevel.ALL);
51+
});
52+
53+
afterEach(() => {
54+
fakeFetchRestore();
55+
diag.disable();
56+
});
57+
58+
describe('keepalive budget', () => {
59+
it('should set keepalive to true for payloads within the budget', async () => {
60+
fakeFetchRespondWith('ok', { status: 200 });
61+
const transport = makeTransport();
62+
63+
await transport.send(smallPayload, 1000);
64+
65+
expect(fakeFetchGetKeepalive()).to.equal(true);
66+
});
67+
68+
it('should set keepalive to true for payloads exactly at the budget', async () => {
69+
fakeFetchRespondWith('ok', { status: 200 });
70+
const transport = makeTransport();
71+
72+
const exactPayload = new Uint8Array(49152);
73+
await transport.send(exactPayload, 1000);
74+
75+
expect(fakeFetchGetKeepalive()).to.equal(true);
76+
});
77+
78+
it('should set keepalive to false for payloads exceeding the budget', async () => {
79+
fakeFetchRespondWith('ok', { status: 200 });
80+
const transport = makeTransport();
81+
82+
await transport.send(largePayload, 1000);
83+
84+
expect(fakeFetchGetKeepalive()).to.equal(false);
85+
});
86+
});
87+
88+
describe('keepalive cumulative tracking', () => {
89+
it('should disable keepalive when cumulative bytes exceed the budget', async () => {
90+
const deferred = createDeferred();
91+
fakeFetchRestore();
92+
const stub = fakeFetchInstall();
93+
stub.onCall(0).returns(deferred.promise);
94+
stub.onCall(1).resolves(new Response('ok', { status: 200 }));
95+
96+
const transport = makeTransport();
97+
98+
// 30KiB payload — fits individually but two exceed the 48KiB budget
99+
const payload30k = new Uint8Array(30 * 1024);
100+
101+
// Start first request (stays inflight)
102+
const first = transport.send(payload30k, 5000);
103+
// Allow microtask to run so fetch is called
104+
await Promise.resolve();
105+
106+
expect(fakeFetchGetKeepalive(0)).to.equal(true);
107+
108+
// Second request should exceed cumulative budget
109+
await transport.send(payload30k, 5000);
110+
111+
expect(fakeFetchGetKeepalive(1)).to.equal(false);
112+
113+
// Clean up
114+
deferred.resolve(new Response('ok', { status: 200 }));
115+
await first;
116+
});
117+
118+
it('should disable keepalive when concurrent count reaches 9', async () => {
119+
const deferreds: Deferred[] = [];
120+
fakeFetchRestore();
121+
const stub = fakeFetchInstall();
122+
123+
for (let i = 0; i < 9; i++) {
124+
const d = createDeferred();
125+
deferreds.push(d);
126+
stub.onCall(i).returns(d.promise);
127+
}
128+
stub.onCall(9).resolves(new Response('ok', { status: 200 }));
129+
130+
const transport = makeTransport();
131+
const tinyPayload = new Uint8Array(1);
132+
133+
// Start 9 concurrent requests
134+
const pending = [];
135+
for (let i = 0; i < 9; i++) {
136+
pending.push(transport.send(tinyPayload, 5000));
137+
await Promise.resolve();
138+
}
139+
140+
for (let i = 0; i < 9; i++) {
141+
expect(fakeFetchGetKeepalive(i)).to.equal(true);
142+
}
143+
144+
// 10th request should exceed concurrent limit
145+
await transport.send(tinyPayload, 5000);
146+
147+
expect(fakeFetchGetKeepalive(9)).to.equal(false);
148+
149+
// Clean up
150+
for (const d of deferreds) {
151+
d.resolve(new Response('ok', { status: 200 }));
152+
}
153+
await Promise.all(pending);
154+
});
155+
156+
it('should decrement counters after request completes', async () => {
157+
const deferred = createDeferred();
158+
fakeFetchRestore();
159+
const stub = fakeFetchInstall();
160+
stub.onCall(0).returns(deferred.promise);
161+
stub.onCall(1).resolves(new Response('ok', { status: 200 }));
162+
stub.onCall(2).resolves(new Response('ok', { status: 200 }));
163+
164+
const transport = makeTransport();
165+
const payload30k = new Uint8Array(30 * 1024);
166+
167+
// First request stays inflight
168+
const first = transport.send(payload30k, 5000);
169+
await Promise.resolve();
170+
171+
// Second request exceeds cumulative budget
172+
await transport.send(payload30k, 5000);
173+
174+
expect(fakeFetchGetKeepalive(1)).to.equal(false);
175+
176+
// Resolve first request to free up budget
177+
deferred.resolve(new Response('ok', { status: 200 }));
178+
await first;
179+
180+
// Third request should fit now
181+
await transport.send(payload30k, 5000);
182+
183+
expect(fakeFetchGetKeepalive(2)).to.equal(true);
184+
});
185+
186+
it('should decrement counters after request fails', async () => {
187+
const deferred = createDeferred();
188+
fakeFetchRestore();
189+
const stub = fakeFetchInstall();
190+
stub.onCall(0).returns(deferred.promise);
191+
stub.onCall(1).resolves(new Response('ok', { status: 200 }));
192+
stub.onCall(2).resolves(new Response('ok', { status: 200 }));
193+
194+
const transport = makeTransport();
195+
const payload30k = new Uint8Array(30 * 1024);
196+
197+
// First request stays inflight
198+
const first = transport.send(payload30k, 5000);
199+
await Promise.resolve();
200+
201+
// Second request exceeds cumulative budget
202+
await transport.send(payload30k, 5000);
203+
204+
expect(fakeFetchGetKeepalive(1)).to.equal(false);
205+
206+
// Reject first request to free up budget
207+
deferred.reject(new TypeError('Failed to fetch'));
208+
await first;
209+
210+
// Third request should fit now
211+
await transport.send(payload30k, 5000);
212+
213+
expect(fakeFetchGetKeepalive(2)).to.equal(true);
214+
});
215+
});
216+
217+
describe('network errors', () => {
218+
it('should return failure when fetch throws', async () => {
219+
// Restore then re-install to reconfigure the stub to reject
220+
fakeFetchRestore();
221+
fakeFetchInstall().rejects(new TypeError('Failed to fetch'));
222+
223+
const transport = makeTransport();
224+
const result = await transport.send(smallPayload, 1000);
225+
226+
expect(result).to.deep.equal({
227+
status: 'failure',
228+
error: new TypeError('Failed to fetch'),
229+
});
230+
});
231+
});
232+
233+
describe('diagnostic logging', () => {
234+
it('should warn via diag when keepalive is downgraded for byte budget', async () => {
235+
fakeFetchRespondWith('ok', { status: 200 });
236+
const transport = makeTransport();
237+
238+
await transport.send(largePayload, 1000);
239+
240+
const warns = diagLogger.getWarnLogs();
241+
expect(warns).to.have.lengthOf(1);
242+
expect(warns[0]).to.include('inflight bytes');
243+
});
244+
245+
it('should warn via diag when keepalive is downgraded for concurrent count', async () => {
246+
const deferreds: Deferred[] = [];
247+
fakeFetchRestore();
248+
const stub = fakeFetchInstall();
249+
250+
for (let i = 0; i < 9; i++) {
251+
const d = createDeferred();
252+
deferreds.push(d);
253+
stub.onCall(i).returns(d.promise);
254+
}
255+
stub.onCall(9).resolves(new Response('ok', { status: 200 }));
256+
257+
const transport = makeTransport();
258+
const tinyPayload = new Uint8Array(1);
259+
260+
const pending = [];
261+
for (let i = 0; i < 9; i++) {
262+
pending.push(transport.send(tinyPayload, 5000));
263+
await Promise.resolve();
264+
}
265+
266+
// 10th request should trigger the warning
267+
await transport.send(tinyPayload, 5000);
268+
269+
const warns = diagLogger.getWarnLogs();
270+
expect(warns).to.have.lengthOf(1);
271+
expect(warns[0]).to.include('concurrent count');
272+
273+
for (const d of deferreds) {
274+
d.resolve(new Response('ok', { status: 200 }));
275+
}
276+
await Promise.all(pending);
277+
});
278+
279+
it('should warn via diag when fetch throws', async () => {
280+
fakeFetchRestore();
281+
fakeFetchInstall().rejects(new TypeError('Failed to fetch'));
282+
283+
const transport = makeTransport();
284+
await transport.send(smallPayload, 1000);
285+
286+
const warns = diagLogger.getWarnLogs();
287+
expect(warns).to.have.lengthOf(1);
288+
expect(warns[0]).to.include('Fetch transport failed');
289+
});
290+
291+
it('should log debug when HTTP response is not ok', async () => {
292+
fakeFetchRespondWith('error', { status: 500 });
293+
const transport = makeTransport();
294+
295+
await transport.send(smallPayload, 1000);
296+
297+
const match = diagLogger
298+
.getDebugLogs()
299+
.find((msg) => msg.includes('HTTP 500'));
300+
expect(match).to.be.a('string');
301+
});
302+
});
303+
});

0 commit comments

Comments
 (0)