Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ unit-test-go: $(GOTESTSUM)
unit-test-go-coverpkg: $(GOTESTSUM)
$(GOTESTSUM) --format pkgname --junitfile report.xml -- -coverpkg=./... -coverprofile=cover_coverpkg.out ./...

.PHONY: test-go-race
test-go-race:
go test -count=50 -race ./...

.PHONY: fmt
fmt: $(GOLANGCI_LINT) ## Format Go source files using golangci-lint formatters (gci, gofumpt)
$(GOLANGCI_LINT) fmt
Expand Down
17 changes: 0 additions & 17 deletions pkg/database/cosmosdb/openshiftcluster_ext.go

This file was deleted.

17 changes: 0 additions & 17 deletions pkg/database/cosmosdb/subscriptions_ext.go

This file was deleted.

52 changes: 24 additions & 28 deletions pkg/monitor/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,24 @@ var fakeClusterVisitMonitoringAttempts = map[string]*int{}

// TestEnvironment contains all the test setup components
type TestEnvironment struct {
OpenShiftClusterDB database.OpenShiftClusters
SubscriptionsDB database.Subscriptions
MonitorsDB database.Monitors
OpenShiftClusterClient *cosmosdb.FakeOpenShiftClusterDocumentClient
SubscriptionsClient *cosmosdb.FakeSubscriptionDocumentClient
FakeMonitorsDBClient *cosmosdb.FakeMonitorDocumentClient
Controller *gomock.Controller
TestLogger *logrus.Entry
Dialer *mock_proxy.MockDialer
MockEnv *mock_env.MockInterface
NoopMetricsEmitter noop.Noop
NoopClusterMetrics noop.Noop
DBGroup monitorDBs
OpenShiftClusterDB database.OpenShiftClusters
SubscriptionsDB database.Subscriptions
MonitorsDB database.Monitors
FakeMonitorsDBClient *cosmosdb.FakeMonitorDocumentClient
Controller *gomock.Controller
TestLogger *logrus.Entry
Dialer *mock_proxy.MockDialer
MockEnv *mock_env.MockInterface
NoopMetricsEmitter noop.Noop
NoopClusterMetrics noop.Noop
DBGroup monitorDBs
}

// SetupTestEnvironment creates a common test environment for monitor tests
func SetupTestEnvironment(t *testing.T) *TestEnvironment {
// Create databases
openShiftClusterDB, openShiftClusterClient := testdatabase.NewFakeOpenShiftClusters()
subscriptionsDB, subscriptionsClient := testdatabase.NewFakeSubscriptions()
openShiftClusterDB, _ := testdatabase.NewFakeOpenShiftClusters()
subscriptionsDB, _ := testdatabase.NewFakeSubscriptions()
monitorsDB, fakeMonitorsDBClient := testdatabase.NewFakeMonitors()

// Create mocks
Expand Down Expand Up @@ -85,19 +83,17 @@ func SetupTestEnvironment(t *testing.T) *TestEnvironment {
f.Create()

return &TestEnvironment{
OpenShiftClusterDB: openShiftClusterDB,
SubscriptionsDB: subscriptionsDB,
MonitorsDB: monitorsDB,
OpenShiftClusterClient: openShiftClusterClient,
SubscriptionsClient: subscriptionsClient,
FakeMonitorsDBClient: fakeMonitorsDBClient,
Controller: ctrl,
TestLogger: testlogger,
Dialer: dialer,
MockEnv: mockEnv,
NoopMetricsEmitter: noopMetricsEmitter,
NoopClusterMetrics: noopClusterMetricsEmitter,
DBGroup: dbs,
OpenShiftClusterDB: openShiftClusterDB,
SubscriptionsDB: subscriptionsDB,
MonitorsDB: monitorsDB,
FakeMonitorsDBClient: fakeMonitorsDBClient,
Controller: ctrl,
TestLogger: testlogger,
Dialer: dialer,
MockEnv: mockEnv,
NoopMetricsEmitter: noopMetricsEmitter,
NoopClusterMetrics: noopClusterMetricsEmitter,
DBGroup: dbs,
}
}

Expand Down
41 changes: 36 additions & 5 deletions pkg/monitor/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,44 @@ func TestChangefeedOperations(t *testing.T) {
}
}

// Wait for changefeeds to be consumed
assert.Eventually(t, env.OpenShiftClusterClient.AllIteratorsConsumed, time.Second, 10*time.Millisecond)
assert.Eventually(t, env.SubscriptionsClient.AllIteratorsConsumed, time.Second, 10*time.Millisecond)
// Require two GetLastProcessed() advancements; one sweep may fire OnAllPendingProcessed() before the mutation is visible.
beforeCluster, _ := mon.lastClusterChangefeed.Load().(time.Time)
beforeSubs, _ := mon.subs.GetLastProcessed()

seenClusterAdvance := false
assert.Eventually(t, func() bool {
ts, ok := mon.lastClusterChangefeed.Load().(time.Time)
if !ok || !ts.After(beforeCluster) {
return false
}
if !seenClusterAdvance {
seenClusterAdvance = true
beforeCluster = ts
return false
}
return true
}, time.Second, 10*time.Millisecond)

seenSubsAdvance := false
assert.Eventually(t, func() bool {
last, ok := mon.subs.GetLastProcessed()
if !ok || !last.After(beforeSubs) {
return false
}
if !seenSubsAdvance {
seenSubsAdvance = true
beforeSubs = last
return false
}
return true
}, time.Second, 10*time.Millisecond)

// Validate expected results
if len(mon.docs) != op.expectDocs {
t.Errorf("%s: expected %d documents in cache, got %d", op.name, op.expectDocs, len(mon.docs))
mon.mu.RLock()
nDocs := len(mon.docs)
mon.mu.RUnlock()
if nDocs != op.expectDocs {
t.Errorf("%s: expected %d documents in cache, got %d", op.name, op.expectDocs, nDocs)
}
if mon.subs.GetCacheSize() != op.expectSubs {
t.Errorf("%s: expected %d subscriptions in cache, got %d", op.name, op.expectSubs, mon.subs.GetCacheSize())
Expand Down
49 changes: 44 additions & 5 deletions pkg/util/changefeed/subscriptioncache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestSubscriptionChangefeed(t *testing.T) {
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
startedTime := time.Now().UnixNano()
subscriptionsDB, subscriptionsClient := testdatabase.NewFakeSubscriptions()
subscriptionsDB, _ := testdatabase.NewFakeSubscriptions()
_, log := testlog.LogForTesting(t)

// need to register the changefeed before making documents
Expand Down Expand Up @@ -147,7 +147,6 @@ func TestSubscriptionChangefeed(t *testing.T) {
go RunChangefeed(t.Context(), log, subscriptionChangefeed, 100*time.Microsecond, 1, cache, stop)

cache.WaitForInitialPopulation()
assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond)

// Create some after initially populated
_, err := subscriptionsDB.Create(t.Context(), &api.SubscriptionDocument{
Expand Down Expand Up @@ -185,7 +184,21 @@ func TestSubscriptionChangefeed(t *testing.T) {
},
})
require.NoError(t, err)
assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond)
// Require two GetLastProcessed() advancements; one sweep may fire OnAllPendingProcessed() before the mutation is visible.
seen := false
before, _ := cache.GetLastProcessed()
assert.Eventually(t, func() bool {
last, ok := cache.GetLastProcessed()
if !ok || !last.After(before) {
return false
}
if !seen {
seen = true
before = last
return false
}
return true
}, time.Second, 1*time.Millisecond)

// Switch a registered to suspended
old2, err := subscriptionsDB.Get(t.Context(), "8c90b62a-3783-4ea6-a8c8-cbaee4667ffd")
Expand All @@ -201,7 +214,20 @@ func TestSubscriptionChangefeed(t *testing.T) {
},
})
require.NoError(t, err)
assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond)
seen = false
before, _ = cache.GetLastProcessed()
assert.Eventually(t, func() bool {
last, ok := cache.GetLastProcessed()
if !ok || !last.After(before) {
return false
}
if !seen {
seen = true
before = last
return false
}
return true
}, time.Second, 1*time.Millisecond)

// Switch a registered to deleted
old3, err := subscriptionsDB.Get(t.Context(), "4e07b0f5-c789-4817-9079-94012b04e1c9")
Expand All @@ -217,7 +243,20 @@ func TestSubscriptionChangefeed(t *testing.T) {
},
})
require.NoError(t, err)
assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond)
seen = false
before, _ = cache.GetLastProcessed()
assert.Eventually(t, func() bool {
last, ok := cache.GetLastProcessed()
if !ok || !last.After(before) {
return false
}
if !seen {
seen = true
before = last
return false
}
return true
}, time.Second, 1*time.Millisecond)

// Validate the expected cache contents
assert.Equal(t, tC.expected, maps.Collect(cache.subs.All()))
Expand Down
Loading