@@ -30,10 +30,16 @@ export class TelemetryQueueManager {
3030 private cleanupInterval : NodeJS . Timeout | null = null
3131 private readonly QUEUE_VERSION = 1
3232 private readonly MAX_EVENT_AGE = 24 * 60 * 60 * 1000 // 24 hours instead of 7 days
33+ private debug = false
34+ private persistPromise : Promise < void > | null = null
35+ private pendingPersist = false
3336
34- private constructor ( storagePath : string ) {
37+ private constructor ( storagePath : string , debug = false ) {
38+ this . debug = debug || process . env . DEBUG_TELEMETRY === "true"
3539 this . persistPath = path . join ( storagePath , "telemetry-queue.json" )
36- console . log ( `[TelemetryQueue] Initializing with path: ${ this . persistPath } ` )
40+ if ( this . debug ) {
41+ console . log ( `[TelemetryQueue] Initializing with path: ${ this . persistPath } ` )
42+ }
3743 this . loadQueue ( )
3844 this . startPeriodicFlush ( )
3945 this . startPeriodicCleanup ( )
@@ -42,26 +48,40 @@ export class TelemetryQueueManager {
4248 /**
4349 * Get or create the singleton instance
4450 */
45- public static getInstance ( storagePath : string ) : TelemetryQueueManager {
51+ public static getInstance ( storagePath : string , debug = false ) : TelemetryQueueManager {
4652 if ( ! this . instance ) {
47- this . instance = new TelemetryQueueManager ( storagePath )
53+ this . instance = new TelemetryQueueManager ( storagePath , debug )
4854 }
4955 return this . instance
5056 }
5157
58+ /**
59+ * Reset the singleton instance (for testing)
60+ */
61+ public static resetInstance ( ) : void {
62+ if ( this . instance ) {
63+ this . instance . shutdown ( )
64+ this . instance = null
65+ }
66+ }
67+
5268 /**
5369 * Add an event to the queue
5470 */
5571 public enqueue ( event : TelemetryEvent , clientId : string ) : void {
56- console . log ( `[TelemetryQueue] Enqueueing event: ${ event . event } for client: ${ clientId } ` )
72+ if ( this . debug ) {
73+ console . log ( `[TelemetryQueue] Enqueueing event: ${ event . event } for client: ${ clientId } ` )
74+ }
5775
5876 // Don't queue if we've reached the maximum size
5977 if ( this . queue . length >= this . maxQueueSize ) {
6078 // Remove oldest events to make room (FIFO)
6179 const removed = this . queue . shift ( )
62- console . log (
63- `[TelemetryQueue] Queue full (${ this . maxQueueSize } ), removed oldest event: ${ removed ?. event . event } ` ,
64- )
80+ if ( this . debug ) {
81+ console . log (
82+ `[TelemetryQueue] Queue full (${ this . maxQueueSize } ), removed oldest event: ${ removed ?. event . event } ` ,
83+ )
84+ }
6585 }
6686
6787 const queuedEvent : QueuedEvent = {
@@ -72,8 +92,10 @@ export class TelemetryQueueManager {
7292 }
7393
7494 this . queue . push ( queuedEvent )
75- console . log ( `[TelemetryQueue] Queue size after enqueue: ${ this . queue . length } ` )
76- this . persistQueue ( )
95+ if ( this . debug ) {
96+ console . log ( `[TelemetryQueue] Queue size after enqueue: ${ this . queue . length } ` )
97+ }
98+ this . schedulePersist ( )
7799 }
78100
79101 /**
@@ -89,23 +111,31 @@ export class TelemetryQueueManager {
89111 try {
90112 const clientEvents = this . queue . filter ( ( e ) => e . clientId === clientId )
91113
92- console . log ( `[TelemetryQueue] Processing ${ clientEvents . length } events for client: ${ clientId } ` )
114+ if ( this . debug ) {
115+ console . log ( `[TelemetryQueue] Processing ${ clientEvents . length } events for client: ${ clientId } ` )
116+ }
93117
94118 for ( const queuedEvent of clientEvents ) {
95119 try {
96- console . log (
97- `[TelemetryQueue] Attempting to send event: ${ queuedEvent . event . event } , retry count: ${ queuedEvent . retryCount } ` ,
98- )
120+ if ( this . debug ) {
121+ console . log (
122+ `[TelemetryQueue] Attempting to send event: ${ queuedEvent . event . event } , retry count: ${ queuedEvent . retryCount } ` ,
123+ )
124+ }
99125 await sendFunction ( queuedEvent . event )
100126 // Remove successfully sent event
101- console . log ( `[TelemetryQueue] Successfully sent event: ${ queuedEvent . event . event } ` )
127+ if ( this . debug ) {
128+ console . log ( `[TelemetryQueue] Successfully sent event: ${ queuedEvent . event . event } ` )
129+ }
102130 this . removeEvent ( queuedEvent )
103131 } catch ( error ) {
104132 // Increment retry count
105133 queuedEvent . retryCount ++
106- console . log (
107- `[TelemetryQueue] Failed to send event: ${ queuedEvent . event . event } , retry count now: ${ queuedEvent . retryCount } , error: ${ error } ` ,
108- )
134+ if ( this . debug ) {
135+ console . log (
136+ `[TelemetryQueue] Failed to send event: ${ queuedEvent . event . event } , retry count now: ${ queuedEvent . retryCount } , error: ${ error } ` ,
137+ )
138+ }
109139
110140 // Don't remove based on retry count - let it keep trying until 24 hours
111141 }
@@ -168,33 +198,75 @@ export class TelemetryQueueManager {
168198 const originalCount = state . events . length
169199 this . queue = state . events . filter ( ( e ) => e . timestamp > cutoffTime )
170200
171- console . log (
172- `[TelemetryQueue] Loaded ${ this . queue . length } events from disk (filtered ${ originalCount - this . queue . length } old events)` ,
173- )
201+ if ( this . debug ) {
202+ console . log (
203+ `[TelemetryQueue] Loaded ${ this . queue . length } events from disk (filtered ${ originalCount - this . queue . length } old events)` ,
204+ )
205+ }
174206
175207 // If we filtered out any events, persist the cleaned queue
176208 if ( this . queue . length < originalCount ) {
177- this . persistQueue ( )
209+ this . schedulePersist ( )
178210 }
179211 }
180212 } catch ( error ) {
181213 // File doesn't exist or is corrupted, start with empty queue
182- console . log ( `[TelemetryQueue] No existing queue file found or error loading: ${ error } ` )
214+ if ( this . debug ) {
215+ console . log ( `[TelemetryQueue] No existing queue file found or error loading: ${ error } ` )
216+ }
183217 this . queue = [ ]
184218 }
185219 }
186220
221+ /**
222+ * Schedule a persist operation with debouncing to avoid race conditions
223+ */
224+ private schedulePersist ( ) : void {
225+ if ( this . pendingPersist ) {
226+ // A persist is already scheduled
227+ return
228+ }
229+
230+ this . pendingPersist = true
231+
232+ // Use setImmediate to batch multiple rapid enqueue operations
233+ setImmediate ( ( ) => {
234+ this . pendingPersist = false
235+ this . persistQueue ( )
236+ } )
237+ }
238+
187239 /**
188240 * Persist queue to disk
189241 */
190242 private async persistQueue ( ) : Promise < void > {
243+ // If a persist is already in progress, wait for it to complete
244+ if ( this . persistPromise ) {
245+ await this . persistPromise
246+ return
247+ }
248+
249+ this . persistPromise = this . doPersist ( )
250+ try {
251+ await this . persistPromise
252+ } finally {
253+ this . persistPromise = null
254+ }
255+ }
256+
257+ /**
258+ * Actually perform the persist operation
259+ */
260+ private async doPersist ( ) : Promise < void > {
191261 try {
192262 const state : QueueState = {
193263 events : this . queue ,
194264 version : this . QUEUE_VERSION ,
195265 }
196266
197- console . log ( `[TelemetryQueue] Persisting ${ this . queue . length } events to disk` )
267+ if ( this . debug ) {
268+ console . log ( `[TelemetryQueue] Persisting ${ this . queue . length } events to disk` )
269+ }
198270
199271 // Ensure directory exists
200272 const dir = path . dirname ( this . persistPath )
@@ -205,10 +277,14 @@ export class TelemetryQueueManager {
205277 await fs . writeFile ( tempPath , JSON . stringify ( state , null , 2 ) )
206278 await fs . rename ( tempPath , this . persistPath )
207279
208- console . log ( `[TelemetryQueue] Successfully persisted queue to: ${ this . persistPath } ` )
280+ if ( this . debug ) {
281+ console . log ( `[TelemetryQueue] Successfully persisted queue to: ${ this . persistPath } ` )
282+ }
209283 } catch ( error ) {
210284 // Log error but don't throw - telemetry should not break the app
211- console . error ( "[TelemetryQueue] Failed to persist telemetry queue:" , error )
285+ if ( this . debug ) {
286+ console . error ( "[TelemetryQueue] Failed to persist telemetry queue:" , error )
287+ }
212288 }
213289 }
214290
@@ -243,14 +319,16 @@ export class TelemetryQueueManager {
243319 */
244320 private performAggressiveCleanup ( ) : void {
245321 const originalSize = this . queue . length
246- console . log ( `[TelemetryQueue] Running aggressive cleanup, current queue size: ${ originalSize } ` )
322+ if ( this . debug ) {
323+ console . log ( `[TelemetryQueue] Running aggressive cleanup, current queue size: ${ originalSize } ` )
324+ }
247325
248326 // Remove old events
249327 const cutoffTime = Date . now ( ) - this . MAX_EVENT_AGE
250328 const beforeOldFilter = this . queue . length
251329 this . queue = this . queue . filter ( ( e ) => e . timestamp > cutoffTime )
252330 const removedOld = beforeOldFilter - this . queue . length
253- if ( removedOld > 0 ) {
331+ if ( removedOld > 0 && this . debug ) {
254332 console . log ( `[TelemetryQueue] Removed ${ removedOld } events older than 24 hours` )
255333 }
256334
@@ -262,15 +340,19 @@ export class TelemetryQueueManager {
262340 this . queue . sort ( ( a , b ) => b . timestamp - a . timestamp )
263341 const beforeTrim = this . queue . length
264342 this . queue = this . queue . slice ( 0 , this . maxQueueSize )
265- console . log ( `[TelemetryQueue] Trimmed queue from ${ beforeTrim } to ${ this . maxQueueSize } events` )
343+ if ( this . debug ) {
344+ console . log ( `[TelemetryQueue] Trimmed queue from ${ beforeTrim } to ${ this . maxQueueSize } events` )
345+ }
266346 }
267347
268348 // Persist if we made changes
269349 if ( this . queue . length !== originalSize ) {
270- console . log (
271- `[TelemetryQueue] Cleanup complete, queue size changed from ${ originalSize } to ${ this . queue . length } ` ,
272- )
273- this . persistQueue ( )
350+ if ( this . debug ) {
351+ console . log (
352+ `[TelemetryQueue] Cleanup complete, queue size changed from ${ originalSize } to ${ this . queue . length } ` ,
353+ )
354+ }
355+ this . schedulePersist ( )
274356 }
275357 }
276358
@@ -283,7 +365,7 @@ export class TelemetryQueueManager {
283365 this . queue = this . queue . filter ( ( e ) => e . timestamp > cutoffTime )
284366
285367 if ( this . queue . length < originalLength ) {
286- this . persistQueue ( )
368+ this . schedulePersist ( )
287369 }
288370 }
289371
0 commit comments