@@ -7,16 +7,29 @@ import { MACHINE_METADATA } from "./constants.js";
7
7
import { EventCache } from "./eventCache.js" ;
8
8
import { detectContainerEnv } from "../helpers/container.js" ;
9
9
import type { DeviceId } from "../helpers/deviceId.js" ;
10
+ import { EventEmitter } from "stream" ;
10
11
11
12
type EventResult = {
12
13
success : boolean ;
13
14
error ?: Error ;
14
15
} ;
15
16
17
+ async function timeout ( promise : Promise < unknown > , ms : number ) : Promise < void > {
18
+ await Promise . race ( [ new Promise ( ( resolve ) => setTimeout ( resolve , ms ) ) , promise ] ) ;
19
+ }
20
+
21
+ export interface TelemetryEvents {
22
+ "events-emitted" : [ ] ;
23
+ "events-send-failed" : [ ] ;
24
+ "events-skipped" : [ ] ;
25
+ }
26
+
16
27
export class Telemetry {
17
28
private isBufferingEvents : boolean = true ;
18
29
/** Resolves when the setup is complete or a timeout occurs */
19
30
public setupPromise : Promise < [ string , boolean ] > | undefined ;
31
+ public readonly events : EventEmitter < TelemetryEvents > = new EventEmitter ( ) ;
32
+
20
33
private eventCache : EventCache ;
21
34
private deviceId : DeviceId ;
22
35
@@ -57,6 +70,12 @@ export class Telemetry {
57
70
58
71
private async setup ( ) : Promise < void > {
59
72
if ( ! this . isTelemetryEnabled ( ) ) {
73
+ this . session . logger . info ( {
74
+ id : LogId . telemetryEmitFailure ,
75
+ context : "telemetry" ,
76
+ message : "Telemetry is disabled." ,
77
+ noRedaction : true ,
78
+ } ) ;
60
79
return ;
61
80
}
62
81
@@ -71,34 +90,22 @@ export class Telemetry {
71
90
72
91
public async close ( ) : Promise < void > {
73
92
this . isBufferingEvents = false ;
74
- await this . emitEvents ( this . eventCache . getEvents ( ) ) ;
93
+ await timeout ( this . emit ( [ ] ) , 5_000 ) ;
75
94
}
76
95
77
96
/**
78
97
* Emits events through the telemetry pipeline
79
98
* @param events - The events to emit
80
99
*/
81
- public async emitEvents ( events : BaseEvent [ ] ) : Promise < void > {
82
- try {
83
- if ( ! this . isTelemetryEnabled ( ) ) {
84
- this . session . logger . info ( {
85
- id : LogId . telemetryEmitFailure ,
86
- context : "telemetry" ,
87
- message : "Telemetry is disabled." ,
88
- noRedaction : true ,
89
- } ) ;
90
- return ;
91
- }
92
-
93
- await this . emit ( events ) ;
94
- } catch {
95
- this . session . logger . debug ( {
96
- id : LogId . telemetryEmitFailure ,
97
- context : "telemetry" ,
98
- message : "Error emitting telemetry events." ,
99
- noRedaction : true ,
100
- } ) ;
100
+ public emitEvents ( events : BaseEvent [ ] ) : void {
101
+ if ( ! this . isTelemetryEnabled ( ) ) {
102
+ this . events . emit ( "events-skipped" ) ;
103
+ return ;
101
104
}
105
+
106
+ // Don't wait for events to be sent - we should not block regular server
107
+ // operations on telemetry
108
+ void this . emit ( events ) ;
102
109
}
103
110
104
111
/**
@@ -144,32 +151,44 @@ export class Telemetry {
144
151
return ;
145
152
}
146
153
147
- const cachedEvents = this . eventCache . getEvents ( ) ;
148
- const allEvents = [ ...cachedEvents , ...events ] ;
154
+ try {
155
+ const cachedEvents = this . eventCache . getEvents ( ) ;
156
+ const allEvents = [ ...cachedEvents , ...events ] ;
149
157
150
- this . session . logger . debug ( {
151
- id : LogId . telemetryEmitStart ,
152
- context : "telemetry" ,
153
- message : `Attempting to send ${ allEvents . length } events (${ cachedEvents . length } cached)` ,
154
- } ) ;
158
+ this . session . logger . debug ( {
159
+ id : LogId . telemetryEmitStart ,
160
+ context : "telemetry" ,
161
+ message : `Attempting to send ${ allEvents . length } events (${ cachedEvents . length } cached)` ,
162
+ } ) ;
163
+
164
+ const result = await this . sendEvents ( this . session . apiClient , allEvents ) ;
165
+ if ( result . success ) {
166
+ this . eventCache . clearEvents ( ) ;
167
+ this . session . logger . debug ( {
168
+ id : LogId . telemetryEmitSuccess ,
169
+ context : "telemetry" ,
170
+ message : `Sent ${ allEvents . length } events successfully: ${ JSON . stringify ( allEvents , null , 2 ) } ` ,
171
+ } ) ;
172
+ this . events . emit ( "events-emitted" ) ;
173
+ return ;
174
+ }
155
175
156
- const result = await this . sendEvents ( this . session . apiClient , allEvents ) ;
157
- if ( result . success ) {
158
- this . eventCache . clearEvents ( ) ;
159
176
this . session . logger . debug ( {
160
- id : LogId . telemetryEmitSuccess ,
177
+ id : LogId . telemetryEmitFailure ,
161
178
context : "telemetry" ,
162
- message : `Sent ${ allEvents . length } events successfully : ${ JSON . stringify ( allEvents , null , 2 ) } ` ,
179
+ message : `Error sending event to client : ${ result . error instanceof Error ? result . error . message : String ( result . error ) } ` ,
163
180
} ) ;
164
- return ;
181
+ this . eventCache . appendEvents ( events ) ;
182
+ this . events . emit ( "events-send-failed" ) ;
183
+ } catch ( error ) {
184
+ this . session . logger . debug ( {
185
+ id : LogId . telemetryEmitFailure ,
186
+ context : "telemetry" ,
187
+ message : `Error emitting telemetry events: ${ error instanceof Error ? error . message : String ( error ) } ` ,
188
+ noRedaction : true ,
189
+ } ) ;
190
+ this . events . emit ( "events-send-failed" ) ;
165
191
}
166
-
167
- this . session . logger . debug ( {
168
- id : LogId . telemetryEmitFailure ,
169
- context : "telemetry" ,
170
- message : `Error sending event to client: ${ result . error instanceof Error ? result . error . message : String ( result . error ) } ` ,
171
- } ) ;
172
- this . eventCache . appendEvents ( events ) ;
173
192
}
174
193
175
194
/**
0 commit comments