88 "path/filepath"
99 "runtime"
1010 "sync"
11- "syscall"
1211 "text/template"
1312 "time"
1413
@@ -25,8 +24,9 @@ type TelemetryManager struct {
2524 mu sync.RWMutex
2625
2726 // Components
28- receiver * uploader.Receiver
29- s3Uploader * uploader.S3Uploader
27+ receiver * uploader.Receiver
28+ s3Uploader * uploader.S3Uploader
29+ otelCollectorCmd * exec.Cmd
3030
3131 // Configuration
3232 port int
@@ -78,6 +78,10 @@ func (tm *TelemetryManager) Start() error {
7878 tm .wg .Add (1 )
7979 go tm .startOtelCollector ()
8080
81+ // Start telemetry status monitoring
82+ tm .wg .Add (1 )
83+ go tm .monitorTelemetryStatus ()
84+
8185 log .Logger ().Infof ("Telemetry manager started successfully" )
8286 return nil
8387}
@@ -129,26 +133,16 @@ func (tm *TelemetryManager) startOtelCollector() {
129133
130134 log .Logger ().Infof ("OpenTelemetry Collector configuration written successfully" )
131135
132- // Channel to signal when the application should terminate
133- done := make (chan bool , 1 )
134-
135- // Start OpenTelemetry Collector Contrib
136- go func () {
137- defer tm .handlePanic ()
138- log .Logger ().Infof ("Launching OpenTelemetry Collector in background..." )
139- tm .runOtelCollector (collectorPath , done )
140- }()
136+ log .Logger ().Infof ("Launching OpenTelemetry Collector in background..." )
141137
142- // Wait for context cancellation
143- <- tm .ctx .Done ()
144- log .Logger ().Infof ("Context cancelled, stopping OTEL collector..." )
138+ // Start the OTEL collector and wait for context cancellation
139+ tm .runOtelCollector (collectorPath )
145140
146- // Signal the OpenTelemetry Collector process to terminate
147- done <- true
141+ log .Logger ().Infof ("OTEL collector goroutine exited" )
148142}
149143
150144// runOtelCollector runs the OTEL collector process
151- func (tm * TelemetryManager ) runOtelCollector (collectorPath string , done chan bool ) {
145+ func (tm * TelemetryManager ) runOtelCollector (collectorPath string ) {
152146 configPath := tm .getConfigFilePath ()
153147 log .Logger ().Infof ("Starting OpenTelemetry Collector with config: %s" , configPath )
154148
@@ -160,29 +154,57 @@ func (tm *TelemetryManager) runOtelCollector(collectorPath string, done chan boo
160154
161155 log .Logger ().Infof ("OpenTelemetry Collector command: %s --config %s" , collectorPath , configPath )
162156
163- err := cmd .Start ()
164- if err != nil {
157+ if err := cmd .Start (); err != nil {
165158 log .Logger ().Errorf ("Failed to start OpenTelemetry Collector: %v" , err )
166159 return
167160 }
168161
162+ // Store the command reference so we can stop it later
163+ tm .mu .Lock ()
164+ tm .otelCollectorCmd = cmd
165+ tm .mu .Unlock ()
166+
169167 log .Logger ().Infof ("OpenTelemetry Collector started with PID: %d" , cmd .Process .Pid )
170168
169+ // Channel to track when cmd.Wait() completes
170+ waitDone := make (chan error , 1 )
171+
172+ // Wait for the process to exit in a separate goroutine
171173 go func () {
172- <- done
173- log .Logger ().Infof ("Signaling OpenTelemetry Collector to terminate..." )
174- if err := cmd .Process .Signal (syscall .SIGTERM ); err != nil {
175- log .Logger ().Errorf ("Failed to terminate OpenTelemetry Collector: %v" , err )
176- }
174+ waitDone <- cmd .Wait ()
177175 }()
178176
179- go func () {
180- if err := cmd .Wait (); err != nil {
177+ // Wait for either context cancellation or process exit
178+ select {
179+ case <- tm .ctx .Done ():
180+ log .Logger ().Infof ("Context cancelled, stopping OTEL collector (PID: %d)..." , cmd .Process .Pid )
181+
182+ // Kill the process - Go handles OS-specific details
183+ if err := cmd .Process .Kill (); err != nil {
184+ log .Logger ().Errorf ("Failed to kill OpenTelemetry Collector process: %v" , err )
185+ }
186+
187+ // Wait for the process to actually exit (with timeout)
188+ select {
189+ case err := <- waitDone :
190+ if err != nil {
191+ log .Logger ().Infof ("OpenTelemetry Collector terminated with error: %v" , err )
192+ } else {
193+ log .Logger ().Infof ("OpenTelemetry Collector terminated successfully" )
194+ }
195+ case <- time .After (5 * time .Second ):
196+ log .Logger ().Warnf ("Timeout waiting for OpenTelemetry Collector to exit after 5 seconds" )
197+ }
198+
199+ case err := <- waitDone :
200+ if err != nil {
181201 log .Logger ().Errorf ("OpenTelemetry Collector exited with error: %v" , err )
182202 } else {
183203 log .Logger ().Infof ("OpenTelemetry Collector exited successfully" )
184204 }
185- }()
205+ }
206+
207+ log .Logger ().Infof ("OpenTelemetry Collector process handler completed" )
186208}
187209
188210// handlePanic handles panics in goroutines
@@ -311,3 +333,69 @@ func (tm *TelemetryManager) getOtelCollectorOutputFilePath(isMetrics bool) strin
311333 }
312334 return filepath .Join (tm .baseDirectory , "otel-out.log" )
313335}
336+
337+ // monitorTelemetryStatus monitors the telemetry enabled status via API polling
338+ func (tm * TelemetryManager ) monitorTelemetryStatus () {
339+ defer tm .wg .Done ()
340+
341+ log .Logger ().Infof ("Starting telemetry status monitoring..." )
342+
343+ ticker := time .NewTicker (1 * time .Second )
344+ defer ticker .Stop ()
345+
346+ for {
347+ select {
348+ case <- ticker .C :
349+ // Poll the API to check telemetry status
350+ allocationDetails , resp , err := tm .warpbuildAPI .V1RunnerInstanceAPI .
351+ GetRunnerInstanceAllocationDetails (tm .ctx , tm .runnerID ).
352+ XPOLLINGSECRET (tm .pollingSecret ).
353+ Execute ()
354+
355+ if err != nil {
356+ log .Logger ().Debugf ("Failed to get runner instance allocation details: %v" , err )
357+ if resp != nil {
358+ log .Logger ().Debugf ("Response: %+v" , resp )
359+ }
360+ continue
361+ }
362+
363+ if allocationDetails == nil {
364+ log .Logger ().Debugf ("No runner instance allocation details found" )
365+ continue
366+ }
367+
368+ // Check if telemetry is disabled
369+ if allocationDetails .HasTelemetryEnabled () {
370+ telemetryEnabled := allocationDetails .GetTelemetryEnabled ()
371+
372+ if ! telemetryEnabled {
373+ log .Logger ().Infof ("Telemetry has been disabled via API. Stopping telemetry collection..." )
374+ tm .stopOtelCollector ()
375+
376+ // Cancel the context to stop the entire telemetry manager
377+ tm .cancel ()
378+ return
379+ }
380+ }
381+
382+ case <- tm .ctx .Done ():
383+ log .Logger ().Infof ("Context cancelled, stopping telemetry status monitoring..." )
384+ return
385+ }
386+ }
387+ }
388+
389+ // stopOtelCollector stops the OTEL collector process by canceling the context
390+ func (tm * TelemetryManager ) stopOtelCollector () {
391+ tm .mu .RLock ()
392+ defer tm .mu .RUnlock ()
393+
394+ if tm .otelCollectorCmd == nil || tm .otelCollectorCmd .Process == nil {
395+ log .Logger ().Infof ("OTEL collector process not running" )
396+ return
397+ }
398+
399+ log .Logger ().Infof ("Stopping OTEL collector process (PID: %d)..." , tm .otelCollectorCmd .Process .Pid )
400+ // The actual termination will be handled by the context cancellation in runOtelCollector
401+ }
0 commit comments