@@ -396,8 +396,11 @@ func TestEventShutdown(t *testing.T) {
396396 payload : pcommon .TraceID ([16 ]byte {1 , 2 , 3 , 4 }),
397397 })
398398
399- time .Sleep (10 * time .Millisecond ) // give it a bit of time to process the items
400- assert .Equal (t , 1 , em .numEvents ()) // we should have one pending event in the queue, the second traceRemoved event
399+ // wait for events to process - we should have one pending event in the queue, the second traceRemoved event
400+ assert .Eventually (t , func () bool {
401+ return em .numEvents () == 1
402+ }, 1 * time .Second , 10 * time .Millisecond )
403+ assert .Equal (t , 1 , em .numEvents ())
401404
402405 shutdownWg := sync.WaitGroup {}
403406 shutdownWg .Add (1 )
@@ -406,9 +409,11 @@ func TestEventShutdown(t *testing.T) {
406409 shutdownWg .Done ()
407410 }()
408411
409- wg .Done () // the pending event should be processed
410- time .Sleep (100 * time .Millisecond ) // give it a bit of time to process the items
411-
412+ wg .Done () // the pending event should be processed
413+ // wait for shutdown to process remaining events
414+ assert .Eventually (t , func () bool {
415+ return em .numEvents () == 0
416+ }, 1 * time .Second , 10 * time .Millisecond )
412417 assert .Equal (t , 0 , em .numEvents ())
413418
414419 // new events should *not* be processed
@@ -422,8 +427,10 @@ func TestEventShutdown(t *testing.T) {
422427
423428 // If the code is wrong, there's a chance that the test will still pass
424429 // in case the event is processed after the assertion.
425- // for this reason, we add a small delay here
426- time .Sleep (10 * time .Millisecond )
430+ // Verify that the expired event is not processed (should remain 0)
431+ assert .Eventually (t , func () bool {
432+ return traceExpiredFired .Load () == 0
433+ }, 100 * time .Millisecond , 5 * time .Millisecond )
427434 assert .Equal (t , int64 (0 ), traceExpiredFired .Load ())
428435
429436 // wait until the shutdown has returned
@@ -467,11 +474,13 @@ func TestPeriodicMetrics(t *testing.T) {
467474 em .workers [0 ].fire (event {typ : traceReceived }) // the first is consumed right away, the second is in the queue
468475 go em .periodicMetrics ()
469476
470- // TODO: Remove time.Sleep below, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/42515
471- time .Sleep (10 * time .Millisecond )
472477 // ensure our gauge is showing 1 item in the queue
473478 assert .EventuallyWithT (t , func (tt * assert.CollectT ) {
474479 val := getGaugeValue (t .Context (), tt , "otelcol_processor_groupbytrace_num_events_in_queue" , s )
480+ if val == - 1 {
481+ tt .Errorf ("gauge not yet created or has no data points" )
482+ return
483+ }
475484 assert .Equal (tt , int64 (1 ), val )
476485 }, 1 * time .Second , 10 * time .Millisecond )
477486
@@ -480,14 +489,23 @@ func TestPeriodicMetrics(t *testing.T) {
480489 // ensure our gauge is now showing no items in the queue
481490 assert .EventuallyWithT (t , func (tt * assert.CollectT ) {
482491 val := getGaugeValue (t .Context (), tt , "otelcol_processor_groupbytrace_num_events_in_queue" , s )
492+ if val == - 1 {
493+ tt .Errorf ("gauge not yet created or has no data points" )
494+ return
495+ }
483496 assert .Equal (tt , int64 (0 ), val )
484497 }, 1 * time .Second , 10 * time .Millisecond )
485498
486499 // signal and wait for the recursive call to finish
487500 em .shutdownLock .Lock ()
488501 em .closed = true
489502 em .shutdownLock .Unlock ()
490- time .Sleep (5 * time .Millisecond )
503+ // Wait for periodicMetrics to detect the closed flag and return
504+ assert .Eventually (t , func () bool {
505+ em .shutdownLock .RLock ()
506+ defer em .shutdownLock .RUnlock ()
507+ return em .closed
508+ }, 100 * time .Millisecond , 5 * time .Millisecond )
491509}
492510
493511func TestForceShutdown (t * testing.T ) {
@@ -507,8 +525,12 @@ func TestForceShutdown(t *testing.T) {
507525 // verify
508526 assert .Greater (t , duration , 20 * time .Millisecond )
509527
510- // wait for shutdown goroutine to end
511- time .Sleep (100 * time .Millisecond )
528+ // Verify shutdown completed - the machine should be closed
529+ assert .Eventually (t , func () bool {
530+ em .shutdownLock .RLock ()
531+ defer em .shutdownLock .RUnlock ()
532+ return em .closed
533+ }, 100 * time .Millisecond , 5 * time .Millisecond )
512534}
513535
514536func TestDoWithTimeout_NoTimeout (t * testing.T ) {
@@ -525,10 +547,11 @@ func TestDoWithTimeout_NoTimeout(t *testing.T) {
525547func TestDoWithTimeout_TimeoutTrigger (t * testing.T ) {
526548 // prepare
527549 start := time .Now ()
550+ blockCh := make (chan struct {}) // channel that will never be closed/signaled
528551
529552 // test
530553 succeed , err := doWithTimeout (20 * time .Millisecond , func () error {
531- time . Sleep ( 1 * time . Second )
554+ <- blockCh // block forever (simulating a long-running function )
532555 return nil
533556 })
534557 assert .False (t , succeed )
@@ -541,13 +564,18 @@ func TestDoWithTimeout_TimeoutTrigger(t *testing.T) {
541564func getGaugeValue (ctx context.Context , t * assert.CollectT , name string , tt testTelemetry ) int64 {
542565 var md metricdata.ResourceMetrics
543566 require .NoError (t , tt .reader .Collect (ctx , & md ))
544- m := tt .getMetric (name , md ).Data
567+ metric := tt .getMetric (name , md )
568+ if metric == (metricdata.Metrics {}) {
569+ return - 1 // return sentinel value to indicate metric doesn't exist yet
570+ }
571+ m := metric .Data
545572 var g metricdata.Gauge [int64 ]
546573 var ok bool
547574 if g , ok = m .(metricdata.Gauge [int64 ]); ! ok {
548- assert .Fail (t , "missing gauge data" )
549- } else {
550- assert .Len (t , g .DataPoints , 1 , "expected exactly one data point" )
575+ return - 1 // return sentinel value to indicate gauge data is missing
576+ }
577+ if len (g .DataPoints ) == 0 {
578+ return - 1 // return sentinel value to indicate no data points yet
551579 }
552580 return g .DataPoints [0 ].Value
553581}
0 commit comments