Skip to content

Commit a6b8e99

Browse files
ventifusclaude
andcommitted
pkg/monitor,pkg/util/changefeed,Makefile: fix changefeed test races
Replace AllIteratorsConsumed()-based synchronization in changefeed tests with barriers derived from GetLastProcessed(), which advances only after OnAllPendingProcessed() completes a full sweep. This is the correct settled-cache barrier. Key changes: - pkg/util/changefeed/subscriptioncache_test.go: wait for two GetLastProcessed() advancements after each mutation group to guarantee a complete post-mutation sweep has run - pkg/monitor/worker_test.go: replace vacuous len(docs) barrier with lastClusterChangefeed timestamp barrier; same two-advancement pattern for both cluster and subscription caches - pkg/database/cosmosdb: add per-iterator sync.Mutex to fakeSubscriptionDocumentIterator and fakeOpenShiftClusterDocumentIterator; fix ChangeFeed() to hold a write lock (not read lock) while appending to changeFeedIterators - pkg/database/cosmosdb: delete AllIteratorsConsumed() extension methods (no callers remain) - pkg/monitor/test_helpers.go: remove unused fake client fields from TestEnvironment - Makefile: add -race flag to unit-test-go target Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent cff838e commit a6b8e99

File tree

8 files changed

+158
-83
lines changed

8 files changed

+158
-83
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ validate-fips: $(BINGO)
334334

335335
.PHONY: unit-test-go
336336
unit-test-go: $(GOTESTSUM)
337-
$(GOTESTSUM) --format pkgname --junitfile report.xml -- -coverprofile=cover.out ./...
337+
$(GOTESTSUM) --format pkgname --junitfile report.xml -- -race -coverprofile=cover.out ./...
338338

339339
.PHONY: unit-test-go-coverpkg
340340
unit-test-go-coverpkg: $(GOTESTSUM)

pkg/database/cosmosdb/openshiftcluster_ext.go

Lines changed: 0 additions & 17 deletions
This file was deleted.

pkg/database/cosmosdb/subscriptions_ext.go

Lines changed: 0 additions & 17 deletions
This file was deleted.

pkg/database/cosmosdb/zz_generated_openshiftclusterdocument_fake.go

Lines changed: 21 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/database/cosmosdb/zz_generated_subscriptiondocument_fake.go

Lines changed: 21 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/monitor/test_helpers.go

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,26 +32,24 @@ var fakeClusterVisitMonitoringAttempts = map[string]*int{}
3232

3333
// TestEnvironment contains all the test setup components
3434
type TestEnvironment struct {
35-
OpenShiftClusterDB database.OpenShiftClusters
36-
SubscriptionsDB database.Subscriptions
37-
MonitorsDB database.Monitors
38-
OpenShiftClusterClient *cosmosdb.FakeOpenShiftClusterDocumentClient
39-
SubscriptionsClient *cosmosdb.FakeSubscriptionDocumentClient
40-
FakeMonitorsDBClient *cosmosdb.FakeMonitorDocumentClient
41-
Controller *gomock.Controller
42-
TestLogger *logrus.Entry
43-
Dialer *mock_proxy.MockDialer
44-
MockEnv *mock_env.MockInterface
45-
NoopMetricsEmitter noop.Noop
46-
NoopClusterMetrics noop.Noop
47-
DBGroup monitorDBs
35+
OpenShiftClusterDB database.OpenShiftClusters
36+
SubscriptionsDB database.Subscriptions
37+
MonitorsDB database.Monitors
38+
FakeMonitorsDBClient *cosmosdb.FakeMonitorDocumentClient
39+
Controller *gomock.Controller
40+
TestLogger *logrus.Entry
41+
Dialer *mock_proxy.MockDialer
42+
MockEnv *mock_env.MockInterface
43+
NoopMetricsEmitter noop.Noop
44+
NoopClusterMetrics noop.Noop
45+
DBGroup monitorDBs
4846
}
4947

5048
// SetupTestEnvironment creates a common test environment for monitor tests
5149
func SetupTestEnvironment(t *testing.T) *TestEnvironment {
5250
// Create databases
53-
openShiftClusterDB, openShiftClusterClient := testdatabase.NewFakeOpenShiftClusters()
54-
subscriptionsDB, subscriptionsClient := testdatabase.NewFakeSubscriptions()
51+
openShiftClusterDB, _ := testdatabase.NewFakeOpenShiftClusters()
52+
subscriptionsDB, _ := testdatabase.NewFakeSubscriptions()
5553
monitorsDB, fakeMonitorsDBClient := testdatabase.NewFakeMonitors()
5654

5755
// Create mocks
@@ -85,19 +83,17 @@ func SetupTestEnvironment(t *testing.T) *TestEnvironment {
8583
f.Create()
8684

8785
return &TestEnvironment{
88-
OpenShiftClusterDB: openShiftClusterDB,
89-
SubscriptionsDB: subscriptionsDB,
90-
MonitorsDB: monitorsDB,
91-
OpenShiftClusterClient: openShiftClusterClient,
92-
SubscriptionsClient: subscriptionsClient,
93-
FakeMonitorsDBClient: fakeMonitorsDBClient,
94-
Controller: ctrl,
95-
TestLogger: testlogger,
96-
Dialer: dialer,
97-
MockEnv: mockEnv,
98-
NoopMetricsEmitter: noopMetricsEmitter,
99-
NoopClusterMetrics: noopClusterMetricsEmitter,
100-
DBGroup: dbs,
86+
OpenShiftClusterDB: openShiftClusterDB,
87+
SubscriptionsDB: subscriptionsDB,
88+
MonitorsDB: monitorsDB,
89+
FakeMonitorsDBClient: fakeMonitorsDBClient,
90+
Controller: ctrl,
91+
TestLogger: testlogger,
92+
Dialer: dialer,
93+
MockEnv: mockEnv,
94+
NoopMetricsEmitter: noopMetricsEmitter,
95+
NoopClusterMetrics: noopClusterMetricsEmitter,
96+
DBGroup: dbs,
10197
}
10298
}
10399

pkg/monitor/worker_test.go

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,50 @@ func TestChangefeedOperations(t *testing.T) {
138138
}
139139
}
140140

141-
// Wait for changefeeds to be consumed
142-
assert.Eventually(t, env.OpenShiftClusterClient.AllIteratorsConsumed, time.Second, 10*time.Millisecond)
143-
assert.Eventually(t, env.SubscriptionsClient.AllIteratorsConsumed, time.Second, 10*time.Millisecond)
141+
// Capture baselines after mutations so that a changefeed sweep
142+
// that ran before the mutations cannot satisfy the barriers below.
143+
// Require two advancements: the first sweep after the baseline may
144+
// have already fetched an empty page before the mutation was
145+
// appended, so its OnAllPendingProcessed() can fire without the
146+
// mutation being processed. A second advancement guarantees a full
147+
// sweep ran after the mutation was visible to the iterator.
148+
beforeCluster, _ := mon.lastClusterChangefeed.Load().(time.Time)
149+
beforeSubs, _ := mon.subs.GetLastProcessed()
150+
151+
seenClusterAdvance := false
152+
assert.Eventually(t, func() bool {
153+
ts, ok := mon.lastClusterChangefeed.Load().(time.Time)
154+
if !ok || !ts.After(beforeCluster) {
155+
return false
156+
}
157+
if !seenClusterAdvance {
158+
seenClusterAdvance = true
159+
beforeCluster = ts
160+
return false
161+
}
162+
return true
163+
}, time.Second, 10*time.Millisecond)
164+
165+
seenSubsAdvance := false
166+
assert.Eventually(t, func() bool {
167+
last, ok := mon.subs.GetLastProcessed()
168+
if !ok || !last.After(beforeSubs) {
169+
return false
170+
}
171+
if !seenSubsAdvance {
172+
seenSubsAdvance = true
173+
beforeSubs = last
174+
return false
175+
}
176+
return true
177+
}, time.Second, 10*time.Millisecond)
144178

145179
// Validate expected results
146-
if len(mon.docs) != op.expectDocs {
147-
t.Errorf("%s: expected %d documents in cache, got %d", op.name, op.expectDocs, len(mon.docs))
180+
mon.mu.RLock()
181+
nDocs := len(mon.docs)
182+
mon.mu.RUnlock()
183+
if nDocs != op.expectDocs {
184+
t.Errorf("%s: expected %d documents in cache, got %d", op.name, op.expectDocs, nDocs)
148185
}
149186
if mon.subs.GetCacheSize() != op.expectSubs {
150187
t.Errorf("%s: expected %d subscriptions in cache, got %d", op.name, op.expectSubs, mon.subs.GetCacheSize())

pkg/util/changefeed/subscriptioncache_test.go

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestSubscriptionChangefeed(t *testing.T) {
7070
for _, tC := range testCases {
7171
t.Run(tC.desc, func(t *testing.T) {
7272
startedTime := time.Now().UnixNano()
73-
subscriptionsDB, subscriptionsClient := testdatabase.NewFakeSubscriptions()
73+
subscriptionsDB, _ := testdatabase.NewFakeSubscriptions()
7474
_, log := testlog.LogForTesting(t)
7575

7676
// need to register the changefeed before making documents
@@ -147,7 +147,6 @@ func TestSubscriptionChangefeed(t *testing.T) {
147147
go RunChangefeed(t.Context(), log, subscriptionChangefeed, 100*time.Microsecond, 1, cache, stop)
148148

149149
cache.WaitForInitialPopulation()
150-
assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond)
151150

152151
// Create some after initially populated
153152
_, err := subscriptionsDB.Create(t.Context(), &api.SubscriptionDocument{
@@ -185,7 +184,26 @@ func TestSubscriptionChangefeed(t *testing.T) {
185184
},
186185
})
187186
require.NoError(t, err)
188-
assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond)
187+
// Capture baseline after mutations. Require two advancements: the
188+
// first sweep after the baseline may have already fetched an empty
189+
// page before the mutation was appended, so its
190+
// OnAllPendingProcessed() can fire without the mutation being
191+
// processed. A second advancement guarantees a full sweep ran after
192+
// the mutation was visible to the iterator.
193+
seen := false
194+
before, _ := cache.GetLastProcessed()
195+
assert.Eventually(t, func() bool {
196+
last, ok := cache.GetLastProcessed()
197+
if !ok || !last.After(before) {
198+
return false
199+
}
200+
if !seen {
201+
seen = true
202+
before = last
203+
return false
204+
}
205+
return true
206+
}, time.Second, 1*time.Millisecond)
189207

190208
// Switch a registered to suspended
191209
old2, err := subscriptionsDB.Get(t.Context(), "8c90b62a-3783-4ea6-a8c8-cbaee4667ffd")
@@ -201,7 +219,20 @@ func TestSubscriptionChangefeed(t *testing.T) {
201219
},
202220
})
203221
require.NoError(t, err)
204-
assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond)
222+
seen = false
223+
before, _ = cache.GetLastProcessed()
224+
assert.Eventually(t, func() bool {
225+
last, ok := cache.GetLastProcessed()
226+
if !ok || !last.After(before) {
227+
return false
228+
}
229+
if !seen {
230+
seen = true
231+
before = last
232+
return false
233+
}
234+
return true
235+
}, time.Second, 1*time.Millisecond)
205236

206237
// Switch a registered to deleted
207238
old3, err := subscriptionsDB.Get(t.Context(), "4e07b0f5-c789-4817-9079-94012b04e1c9")
@@ -217,7 +248,20 @@ func TestSubscriptionChangefeed(t *testing.T) {
217248
},
218249
})
219250
require.NoError(t, err)
220-
assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond)
251+
seen = false
252+
before, _ = cache.GetLastProcessed()
253+
assert.Eventually(t, func() bool {
254+
last, ok := cache.GetLastProcessed()
255+
if !ok || !last.After(before) {
256+
return false
257+
}
258+
if !seen {
259+
seen = true
260+
before = last
261+
return false
262+
}
263+
return true
264+
}, time.Second, 1*time.Millisecond)
221265

222266
// Validate the expected cache contents
223267
assert.Equal(t, tC.expected, maps.Collect(cache.subs.All()))

0 commit comments

Comments
 (0)