Skip to content

Commit 16629a4

Browse files
author
John Doe
committed
refactor: add perf observer
1 parent db25345 commit 16629a4

File tree

3 files changed

+691
-0
lines changed

3 files changed

+691
-0
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import {
2+
type PerformanceEntry,
3+
PerformanceObserver,
4+
performance,
5+
} from 'node:perf_hooks';
6+
import type { Buffered, Encoder, Sink } from './sink-source.types.js';
7+
8+
export interface PerformanceObserverOptions<T> {
9+
sink: Sink<T, unknown>;
10+
encode: (entry: PerformanceEntry) => T[];
11+
onEntry?: (entry: T) => void;
12+
captureBuffered?: boolean;
13+
flushEveryN?: number;
14+
}
15+
16+
export class PerformanceObserverHandle<T>
17+
implements Buffered, Encoder<PerformanceEntry, T[]>
18+
{
19+
#encode: (entry: PerformanceEntry) => T[];
20+
#captureBuffered: boolean;
21+
#flushEveryN: number;
22+
#flushThreshold: number;
23+
#onEntry?: (entry: T) => void;
24+
#processedEntries = new Set<string>();
25+
#sink: Sink<T, unknown>;
26+
#observer: PerformanceObserver | undefined;
27+
#closed = false;
28+
29+
constructor(options: PerformanceObserverOptions<T>) {
30+
this.#encode = options.encode;
31+
this.#sink = options.sink;
32+
this.#captureBuffered = options.captureBuffered ?? false;
33+
this.#flushThreshold = options.flushEveryN ?? 20;
34+
this.#flushEveryN = 0;
35+
this.#onEntry = options.onEntry;
36+
}
37+
38+
encode(entry: PerformanceEntry): T[] {
39+
return this.#encode(entry);
40+
}
41+
42+
connect(): void {
43+
if (this.#observer || this.#closed) return;
44+
this.#observer = new PerformanceObserver(() => {
45+
this.#flushEveryN++;
46+
if (this.#flushEveryN >= this.#flushThreshold) {
47+
this.flush();
48+
this.#flushEveryN = 0;
49+
}
50+
});
51+
52+
this.#observer.observe({
53+
entryTypes: ['mark', 'measure'],
54+
buffered: this.#captureBuffered,
55+
});
56+
}
57+
58+
flush(clear = false): void {
59+
if (this.#closed || !this.#sink) return;
60+
const entries = [
61+
...performance.getEntriesByType('mark'),
62+
...performance.getEntriesByType('measure'),
63+
];
64+
65+
// Process all entries
66+
for (const e of entries) {
67+
if (e.entryType !== 'mark' && e.entryType !== 'measure') continue;
68+
69+
// Skip if already processed (unless clearing)
70+
if (!clear && this.#processedEntries.has(e.name)) continue;
71+
72+
const encoded = this.encode(e);
73+
for (const item of encoded) {
74+
this.#sink.write(item);
75+
this.#onEntry?.(item);
76+
}
77+
78+
if (clear) {
79+
this.#processedEntries.delete(e.name);
80+
if (e.entryType === 'mark') performance.clearMarks(e.name);
81+
if (e.entryType === 'measure') performance.clearMeasures(e.name);
82+
} else {
83+
this.#processedEntries.add(e.name);
84+
}
85+
}
86+
}
87+
88+
disconnect(): void {
89+
if (!this.#observer) return;
90+
this.#observer?.disconnect();
91+
this.#observer = undefined;
92+
}
93+
94+
close(): void {
95+
if (this.#closed) return;
96+
this.flush();
97+
this.#closed = true;
98+
this.disconnect();
99+
}
100+
101+
isConnected(): boolean {
102+
return this.#observer !== undefined && !this.#closed;
103+
}
104+
}

0 commit comments

Comments
 (0)