Skip to content

[bug] Fix datalayer Collector test flake #1342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion pkg/epp/datalayer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,13 @@ func NewCollector() *Collector {

// Start initiates data source collection for the endpoint.
func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sources []DataSource) error {
var ready chan struct{}
started := false

c.startOnce.Do(func() {
c.ctx, c.cancel = context.WithCancel(ctx)
started = true
ready = make(chan struct{})

go func(endpoint Endpoint, sources []DataSource) {
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
Expand All @@ -97,6 +100,8 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
ticker.Stop()
}()

close(ready) // signal ready to accept ticks

for {
select {
case <-c.ctx.Done(): // per endpoint context cancelled
Expand All @@ -115,7 +120,21 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
if !started {
return errors.New("collector start called multiple times")
}
return nil

// Wait for goroutine to signal readiness.
// The use of ready channel is mostly to make the function testable, by ensuring
// synchronous order of events. Ignoring test requirements, one could let the
// go routine start at some arbitrary point in the future, possibly after this
// function has returned.
select {
case <-ready:
return nil
case <-ctx.Done():
if c.cancel != nil {
c.cancel() // ensure clean up
}
return ctx.Err()
}
}

// Stop terminates the collector.
Expand Down
11 changes: 6 additions & 5 deletions pkg/epp/datalayer/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,16 @@ func TestCollectorCollectsOnTicks(t *testing.T) {
c := NewCollector()
ticker := mocks.NewTicker()
ctx := context.Background()
require.NoError(t, c.Start(ctx, ticker, endpoint, []DataSource{source}))

require.NoError(t, c.Start(ctx, ticker, endpoint, []DataSource{source}))
ticker.Tick()
ticker.Tick()
time.Sleep(20 * time.Millisecond) // let collector process the ticks

got := atomic.LoadInt64(&source.callCount)
want := int64(2)
assert.Equal(t, want, got, "call count mismatch")
// use Eventually for async processing
require.Eventually(t, func() bool {
return atomic.LoadInt64(&source.callCount) == 2
}, 1*time.Second, 2*time.Millisecond, "expected 2 collections")

require.NoError(t, c.Stop())
}

Expand Down