Skip to content

Commit 900888c

Browse files
authored
feat(utils): add performance observer (#1206)
1 parent a301f1e commit 900888c

File tree

13 files changed

+772
-49
lines changed

13 files changed

+772
-49
lines changed

packages/utils/mocks/sink.mock.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import type { Sink } from '../src/lib/sink-source.types';
2+
3+
export class MockSink implements Sink<string, string> {
4+
private writtenItems: string[] = [];
5+
private closed = false;
6+
7+
open(): void {
8+
this.closed = false;
9+
}
10+
11+
write(input: string): void {
12+
this.writtenItems.push(input);
13+
}
14+
15+
close(): void {
16+
this.closed = true;
17+
}
18+
19+
isClosed(): boolean {
20+
return this.closed;
21+
}
22+
23+
encode(input: string): string {
24+
return `${input}-${this.constructor.name}-encoded`;
25+
}
26+
27+
getWrittenItems(): string[] {
28+
return [...this.writtenItems];
29+
}
30+
}

packages/utils/src/lib/clock-epoch.int.test.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,6 @@ describe('epochClock', () => {
2020
expect(c.fromDateNowMs).toBeFunction();
2121
});
2222

23-
it('should support performance clock by default for epochNowUs', () => {
24-
const c = epochClock();
25-
expect(c.timeOriginMs).toBe(performance.timeOrigin);
26-
const nowUs = c.epochNowUs();
27-
expect(nowUs).toBe(Math.round(nowUs));
28-
const expectedUs = Date.now() * 1000;
29-
30-
expect(nowUs).toBeWithin(expectedUs - 2000, expectedUs + 1000);
31-
});
32-
3323
it('should convert epoch milliseconds to microseconds correctly', () => {
3424
const c = epochClock();
3525
const epochMs = Date.now();

packages/utils/src/lib/clock-epoch.unit.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ describe('epochClock', () => {
3333
it('should support performance clock by default for epochNowUs', () => {
3434
const c = epochClock();
3535
expect(c.timeOriginMs).toBe(500_000);
36-
expect(c.epochNowUs()).toBe(1_000_000_000); // timeOrigin + (Date.now() - timeOrigin) = Date.now()
36+
expect(c.epochNowUs()).toBe(500_000_000); // timeOrigin + performance.now() = timeOrigin + 0
3737
});
3838

3939
it.each([
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
import { type PerformanceEntry, performance } from 'node:perf_hooks';
2+
import {
3+
type MockedFunction,
4+
beforeEach,
5+
describe,
6+
expect,
7+
it,
8+
vi,
9+
} from 'vitest';
10+
import { MockSink } from '../../mocks/sink.mock';
11+
import {
12+
type PerformanceObserverOptions,
13+
PerformanceObserverSink,
14+
} from './performance-observer.js';
15+
16+
describe('PerformanceObserverSink', () => {
17+
let encode: MockedFunction<(entry: PerformanceEntry) => string[]>;
18+
let sink: MockSink;
19+
let options: PerformanceObserverOptions<string>;
20+
21+
const awaitObserverCallback = () =>
22+
new Promise(resolve => setTimeout(resolve, 10));
23+
24+
beforeEach(() => {
25+
sink = new MockSink();
26+
encode = vi.fn((entry: PerformanceEntry) => [
27+
`${entry.name}:${entry.entryType}`,
28+
]);
29+
30+
options = {
31+
sink,
32+
encode,
33+
};
34+
35+
performance.clearMarks();
36+
performance.clearMeasures();
37+
});
38+
39+
it('creates instance with required options', () => {
40+
expect(() => new PerformanceObserverSink(options)).not.toThrow();
41+
});
42+
43+
it('internal PerformanceObserver should process observed entries', () => {
44+
const observer = new PerformanceObserverSink(options);
45+
observer.subscribe();
46+
47+
performance.mark('test-mark');
48+
performance.measure('test-measure');
49+
observer.flush();
50+
expect(encode).toHaveBeenCalledTimes(2);
51+
expect(encode).toHaveBeenNthCalledWith(
52+
1,
53+
expect.objectContaining({
54+
name: 'test-mark',
55+
entryType: 'mark',
56+
}),
57+
);
58+
expect(encode).toHaveBeenNthCalledWith(
59+
2,
60+
expect.objectContaining({
61+
name: 'test-measure',
62+
entryType: 'measure',
63+
}),
64+
);
65+
});
66+
67+
it('internal PerformanceObserver calls flush if flushThreshold exceeded', async () => {
68+
const observer = new PerformanceObserverSink({
69+
...options,
70+
flushThreshold: 3,
71+
});
72+
observer.subscribe();
73+
74+
performance.mark('test-mark1');
75+
performance.mark('test-mark2');
76+
performance.mark('test-mark3');
77+
78+
await awaitObserverCallback();
79+
80+
expect(encode).toHaveBeenCalledTimes(3);
81+
});
82+
83+
it('flush flushes observed entries when subscribed', () => {
84+
const observer = new PerformanceObserverSink(options);
85+
observer.subscribe();
86+
87+
performance.mark('test-mark1');
88+
performance.mark('test-mark2');
89+
expect(sink.getWrittenItems()).toStrictEqual([]);
90+
91+
observer.flush();
92+
expect(sink.getWrittenItems()).toStrictEqual([
93+
'test-mark1:mark',
94+
'test-mark2:mark',
95+
]);
96+
});
97+
98+
it('flush calls encode for each entry', () => {
99+
const observer = new PerformanceObserverSink(options);
100+
observer.subscribe();
101+
102+
performance.mark('test-mark1');
103+
performance.mark('test-mark2');
104+
105+
observer.flush();
106+
107+
expect(encode).toHaveBeenCalledWith(
108+
expect.objectContaining({
109+
name: 'test-mark1',
110+
entryType: 'mark',
111+
}),
112+
);
113+
expect(encode).toHaveBeenCalledWith(
114+
expect.objectContaining({
115+
name: 'test-mark2',
116+
entryType: 'mark',
117+
}),
118+
);
119+
});
120+
121+
it('unsubscribe stops observing performance entries', async () => {
122+
const observer = new PerformanceObserverSink({
123+
...options,
124+
flushThreshold: 1,
125+
});
126+
127+
observer.subscribe();
128+
performance.mark('subscribed-mark1');
129+
performance.mark('subscribed-mark2');
130+
await awaitObserverCallback();
131+
expect(encode).toHaveBeenCalledTimes(2);
132+
133+
observer.unsubscribe();
134+
performance.mark('unsubscribed-mark1');
135+
performance.mark('unsubscribed-mark2');
136+
await awaitObserverCallback();
137+
expect(encode).toHaveBeenCalledTimes(2);
138+
});
139+
140+
it('should observe performance entries and write them to the sink on flush', () => {
141+
const observer = new PerformanceObserverSink(options);
142+
143+
observer.subscribe();
144+
performance.mark('test-mark');
145+
observer.flush();
146+
expect(sink.getWrittenItems()).toHaveLength(1);
147+
});
148+
149+
it('should observe buffered performance entries when buffered is enabled', async () => {
150+
const observer = new PerformanceObserverSink({
151+
...options,
152+
buffered: true,
153+
});
154+
155+
performance.mark('test-mark-1');
156+
performance.mark('test-mark-2');
157+
await new Promise(resolve => setTimeout(resolve, 10));
158+
observer.subscribe();
159+
await new Promise(resolve => setTimeout(resolve, 10));
160+
expect(performance.getEntries()).toHaveLength(2);
161+
observer.flush();
162+
expect(sink.getWrittenItems()).toHaveLength(2);
163+
});
164+
165+
it('handles multiple encoded items per performance entry', () => {
166+
const multiEncodeFn = vi.fn(e => [
167+
`${e.entryType}-item1`,
168+
`${e.entryType}item2`,
169+
]);
170+
const observer = new PerformanceObserverSink({
171+
...options,
172+
encode: multiEncodeFn,
173+
});
174+
175+
observer.subscribe();
176+
177+
performance.mark('test-mark');
178+
observer.flush();
179+
180+
expect(sink.getWrittenItems()).toHaveLength(2);
181+
});
182+
});
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import {
2+
type EntryType,
3+
type PerformanceEntry,
4+
PerformanceObserver,
5+
type PerformanceObserverEntryList,
6+
performance,
7+
} from 'node:perf_hooks';
8+
import type { Buffered, Encoder, Observer, Sink } from './sink-source.types.js';
9+
10+
export const DEFAULT_FLUSH_THRESHOLD = 20;
11+
12+
export type PerformanceObserverOptions<T> = {
13+
sink: Sink<T, unknown>;
14+
encode: (entry: PerformanceEntry) => T[];
15+
buffered?: boolean;
16+
flushThreshold?: number;
17+
};
18+
19+
export class PerformanceObserverSink<T>
20+
implements Observer, Buffered, Encoder<PerformanceEntry, T[]>
21+
{
22+
#encode: (entry: PerformanceEntry) => T[];
23+
#buffered: boolean;
24+
#flushThreshold: number;
25+
#sink: Sink<T, unknown>;
26+
#observer: PerformanceObserver | undefined;
27+
#observedTypes: EntryType[] = ['mark', 'measure'];
28+
#getEntries = (list: PerformanceObserverEntryList) =>
29+
this.#observedTypes.flatMap(t => list.getEntriesByType(t));
30+
#observedCount: number = 0;
31+
32+
constructor(options: PerformanceObserverOptions<T>) {
33+
this.#encode = options.encode;
34+
this.#sink = options.sink;
35+
this.#buffered = options.buffered ?? false;
36+
this.#flushThreshold = options.flushThreshold ?? DEFAULT_FLUSH_THRESHOLD;
37+
}
38+
39+
encode(entry: PerformanceEntry): T[] {
40+
return this.#encode(entry);
41+
}
42+
43+
subscribe(): void {
44+
if (this.#observer) {
45+
return;
46+
}
47+
48+
this.#observer = new PerformanceObserver(list => {
49+
const entries = this.#getEntries(list);
50+
this.#observedCount += entries.length;
51+
if (this.#observedCount >= this.#flushThreshold) {
52+
this.flush(entries);
53+
}
54+
});
55+
56+
this.#observer.observe({
57+
entryTypes: this.#observedTypes,
58+
buffered: this.#buffered,
59+
});
60+
}
61+
62+
flush(entriesToProcess?: PerformanceEntry[]): void {
63+
if (!this.#observer) {
64+
return;
65+
}
66+
67+
const entries = entriesToProcess || this.#getEntries(performance);
68+
entries.forEach(entry => {
69+
const encoded = this.encode(entry);
70+
encoded.forEach(item => {
71+
this.#sink.write(item);
72+
});
73+
});
74+
75+
// In real PerformanceObserver, entries remain in the global buffer
76+
// They are only cleared when explicitly requested via performance.clearMarks/clearMeasures
77+
78+
this.#observedCount = 0;
79+
}
80+
81+
unsubscribe(): void {
82+
if (!this.#observer) {
83+
return;
84+
}
85+
this.#observer?.disconnect();
86+
this.#observer = undefined;
87+
}
88+
89+
isSubscribed(): boolean {
90+
return this.#observer !== undefined;
91+
}
92+
}

0 commit comments

Comments
 (0)