@@ -2,30 +2,86 @@ import { Pushgateway, Registry } from "prom-client"
22import delay from "delay"
33import crypto from "crypto"
44import retry from "async-retry"
5+ import path from "path"
6+ import * as lockfile from "proper-lockfile"
7+ import * as fs from "fs/promises"
8+ import * as os from "os"
59
6- import { type TelemetryEvent } from "@roo-code/types"
10+ import { type TelemetryEvent , TelemetryEventName } from "@roo-code/types"
711
812import { BaseTelemetryClient } from "../BaseTelemetryClient"
913import MetricsRecorder from "./metrics"
1014import { createLogger , ILogger } from "../../../../src/utils/logger"
1115import { getWorkspacePath } from "../../../../src/utils/path"
1216import { Package } from "../../../../src/shared/package"
1317import { ClineProvider } from "../../../../src/core/webview/ClineProvider"
18+ import { MetricsSerializer } from "./MetricsSerializer"
1419
1520export class PrometheusTelemetryClient extends BaseTelemetryClient {
1621 private endpoint : string
17- private registry : Registry
18- private metricsRecorder : MetricsRecorder
1922 private logger : ILogger
23+ private metricsSerializer : MetricsSerializer
24+ private persistenceFilePath : string
25+
/**
 * Builds a Prometheus telemetry client bound to a Pushgateway endpoint.
 *
 * The base class receives an "include" subscription listing the four event
 * names below. Metrics are persisted to a per-workspace JSON file under
 * `~/.costrict/telemetry`, named after the workspace hash.
 *
 * @param endpoint Pushgateway URL that metrics are pushed to.
 * @param debug Forwarded to the base class; gates debug logging in this client.
 */
constructor(endpoint: string, debug = false) {
	super(
		{
			type: "include",
			events: [
				TelemetryEventName.CODE_ACCEPT,
				TelemetryEventName.CODE_REJECT,
				TelemetryEventName.CODE_TAB_COMPLETION,
				TelemetryEventName.ERROR,
			],
		},
		debug,
	)
	this.logger = createLogger(Package.outputChannel)
	this.metricsSerializer = new MetricsSerializer(this.logger)
	this.endpoint = endpoint

	// One persistence file per workspace, keyed by the short workspace hash.
	const metricsFileName = `metrics-${this.hashWorkspaceDir()}.json`
	this.persistenceFilePath = path.join(os.homedir(), ".costrict", "telemetry", metricsFileName)

	this.setupPush()
	this.updateTelemetryState(true)
}
51+
/**
 * Runs `operation` against metrics loaded from the persistence file while
 * holding an exclusive lock on that file.
 *
 * A fresh registry and recorder are created for each call, the persisted
 * state is loaded into the registry, and both are handed to `operation`.
 * Saving any changes back to disk is the operation's responsibility.
 *
 * @param operation Callback executed while the lock is held.
 * @returns The operation's result, or `undefined` if preparing the file,
 *          acquiring the lock, loading, or the operation itself failed
 *          (the failure is logged, not thrown).
 */
private async operateWithLock<T>(
	operation: (recorder: MetricsRecorder, registry: Registry) => Promise<T>,
): Promise<T | undefined> {
	// Assigned only once the lock is actually held, so cleanup never tries to
	// release a lock this call does not own.
	let release: (() => Promise<void>) | undefined
	try {
		await this.ensurePersistenceFile()

		release = await lockfile.lock(this.persistenceFilePath)

		const tempRegistry = new Registry()
		// The recorder is created once here so its metrics are registered on the registry.
		const tempRecorder = new MetricsRecorder(tempRegistry)

		await this.metricsSerializer.load(tempRegistry, this.persistenceFilePath)

		return await operation(tempRecorder, tempRegistry)
	} catch (error) {
		this.logger.error(`[PrometheusTelemetryClient] Lock operation failed: ${error}`)
		return undefined
	} finally {
		if (release) {
			try {
				await release()
			} catch (error) {
				// A failed release must not turn a completed operation into a rejection.
				this.logger.error(`[PrometheusTelemetryClient] Failed to release lock: ${error}`)
			}
		}
	}
}

/** Creates the persistence directory and an empty metrics file if the file is missing. */
private async ensurePersistenceFile(): Promise<void> {
	try {
		await fs.access(this.persistenceFilePath)
		return
	} catch {
		// File is not accessible; fall through and create it.
	}

	await fs.mkdir(path.dirname(this.persistenceFilePath), { recursive: true })
	try {
		// "wx" fails instead of truncating when a concurrent caller created the
		// file after the access check, so already-persisted metrics are kept.
		await fs.writeFile(this.persistenceFilePath, "[]", { encoding: "utf-8", flag: "wx" })
	} catch (error) {
		if (!(error instanceof Error) || (error as NodeJS.ErrnoException).code !== "EEXIST") {
			throw error
		}
	}
}
84+
/**
 * Returns a short identifier for the current workspace: the first 8 hex
 * characters of the SHA-256 digest of the workspace path.
 */
private hashWorkspaceDir() {
	const hasher = crypto.createHash("sha256")
	hasher.update(getWorkspacePath())
	const hexDigest = hasher.digest("hex")
	return hexDigest.slice(0, 8)
}
@@ -34,40 +90,42 @@ export class PrometheusTelemetryClient extends BaseTelemetryClient {
3490 setInterval ( async ( ) => {
3591 try {
3692 if ( this . debug ) {
37- this . logger . debug ( `[PrometheusTelemetryClient#push] Pushing metrics ` )
93+ this . logger . debug ( `[PrometheusTelemetryClient] Periodic push triggered. ` )
3894 }
3995 await delay ( Math . random ( ) * 1000 )
4096 await this . pushAdd ( )
4197 } catch ( error ) {
42- this . logger . error ( `[PrometheusTelemetryClient#push ] ${ error } ` )
98+ this . logger . error ( `[PrometheusTelemetryClient#setupPush ] ${ error } ` )
4399 }
44100 } , times )
45101 }
46- // Implement an immediate push method
102+
/**
 * Pushes the persisted metrics to the Pushgateway, under the file lock.
 *
 * Nothing is pushed when the loaded registry has no metrics or when the
 * provider reference is no longer available. The push uses job name
 * "costrict", is grouped by the workspace hash, and is retried up to 3 times.
 * Errors thrown inside the callback are caught and logged by `operateWithLock`.
 */
public async pushAdd() {
	return this.operateWithLock(async (_recorder, registry) => {
		// The recorder is not needed here; only the loaded registry is pushed.
		const metrics = await registry.getMetricsAsJSON()
		if (metrics.length === 0) {
			this.logger.debug("[PrometheusTelemetryClient] No metrics to push.")
			return
		}

		// deref() yields undefined once the provider is gone (or was never set);
		// the cast alone would hide that and crash on getState().
		const provider = this.providerRef?.deref() as unknown as ClineProvider | undefined
		if (!provider) {
			this.logger.debug("[PrometheusTelemetryClient] Provider unavailable; skipping push.")
			return
		}

		const { apiConfiguration } = await provider.getState()
		const { zgsmAccessToken } = apiConfiguration
		const client = new Pushgateway(
			this.endpoint,
			{ headers: { Authorization: `Bearer ${zgsmAccessToken}` } },
			registry,
		)
		await retry(
			() =>
				client.pushAdd({
					jobName: "costrict",
					groupings: { instance: this.hashWorkspaceDir() },
				}),
			{ retries: 3 },
		)

		this.logger.debug("[PrometheusTelemetryClient] Push successful.")
	})
}
72130
73131 public override async capture ( event : TelemetryEvent ) : Promise < void > {
@@ -77,21 +135,21 @@ export class PrometheusTelemetryClient extends BaseTelemetryClient {
77135 }
78136 return
79137 }
80- try {
138+
139+ await this . operateWithLock ( async ( recorder , registry ) => {
140+ // Use the recorder instance passed from operateWithLock. NO `new` here.
81141 const properties = await this . getEventProperties ( event )
82- this . metricsRecorder . record ( {
83- event : event . event ,
84- properties,
85- } )
86- if ( this . debug ) {
87- this . logger . debug ( `[PrometheusTelemetryClient#capture] ${ event . event } ` )
88- }
89- } catch ( error ) {
142+ recorder . record ( { event : event . event , properties } )
143+
144+ // Save the modified state back to the file.
145+ await this . metricsSerializer . save ( registry , this . persistenceFilePath )
146+
90147 if ( this . debug ) {
91- this . logger . error ( `[PrometheusTelemetryClient#capture] ${ error } ` )
148+ this . logger . debug ( `[PrometheusTelemetryClient#capture] Captured and persisted: ${ event . event } ` )
92149 }
93- }
150+ } )
94151 }
152+
95153 protected override async getEventProperties ( event : TelemetryEvent ) {
96154 let providerProperties : TelemetryEvent [ "properties" ] = { }
97155 const { properties } = event
@@ -114,14 +172,12 @@ export class PrometheusTelemetryClient extends BaseTelemetryClient {
114172 this . telemetryEnabled = true
115173 }
/**
 * Attempts one final metrics push before the client goes away.
 *
 * A failed push is logged rather than propagated, so shutdown still
 * completes when the push fails.
 */
public override async shutdown(): Promise<void> {
	try {
		await this.pushAdd()
	} catch (error) {
		this.logger.error(`[PrometheusTelemetryClient#shutdown] Final push failed: ${error}`)
	}
}
127183}
0 commit comments