Skip to content

Commit fb34061

Browse files
author
John Doe
committed
refactor: add better flush handling
1 parent 6950284 commit fb34061

File tree

3 files changed

+179
-28
lines changed

3 files changed

+179
-28
lines changed

packages/utils/src/lib/performance-observer.int.test.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,4 +179,49 @@ describe('PerformanceObserverSink', () => {
179179

180180
expect(sink.getWrittenItems()).toHaveLength(2);
181181
});
182+
183+
it('cursor logic prevents duplicate processing of performance entries', () => {
184+
const observer = new PerformanceObserverSink(options);
185+
observer.subscribe();
186+
187+
performance.mark('first-mark');
188+
performance.mark('second-mark');
189+
expect(encode).not.toHaveBeenCalled();
190+
observer.flush();
191+
expect(sink.getWrittenItems()).toStrictEqual([
192+
'first-mark:mark',
193+
'second-mark:mark',
194+
]);
195+
196+
expect(encode).toHaveBeenCalledTimes(2);
197+
expect(encode).toHaveBeenNthCalledWith(
198+
1,
199+
expect.objectContaining({ name: 'first-mark' }),
200+
);
201+
expect(encode).toHaveBeenNthCalledWith(
202+
2,
203+
expect.objectContaining({ name: 'second-mark' }),
204+
);
205+
206+
performance.mark('third-mark');
207+
performance.measure('first-measure');
208+
209+
observer.flush();
210+
expect(sink.getWrittenItems()).toStrictEqual([
211+
'first-mark:mark',
212+
'second-mark:mark',
213+
'third-mark:mark',
214+
'first-measure:measure',
215+
]);
216+
217+
expect(encode).toHaveBeenCalledTimes(4);
218+
expect(encode).toHaveBeenNthCalledWith(
219+
3,
220+
expect.objectContaining({ name: 'third-mark' }),
221+
);
222+
expect(encode).toHaveBeenNthCalledWith(
223+
4,
224+
expect.objectContaining({ name: 'first-measure' }),
225+
);
226+
});
182227
});

packages/utils/src/lib/performance-observer.ts

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import {
2-
type EntryType,
32
type PerformanceEntry,
43
PerformanceObserver,
54
type PerformanceObserverEntryList,
65
performance,
76
} from 'node:perf_hooks';
87
import type { Buffered, Encoder, Observer, Sink } from './sink-source.types.js';
98

9+
const OBSERVED_TYPES = ['mark', 'measure'] as const;
10+
type ObservedEntryType = 'mark' | 'measure';
1011
export const DEFAULT_FLUSH_THRESHOLD = 20;
1112

1213
export type PerformanceObserverOptions<T> = {
@@ -24,16 +25,21 @@ export class PerformanceObserverSink<T>
2425
#flushThreshold: number;
2526
#sink: Sink<T, unknown>;
2627
#observer: PerformanceObserver | undefined;
27-
#observedTypes: EntryType[] = ['mark', 'measure'];
28-
#getEntries = (list: PerformanceObserverEntryList) =>
29-
this.#observedTypes.flatMap(t => list.getEntriesByType(t));
30-
#observedCount: number = 0;
28+
29+
#pendingCount = 0;
30+
31+
// "cursor" per type: how many we already wrote from the global buffer
32+
#written: Map<ObservedEntryType, number>;
3133

3234
constructor(options: PerformanceObserverOptions<T>) {
33-
this.#encode = options.encode;
34-
this.#sink = options.sink;
35-
this.#buffered = options.buffered ?? false;
36-
this.#flushThreshold = options.flushThreshold ?? DEFAULT_FLUSH_THRESHOLD;
35+
const { encode, sink, buffered, flushThreshold } = options;
36+
this.#encode = encode;
37+
this.#written = new Map<ObservedEntryType, number>(
38+
OBSERVED_TYPES.map(t => [t, 0]),
39+
);
40+
this.#sink = sink;
41+
this.#buffered = buffered ?? false;
42+
this.#flushThreshold = flushThreshold ?? DEFAULT_FLUSH_THRESHOLD;
3743
}
3844

3945
encode(entry: PerformanceEntry): T[] {
@@ -45,37 +51,51 @@ export class PerformanceObserverSink<T>
4551
return;
4652
}
4753

48-
this.#observer = new PerformanceObserver(list => {
49-
const entries = this.#getEntries(list);
50-
this.#observedCount += entries.length;
51-
if (this.#observedCount >= this.#flushThreshold) {
52-
this.flush(entries);
53-
}
54-
});
54+
// The only used to trigger the flush it is not processing the entries just counting them
55+
this.#observer = new PerformanceObserver(
56+
(list: PerformanceObserverEntryList) => {
57+
const batchCount = OBSERVED_TYPES.reduce(
58+
(n, t) => n + list.getEntriesByType(t).length,
59+
0,
60+
);
61+
62+
this.#pendingCount += batchCount;
63+
if (this.#pendingCount >= this.#flushThreshold) {
64+
this.flush();
65+
}
66+
},
67+
);
5568

5669
this.#observer.observe({
57-
entryTypes: this.#observedTypes,
70+
entryTypes: OBSERVED_TYPES,
5871
buffered: this.#buffered,
5972
});
6073
}
6174

62-
flush(entriesToProcess?: PerformanceEntry[]): void {
75+
flush(): void {
6376
if (!this.#observer) {
6477
return;
6578
}
6679

67-
const entries = entriesToProcess || this.#getEntries(performance);
68-
entries.forEach(entry => {
69-
const encoded = this.encode(entry);
70-
encoded.forEach(item => {
71-
this.#sink.write(item);
72-
});
73-
});
80+
OBSERVED_TYPES.forEach(t => {
81+
const written = this.#written.get(t) ?? 0;
82+
const fresh = performance.getEntriesByType(t).slice(written);
83+
84+
try {
85+
fresh
86+
.flatMap(entry => this.encode(entry))
87+
.forEach(item => this.#sink.write(item));
7488

75-
// In real PerformanceObserver, entries remain in the global buffer
76-
// They are only cleared when explicitly requested via performance.clearMarks/clearMeasures
89+
this.#written.set(t, written + fresh.length);
90+
} catch (error) {
91+
throw new Error(
92+
'PerformanceObserverSink failed to write items to sink.',
93+
{ cause: error },
94+
);
95+
}
96+
});
7797

78-
this.#observedCount = 0;
98+
this.#pendingCount = 0;
7999
}
80100

81101
unsubscribe(): void {

packages/utils/src/lib/performance-observer.unit.test.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,40 @@ describe('PerformanceObserverSink', () => {
4141
expect(MockPerformanceObserver.instances).toHaveLength(0);
4242
});
4343

44+
it('creates instance with default flushThreshold when not provided', () => {
45+
expect(
46+
() =>
47+
new PerformanceObserverSink({
48+
sink,
49+
encode,
50+
}),
51+
).not.toThrow();
52+
expect(MockPerformanceObserver.instances).toHaveLength(0);
53+
// Instance creation covers the default flushThreshold assignment
54+
});
55+
56+
it('automatically flushes when pendingCount reaches flushThreshold', () => {
57+
const observer = new PerformanceObserverSink({
58+
sink,
59+
encode,
60+
flushThreshold: 2, // Set threshold to 2
61+
});
62+
observer.subscribe();
63+
64+
const mockObserver = MockPerformanceObserver.lastInstance();
65+
66+
// Emit 1 entry - should not trigger flush yet (pendingCount = 1 < 2)
67+
mockObserver?.emitMark('first-mark');
68+
expect(sink.getWrittenItems()).toStrictEqual([]);
69+
70+
// Emit 1 more entry - should trigger flush (pendingCount = 2 >= 2)
71+
mockObserver?.emitMark('second-mark');
72+
expect(sink.getWrittenItems()).toStrictEqual([
73+
'first-mark:mark',
74+
'second-mark:mark',
75+
]);
76+
});
77+
4478
it('creates instance with all options without starting to observe', () => {
4579
expect(
4680
() =>
@@ -219,4 +253,56 @@ describe('PerformanceObserverSink', () => {
219253
expect(perfObserver?.disconnect).toHaveBeenCalledTimes(1);
220254
expect(MockPerformanceObserver.instances).toHaveLength(0);
221255
});
256+
257+
it('flush wraps sink write errors with descriptive error message', () => {
258+
const failingSink = {
259+
write: vi.fn(() => {
260+
throw new Error('Sink write failed');
261+
}),
262+
};
263+
264+
const observer = new PerformanceObserverSink({
265+
sink: failingSink as any,
266+
encode,
267+
flushThreshold: 1,
268+
});
269+
270+
observer.subscribe();
271+
272+
performance.mark('test-mark');
273+
274+
expect(() => observer.flush()).toThrow(
275+
expect.objectContaining({
276+
message: 'PerformanceObserverSink failed to write items to sink.',
277+
cause: expect.objectContaining({
278+
message: 'Sink write failed',
279+
}),
280+
}),
281+
);
282+
});
283+
284+
it('flush wraps encode errors with descriptive error message', () => {
285+
const failingEncode = vi.fn(() => {
286+
throw new Error('Encode failed');
287+
});
288+
289+
const observer = new PerformanceObserverSink({
290+
sink,
291+
encode: failingEncode,
292+
flushThreshold: 1,
293+
});
294+
295+
observer.subscribe();
296+
297+
performance.mark('test-mark');
298+
299+
expect(() => observer.flush()).toThrow(
300+
expect.objectContaining({
301+
message: 'PerformanceObserverSink failed to write items to sink.',
302+
cause: expect.objectContaining({
303+
message: 'Encode failed',
304+
}),
305+
}),
306+
);
307+
});
222308
});

0 commit comments

Comments
 (0)