Skip to content

Commit 3c73cbe

Browse files
authored
[bug] Fix datalayer Collector test flake (#1342)
* synchronize Start with goroutine Signed-off-by: Etai Lev Ran <[email protected]> * avoid test race condition Signed-off-by: Etai Lev Ran <[email protected]> * always create channel Signed-off-by: Etai Lev Ran <[email protected]> * reverrt to using a channel var Signed-off-by: Etai Lev Ran <[email protected]> * clarify comment Signed-off-by: Etai Lev Ran <[email protected]> * clarify use of ready channel in testing Signed-off-by: Etai Lev Ran <[email protected]> --------- Signed-off-by: Etai Lev Ran <[email protected]>
1 parent c6f3df2 commit 3c73cbe

File tree

2 files changed

+26
-6
lines changed

2 files changed

+26
-6
lines changed

pkg/epp/datalayer/collector.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,13 @@ func NewCollector() *Collector {
8383

8484
// Start initiates data source collection for the endpoint.
8585
func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sources []DataSource) error {
86+
var ready chan struct{}
8687
started := false
88+
8789
c.startOnce.Do(func() {
8890
c.ctx, c.cancel = context.WithCancel(ctx)
8991
started = true
92+
ready = make(chan struct{})
9093

9194
go func(endpoint Endpoint, sources []DataSource) {
9295
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
@@ -97,6 +100,8 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
97100
ticker.Stop()
98101
}()
99102

103+
close(ready) // signal ready to accept ticks
104+
100105
for {
101106
select {
102107
case <-c.ctx.Done(): // per endpoint context cancelled
@@ -115,7 +120,21 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
115120
if !started {
116121
return errors.New("collector start called multiple times")
117122
}
118-
return nil
123+
124+
// Wait for goroutine to signal readiness.
125+
// The use of ready channel is mostly to make the function testable, by ensuring
126+
// synchronous order of events. Ignoring test requirements, one could let the
127+
// go routine start at some arbitrary point in the future, possibly after this
128+
// function has returned.
129+
select {
130+
case <-ready:
131+
return nil
132+
case <-ctx.Done():
133+
if c.cancel != nil {
134+
c.cancel() // ensure clean up
135+
}
136+
return ctx.Err()
137+
}
119138
}
120139

121140
// Stop terminates the collector.

pkg/epp/datalayer/collector_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,16 @@ func TestCollectorCollectsOnTicks(t *testing.T) {
9898
c := NewCollector()
9999
ticker := mocks.NewTicker()
100100
ctx := context.Background()
101-
require.NoError(t, c.Start(ctx, ticker, endpoint, []DataSource{source}))
102101

102+
require.NoError(t, c.Start(ctx, ticker, endpoint, []DataSource{source}))
103103
ticker.Tick()
104104
ticker.Tick()
105-
time.Sleep(20 * time.Millisecond) // let collector process the ticks
106105

107-
got := atomic.LoadInt64(&source.callCount)
108-
want := int64(2)
109-
assert.Equal(t, want, got, "call count mismatch")
106+
// use Eventually for async processing
107+
require.Eventually(t, func() bool {
108+
return atomic.LoadInt64(&source.callCount) == 2
109+
}, 1*time.Second, 2*time.Millisecond, "expected 2 collections")
110+
110111
require.NoError(t, c.Stop())
111112
}
112113

0 commit comments

Comments
 (0)