diff --git a/pkg/epp/datalayer/collector.go b/pkg/epp/datalayer/collector.go index 94aa9b6bd..87d2a62e6 100644 --- a/pkg/epp/datalayer/collector.go +++ b/pkg/epp/datalayer/collector.go @@ -83,10 +83,13 @@ func NewCollector() *Collector { // Start initiates data source collection for the endpoint. func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sources []DataSource) error { + var ready chan struct{} started := false + c.startOnce.Do(func() { c.ctx, c.cancel = context.WithCancel(ctx) started = true + ready = make(chan struct{}) go func(endpoint Endpoint, sources []DataSource) { logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress()) @@ -97,6 +100,8 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc ticker.Stop() }() + close(ready) // signal ready to accept ticks + for { select { case <-c.ctx.Done(): // per endpoint context cancelled @@ -115,7 +120,21 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc if !started { return errors.New("collector start called multiple times") } - return nil + + // Wait for goroutine to signal readiness. + // The use of ready channel is mostly to make the function testable, by ensuring + // synchronous order of events. Ignoring test requirements, one could let the + // go routine start at some arbitrary point in the future, possibly after this + // function has returned. + select { + case <-ready: + return nil + case <-ctx.Done(): + if c.cancel != nil { + c.cancel() // ensure clean up + } + return ctx.Err() + } } // Stop terminates the collector. diff --git a/pkg/epp/datalayer/collector_test.go b/pkg/epp/datalayer/collector_test.go index b5f348d24..2d47de30a 100644 --- a/pkg/epp/datalayer/collector_test.go +++ b/pkg/epp/datalayer/collector_test.go @@ -98,15 +98,16 @@ func TestCollectorCollectsOnTicks(t *testing.T) { c := NewCollector() ticker := mocks.NewTicker() ctx := context.Background() - require.NoError(t, c.Start(ctx, ticker, endpoint, []DataSource{source})) + require.NoError(t, c.Start(ctx, ticker, endpoint, []DataSource{source})) ticker.Tick() ticker.Tick() - time.Sleep(20 * time.Millisecond) // let collector process the ticks - got := atomic.LoadInt64(&source.callCount) - want := int64(2) - assert.Equal(t, want, got, "call count mismatch") + // use Eventually for async processing + require.Eventually(t, func() bool { + return atomic.LoadInt64(&source.callCount) == 2 + }, 1*time.Second, 2*time.Millisecond, "expected 2 collections") + require.NoError(t, c.Stop()) }