diff --git a/pkg/database/cosmosdb/openshiftcluster_ext.go b/pkg/database/cosmosdb/openshiftcluster_ext.go deleted file mode 100644 index 9d80800e118..00000000000 --- a/pkg/database/cosmosdb/openshiftcluster_ext.go +++ /dev/null @@ -1,17 +0,0 @@ -package cosmosdb - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -// AllIteratorsConsumed returns whether all fake changefeeds have consumed their -// full contents -func (c *FakeOpenShiftClusterDocumentClient) AllIteratorsConsumed() bool { - c.lock.Lock() - defer c.lock.Unlock() - for _, i := range c.changeFeedIterators { - if !i.done { - return false - } - } - return true -} diff --git a/pkg/database/cosmosdb/subscriptions_ext.go b/pkg/database/cosmosdb/subscriptions_ext.go deleted file mode 100644 index c73bdba9b97..00000000000 --- a/pkg/database/cosmosdb/subscriptions_ext.go +++ /dev/null @@ -1,17 +0,0 @@ -package cosmosdb - -// Copyright (c) Microsoft Corporation. -// Licensed under the Apache License 2.0. - -// AllIteratorsConsumed returns whether all fake changefeeds have consumed their -// full contents -func (c *FakeSubscriptionDocumentClient) AllIteratorsConsumed() bool { - c.lock.Lock() - defer c.lock.Unlock() - for _, i := range c.changeFeedIterators { - if !i.done { - return false - } - } - return true -} diff --git a/pkg/monitor/cache.go b/pkg/monitor/cache.go index e047f82306b..5e175e935c5 100644 --- a/pkg/monitor/cache.go +++ b/pkg/monitor/cache.go @@ -14,33 +14,22 @@ type cacheDoc struct { // deleteDoc deletes the given document from mon.docs, signalling the associated // monitoring goroutine to stop if it exists. Caller must hold mon.mu.Lock. -func (mon *monitor) deleteDoc(doc *api.OpenShiftClusterDocument) { - v := mon.docs[doc.ID] - - if v != nil { - if v.stop != nil { - close(mon.docs[doc.ID].stop) - } - - delete(mon.docs, doc.ID) +func (mon *clusterChangeFeedResponder) deleteDoc(doc *api.OpenShiftClusterDocument) { + v, loaded := mon.docs.LoadAndDelete(doc.ID) + if loaded { + close(v.stop) } } // upsertDoc inserts or updates the given document into mon.docs, starting an // associated monitoring goroutine if the document is in a bucket owned by us. // Caller must hold mon.mu.Lock. -func (mon *monitor) upsertDoc(doc *api.OpenShiftClusterDocument) { - v := mon.docs[doc.ID] - - if v == nil { - v = &cacheDoc{} - mon.docs[doc.ID] = v +func (mon *clusterChangeFeedResponder) upsertDoc(doc *api.OpenShiftClusterDocument) { + v, loaded := mon.docs.LoadOrStore(doc.ID, &cacheDoc{doc: stripUnusedFields(doc)}) + if loaded { + v.doc = stripUnusedFields(doc) } - - // Strip unused fields to reduce memory usage. The monitor only needs - // a subset of the document fields for monitoring operations. - v.doc = stripUnusedFields(doc) - mon.fixDoc(v.doc) + mon.fixDoc(v) } // stripUnusedFields creates a copy of the document with only the fields @@ -149,17 +138,16 @@ func stripUnusedFields(doc *api.OpenShiftClusterDocument) *api.OpenShiftClusterD } // fixDocs ensures that there is a monitoring goroutine for all documents in all -// buckets owned by us. Caller must hold mon.mu.Lock. -func (mon *monitor) fixDocs() { - for _, v := range mon.docs { - mon.fixDoc(v.doc) +// buckets owned by us. +func (mon *clusterChangeFeedResponder) fixDocs() { + for _, v := range mon.docs.All() { + mon.fixDoc(v) } } // fixDoc ensures that there is a monitoring goroutine for the given document -// iff it is in a bucket owned by us. Caller must hold mon.mu.Lock. -func (mon *monitor) fixDoc(doc *api.OpenShiftClusterDocument) { - v := mon.docs[doc.ID] +// iff it is in a bucket owned by us. +func (mon *clusterChangeFeedResponder) fixDoc(v *cacheDoc) { _, ours := mon.buckets[v.doc.Bucket] if !ours && v.stop != nil { @@ -168,6 +156,6 @@ func (mon *monitor) fixDoc(doc *api.OpenShiftClusterDocument) { } else if ours && v.stop == nil { ch := make(chan struct{}) v.stop = ch - go mon.worker(ch, doc.ID) + go mon.newWorker(ch, v.doc.ID) } } diff --git a/pkg/monitor/cache_test.go b/pkg/monitor/cache_test.go index e3fcb0f81e5..ba7a9726639 100644 --- a/pkg/monitor/cache_test.go +++ b/pkg/monitor/cache_test.go @@ -27,9 +27,7 @@ func TestUpsertAndDelete(t *testing.T) { testMon := env.CreateTestMonitor("test-cache") // Set owned buckets for the entire test sequence ownedBuckets := []int{1, 2, 5} - for _, bucket := range ownedBuckets { - testMon.buckets[bucket] = struct{}{} - } + testMon.clusters.UpdateBuckets(ownedBuckets) type operation struct { name string @@ -48,10 +46,10 @@ func TestUpsertAndDelete(t *testing.T) { bucket: 5, state: api.ProvisioningStateSucceeded, validate: func(t *testing.T, stepName string, mon *monitor) { - if len(mon.docs) != 1 { - t.Errorf("%s: expected 1 document, got %d", stepName, len(mon.docs)) + if mon.clusters.GetCacheSize() != 1 { + t.Errorf("%s: expected 1 document, got %d", stepName, mon.clusters.GetCacheSize()) } - cacheDoc, exists := mon.docs["cluster-1"] + cacheDoc, exists := mon.clusters.docs.Load("cluster-1") if !exists { t.Fatalf("%s: document was not added to cache", stepName) } @@ -70,10 +68,10 @@ func TestUpsertAndDelete(t *testing.T) { bucket: 10, // not in owned buckets state: api.ProvisioningStateSucceeded, validate: func(t *testing.T, stepName string, mon *monitor) { - if len(mon.docs) != 2 { - t.Errorf("%s: expected 2 documents, got %d", stepName, len(mon.docs)) + if mon.clusters.GetCacheSize() != 2 { + t.Errorf("%s: expected 2 documents, got %d", stepName, mon.clusters.GetCacheSize()) } - cacheDoc, exists := mon.docs["cluster-2"] + cacheDoc, exists := mon.clusters.docs.Load("cluster-2") if !exists { t.Fatalf("%s: document was not added to cache", stepName) } @@ -89,10 +87,10 @@ func TestUpsertAndDelete(t *testing.T) { bucket: 5, state: api.ProvisioningStateUpdating, validate: func(t *testing.T, stepName string, mon *monitor) { - if len(mon.docs) != 2 { - t.Errorf("%s: expected 2 documents, got %d", stepName, len(mon.docs)) + if mon.clusters.GetCacheSize() != 2 { + t.Errorf("%s: expected 2 documents, got %d", stepName, mon.clusters.GetCacheSize()) } - cacheDoc, exists := mon.docs["cluster-1"] + cacheDoc, exists := mon.clusters.docs.Load("cluster-1") if !exists { t.Fatalf("%s: document not found in cache", stepName) } @@ -111,10 +109,10 @@ func TestUpsertAndDelete(t *testing.T) { bucket: 2, state: api.ProvisioningStateSucceeded, validate: func(t *testing.T, stepName string, mon *monitor) { - if len(mon.docs) != 3 { - t.Errorf("%s: expected 3 documents, got %d", stepName, len(mon.docs)) + if mon.clusters.GetCacheSize() != 3 { + t.Errorf("%s: expected 3 documents, got %d", stepName, mon.clusters.GetCacheSize()) } - cacheDoc, exists := mon.docs["cluster-3"] + cacheDoc, exists := mon.clusters.docs.Load("cluster-3") if !exists { t.Fatalf("%s: document was not added to cache", stepName) } @@ -130,17 +128,17 @@ func TestUpsertAndDelete(t *testing.T) { bucket: 5, state: api.ProvisioningStateDeleting, validate: func(t *testing.T, stepName string, mon *monitor) { - if len(mon.docs) != 2 { - t.Errorf("%s: expected 2 documents after deletion, got %d", stepName, len(mon.docs)) + if mon.clusters.GetCacheSize() != 2 { + t.Errorf("%s: expected 2 documents after deletion, got %d", stepName, mon.clusters.GetCacheSize()) } - if _, exists := mon.docs["cluster-1"]; exists { + if _, exists := mon.clusters.GetCluster("cluster-1"); exists { t.Errorf("%s: document should have been deleted from cache", stepName) } // Verify other documents still exist - if _, exists := mon.docs["cluster-2"]; !exists { + if _, exists := mon.clusters.GetCluster("cluster-2"); !exists { t.Errorf("%s: cluster-2 should not be affected by deletion", stepName) } - if _, exists := mon.docs["cluster-3"]; !exists { + if _, exists := mon.clusters.GetCluster("cluster-3"); !exists { t.Errorf("%s: cluster-3 should not be affected by deletion", stepName) } }, @@ -152,14 +150,14 @@ func TestUpsertAndDelete(t *testing.T) { bucket: 5, state: api.ProvisioningStateDeleting, validate: func(t *testing.T, stepName string, mon *monitor) { - if len(mon.docs) != 2 { - t.Errorf("%s: expected 2 documents (no change), got %d", stepName, len(mon.docs)) + if mon.clusters.GetCacheSize() != 2 { + t.Errorf("%s: expected 2 documents (no change), got %d", stepName, mon.clusters.GetCacheSize()) } // Verify existing documents are unaffected - if _, exists := mon.docs["cluster-2"]; !exists { + if _, exists := mon.clusters.GetCluster("cluster-2"); !exists { t.Errorf("%s: existing documents should not be affected", stepName) } - if _, exists := mon.docs["cluster-3"]; !exists { + if _, exists := mon.clusters.GetCluster("cluster-3"); !exists { t.Errorf("%s: existing documents should not be affected", stepName) } }, @@ -172,19 +170,21 @@ func TestUpsertAndDelete(t *testing.T) { state: api.ProvisioningStateSucceeded, validate: func(t *testing.T, stepName string, mon *monitor) { // First verify worker exists - if mon.docs["cluster-3"].stop == nil { + cl, _ := mon.clusters.docs.Load("cluster-3") + if cl.stop == nil { t.Fatalf("%s: worker should exist before ownership change", stepName) } // Remove bucket ownership - delete(mon.buckets, 2) + delete(mon.clusters.buckets, 2) - // Call fixDoc + // Call upsertDoc doc := createMockClusterDoc("cluster-3", 2, api.ProvisioningStateSucceeded) - mon.fixDoc(doc) + mon.clusters.upsertDoc(doc) // Verify worker was stopped - if mon.docs["cluster-3"].stop != nil { + cl, _ = mon.clusters.docs.Load("cluster-3") + if cl.stop != nil { t.Errorf("%s: worker should be stopped when bucket no longer owned", stepName) } }, @@ -199,9 +199,9 @@ func TestUpsertAndDelete(t *testing.T) { switch op.action { case Upsert: - testMon.upsertDoc(doc) + testMon.clusters.upsertDoc(doc) case Delete: - testMon.deleteDoc(doc) + testMon.clusters.deleteDoc(doc) case NoOp: // Do nothing, we don't need to call any func for the test to run default: @@ -222,7 +222,7 @@ func TestConcurrentUpsert(t *testing.T) { doc := createMockClusterDoc("cluster-concurrent", 1, api.ProvisioningStateSucceeded) mon := env.CreateTestMonitor("cluster-concurrent") - mon.buckets[1] = struct{}{} + mon.clusters.buckets[1] = struct{}{} wg := sync.WaitGroup{} for i := 0; i < 3; i++ { wg.Add(1) @@ -231,14 +231,14 @@ func TestConcurrentUpsert(t *testing.T) { // Holding the lock for a random duration, up to a second // As we're upserting the same doc over and over, lenght should be 1 time.Sleep(time.Duration(rand.Intn(int(time.Second)))) - mon.upsertDoc(doc) + mon.clusters.upsertDoc(doc) mon.mu.Unlock() wg.Done() }() } wg.Wait() - if len(mon.docs) != 1 { - t.Errorf("Expected 1 doc after the same concurrent upsert, found %d", len(mon.docs)) + if mon.clusters.GetCacheSize() != 1 { + t.Errorf("Expected 1 doc after the same concurrent upsert, found %d", mon.clusters.GetCacheSize()) } } @@ -248,15 +248,16 @@ func TestConcurrentDeleteChannelCloseSafety(t *testing.T) { mon := env.CreateTestMonitor("test-channel-safety") - mon.buckets[5] = struct{}{} + mon.clusters.buckets[5] = struct{}{} doc := createMockClusterDoc("cluster-1", 5, api.ProvisioningStateSucceeded) mon.mu.Lock() - mon.upsertDoc(doc) + mon.clusters.upsertDoc(doc) mon.mu.Unlock() mon.mu.Lock() - if mon.docs["cluster-1"].stop == nil { + cl, _ := mon.clusters.docs.Load("cluster-1") + if cl.stop == nil { t.Fatal("worker should have been created") } mon.mu.Unlock() @@ -278,7 +279,7 @@ func TestConcurrentDeleteChannelCloseSafety(t *testing.T) { }() mon.mu.Lock() - mon.deleteDoc(doc) + mon.clusters.deleteDoc(doc) mon.mu.Unlock() }() } @@ -298,7 +299,7 @@ func TestConcurrentDeleteChannelCloseSafety(t *testing.T) { } mon.mu.Lock() - if _, exists := mon.docs["cluster-1"]; exists { + if _, exists := mon.clusters.GetCluster("cluster-1"); exists { t.Error("document should have been deleted") } mon.mu.Unlock() diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 8b60c0f4eae..551ac1ccfd8 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -56,17 +56,15 @@ type monitor struct { m metrics.Emitter clusterm metrics.Emitter mu sync.RWMutex - docs map[string]*cacheDoc + clusters *clusterChangeFeedResponder subs changefeed.SubscriptionsCache env env.Interface isMaster bool bucketCount int - buckets map[int]struct{} - lastBucketlist atomic.Value // time.Time - lastClusterChangefeed atomic.Value // time.Time - startTime time.Time + lastBucketlist atomic.Value // time.Time + startTime time.Time hiveClusterManagers map[int]hive.ClusterManager @@ -86,7 +84,7 @@ type Runnable interface { } func NewMonitor(log *logrus.Entry, dialer proxy.Dialer, dbGroup monitorDBs, m, clusterm metrics.Emitter, e env.Interface) Runnable { - return &monitor{ + mon := &monitor{ baseLog: log, dialer: dialer, @@ -94,12 +92,10 @@ func NewMonitor(log *logrus.Entry, dialer proxy.Dialer, dbGroup monitorDBs, m, c m: m, clusterm: clusterm, - docs: map[string]*cacheDoc{}, subs: changefeed.NewSubscriptionsChangefeedCache(true), env: e, bucketCount: bucket.Buckets, - buckets: map[int]struct{}{}, startTime: time.Now(), @@ -115,6 +111,9 @@ func NewMonitor(log *logrus.Entry, dialer proxy.Dialer, dbGroup monitorDBs, m, c readyIfChangefeedWithin: defaultChangefeedReadinessInterval, readyDelay: defaultMonitorReadinessDelay, } + + mon.clusters = NewClusterChangefeedResponder(log, mon.worker) + return mon } func (mon *monitor) Run(ctx context.Context) error { @@ -199,13 +198,12 @@ func (mon *monitor) startChangefeeds(ctx context.Context, stop <-chan struct{}) } // fill the cache from the database change feed - clusterResponder := &clusterChangeFeedResponder{mon: mon} go changefeed.RunChangefeed( ctx, mon.baseLog.WithField("component", "changefeed"), dbOpenShiftClusters.ChangeFeed(), // Align this time with the deletion mechanism. // Go to docs/monitoring.md for the details. mon.changefeedInterval, - changefeedBatchSize, clusterResponder, stop, + changefeedBatchSize, mon.clusters, stop, ) // fill the cache from the database change feed @@ -227,7 +225,7 @@ func (mon *monitor) checkReady() bool { if !ok { return false } - lastClusterChangefeedTime, ok := mon.lastClusterChangefeed.Load().(time.Time) + lastClusterChangefeedTime, ok := mon.clusters.GetLastProcessed() if !ok { return false } diff --git a/pkg/monitor/monitor_test.go b/pkg/monitor/monitor_test.go index ba677703167..38dc47a8f5f 100644 --- a/pkg/monitor/monitor_test.go +++ b/pkg/monitor/monitor_test.go @@ -96,8 +96,8 @@ func TestMonitor(t *testing.T) { // bucketcount is the total number of buckets that should be across all // workers, each one should have less than that w.mu.RLock() - require.Less(t, len(w.buckets), w.bucketCount) - buckets = slices.AppendSeq(buckets, maps.Keys(w.buckets)) + require.Less(t, len(w.clusters.buckets), w.bucketCount) + buckets = slices.AppendSeq(buckets, maps.Keys(w.clusters.buckets)) w.mu.RUnlock() } require.Len(t, buckets, 256) diff --git a/pkg/monitor/worker.go b/pkg/monitor/worker.go index 46eea2b610e..31cce4aa389 100644 --- a/pkg/monitor/worker.go +++ b/pkg/monitor/worker.go @@ -10,8 +10,10 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "time" + "github.com/puzpuzpuz/xsync/v4" "github.com/sirupsen/logrus" "github.com/Azure/go-autorest/autorest/azure" @@ -19,6 +21,7 @@ import ( "github.com/Azure/ARO-RP/pkg/api" "github.com/Azure/ARO-RP/pkg/monitor/dimension" "github.com/Azure/ARO-RP/pkg/monitor/monitoring" + "github.com/Azure/ARO-RP/pkg/util/changefeed" utillog "github.com/Azure/ARO-RP/pkg/util/log" "github.com/Azure/ARO-RP/pkg/util/recover" "github.com/Azure/ARO-RP/pkg/util/restconfig" @@ -53,34 +56,54 @@ func (mon *monitor) listBuckets(ctx context.Context) error { mon.mu.Lock() defer mon.mu.Unlock() - - oldBuckets := mon.buckets - mon.buckets = make(map[int]struct{}, len(buckets)) - - for _, i := range buckets { - mon.buckets[i] = struct{}{} - } - - if !reflect.DeepEqual(mon.buckets, oldBuckets) { - mon.baseLog.Printf("servicing %d buckets", len(mon.buckets)) - mon.fixDocs() - } + mon.clusters.UpdateBuckets(buckets) return err } type clusterChangeFeedResponder struct { - mon *monitor + log *logrus.Entry + docs *xsync.Map[string, *cacheDoc] + buckets map[int]struct{} + + lastChangefeedProcessed atomic.Value // time.Time + lastChangefeedDataUpdate atomic.Value // time.Time + + newWorker func(<-chan struct{}, string) } -func (c *clusterChangeFeedResponder) Lock() { - c.mon.mu.Lock() +func NewClusterChangefeedResponder(log *logrus.Entry, workerFunc func(<-chan struct{}, string)) *clusterChangeFeedResponder { + return &clusterChangeFeedResponder{ + log: log, + docs: xsync.NewMap[string, *cacheDoc](), + buckets: map[int]struct{}{}, + + newWorker: workerFunc, + } } -func (c *clusterChangeFeedResponder) Unlock() { - c.mon.mu.Unlock() +var _ changefeed.ChangefeedConsumer[*api.OpenShiftClusterDocument] = &clusterChangeFeedResponder{} + +// Update the buckets that we want to pay attention to. +func (c *clusterChangeFeedResponder) UpdateBuckets(buckets []int) { + oldBuckets := c.buckets + c.buckets = make(map[int]struct{}, len(buckets)) + + for _, i := range buckets { + c.buckets[i] = struct{}{} + } + + if !reflect.DeepEqual(c.buckets, oldBuckets) { + c.log.Printf("servicing %d buckets", len(c.buckets)) + c.fixDocs() + } } +// we don't use a mutex internally, we use a xsync.Map, so Lock/Unlock are +// no-ops +func (c *clusterChangeFeedResponder) Lock() {} +func (c *clusterChangeFeedResponder) Unlock() {} + func (c *clusterChangeFeedResponder) OnDoc(doc *api.OpenShiftClusterDocument) { ps := doc.OpenShiftCluster.Properties.ProvisioningState fps := doc.OpenShiftCluster.Properties.FailedProvisioningState @@ -99,14 +122,40 @@ func (c *clusterChangeFeedResponder) OnDoc(doc *api.OpenShiftClusterDocument) { // start deletion. // // If the cluster is already not monitored, deleteDoc will be a no-op. - c.mon.deleteDoc(doc) + c.deleteDoc(doc) default: - c.mon.upsertDoc(doc) + c.upsertDoc(doc) } } -func (c *clusterChangeFeedResponder) OnAllPendingProcessed() { - c.mon.lastClusterChangefeed.Store(time.Now()) +func (c *clusterChangeFeedResponder) GetCacheSize() int { + return c.docs.Size() +} + +func (c *clusterChangeFeedResponder) GetLastProcessed() (time.Time, bool) { + t, ok := c.lastChangefeedProcessed.Load().(time.Time) + return t, ok +} + +func (c *clusterChangeFeedResponder) GetLastDataUpdate() (time.Time, bool) { + t, ok := c.lastChangefeedDataUpdate.Load().(time.Time) + return t, ok +} + +func (c *clusterChangeFeedResponder) GetCluster(id string) (*api.OpenShiftClusterDocument, bool) { + v, ok := c.docs.Load(id) + if !ok { + return nil, ok + } + return v.doc, ok +} + +func (c *clusterChangeFeedResponder) OnAllPendingProcessed(didUpdate bool) { + now := time.Now() + c.lastChangefeedProcessed.Store(now) + if didUpdate { + c.lastChangefeedDataUpdate.Store(now) + } } // changefeedMetrics emits metrics tracking the size of the changefeed caches. @@ -116,7 +165,7 @@ func (mon *monitor) changefeedMetrics(stop <-chan struct{}) { t := time.NewTicker(time.Minute) defer t.Stop() for { - mon.m.EmitGauge("monitor.cache.size", int64(len(mon.docs)), map[string]string{"cache": "openshiftclusters"}) + mon.m.EmitGauge("monitor.cache.size", int64(mon.clusters.GetCacheSize()), map[string]string{"cache": "openshiftclusters"}) mon.m.EmitGauge("monitor.cache.size", int64(mon.subs.GetCacheSize()), map[string]string{"cache": "subscriptions"}) select { @@ -137,18 +186,15 @@ func (mon *monitor) worker(stop <-chan struct{}, id string) { log := mon.baseLog { - mon.mu.RLock() - v := mon.docs[id] - mon.mu.RUnlock() - - if v == nil { + doc, ok := mon.clusters.GetCluster(id) + if !ok { return } - log = utillog.EnrichWithResourceID(log, v.doc.OpenShiftCluster.ID) + log = utillog.EnrichWithResourceID(log, doc.OpenShiftCluster.ID) var err error - r, err = azure.ParseResourceID(v.doc.OpenShiftCluster.ID) + r, err = azure.ParseResourceID(doc.OpenShiftCluster.ID) if err != nil { log.Error(err) return @@ -168,15 +214,12 @@ func (mon *monitor) worker(stop <-chan struct{}, id string) { out: for { - mon.mu.RLock() - v := mon.docs[id] - mon.mu.RUnlock() - subID := strings.ToLower(r.SubscriptionID) - sub, subok := mon.subs.GetSubscription(subID) - - if v == nil { + doc, ok := mon.clusters.GetCluster(id) + if !ok { break } + subID := strings.ToLower(r.SubscriptionID) + sub, subok := mon.subs.GetSubscription(subID) newh := time.Now().Hour() @@ -184,7 +227,7 @@ out: // cached metrics in the remaining minutes if subok { - mon.workOne(context.Background(), log, v.doc, subID, sub.TenantID, newh != h, nsgMonitoringTicker) + mon.workOne(context.Background(), log, doc, subID, sub.TenantID, newh != h, nsgMonitoringTicker) } select { @@ -193,7 +236,7 @@ out: case <-subscriptionStateLoggingTicker.C: // The changefeed filters out subscriptions in invalid states if !subok { - log.Warningf("Skipped monitoring cluster %s because its subscription is in an invalid state", v.doc.OpenShiftCluster.ID) + log.Warningf("Skipped monitoring cluster %s because its subscription is in an invalid state", doc.OpenShiftCluster.ID) } default: } diff --git a/pkg/monitor/worker_test.go b/pkg/monitor/worker_test.go index 4b1e179e918..24293099694 100644 --- a/pkg/monitor/worker_test.go +++ b/pkg/monitor/worker_test.go @@ -62,6 +62,9 @@ func TestChangefeedOperations(t *testing.T) { stopChan := make(chan struct{}) + var lastClusterDataUpdate time.Time + var lastSubDataUpdate time.Time + mon.changefeedInterval = time.Millisecond * 5 mon.startChangefeeds(ctx, stopChan) @@ -139,12 +142,21 @@ func TestChangefeedOperations(t *testing.T) { } // Wait for changefeeds to be consumed - assert.Eventually(t, env.OpenShiftClusterClient.AllIteratorsConsumed, time.Second, 10*time.Millisecond) - assert.Eventually(t, env.SubscriptionsClient.AllIteratorsConsumed, time.Second, 10*time.Millisecond) + assert.Eventually(t, func() bool { + lastProc, _ := mon.subs.GetLastProcessed() + lastData, _ := mon.subs.GetLastDataUpdate() + return lastData != lastSubDataUpdate && lastProc != lastData + }, time.Second, 1*time.Millisecond) + assert.Eventually(t, func() bool { + return mon.clusters.lastChangefeedDataUpdate.Load() != lastClusterDataUpdate && mon.clusters.lastChangefeedDataUpdate.Load() != mon.clusters.lastChangefeedProcessed.Load() + }, time.Second, 1*time.Millisecond) + + lastClusterDataUpdate = mon.clusters.lastChangefeedDataUpdate.Load().(time.Time) + lastSubDataUpdate, _ = mon.subs.GetLastDataUpdate() // Validate expected results - if len(mon.docs) != op.expectDocs { - t.Errorf("%s: expected %d documents in cache, got %d", op.name, op.expectDocs, len(mon.docs)) + if mon.clusters.GetCacheSize() != op.expectDocs { + t.Errorf("%s: expected %d documents in cache, got %d", op.name, op.expectDocs, mon.clusters.GetCacheSize()) } if mon.subs.GetCacheSize() != op.expectSubs { t.Errorf("%s: expected %d subscriptions in cache, got %d", op.name, op.expectSubs, mon.subs.GetCacheSize()) diff --git a/pkg/util/changefeed/changefeed.go b/pkg/util/changefeed/changefeed.go index 63d2cf4a881..bfc53b0e116 100644 --- a/pkg/util/changefeed/changefeed.go +++ b/pkg/util/changefeed/changefeed.go @@ -19,8 +19,9 @@ import ( type ChangefeedConsumer[F any] interface { // OnDoc is called with each document returned from the list from Next() OnDoc(F) - // OnAllPendingProcessed is when no more pages are returned from Next() - OnAllPendingProcessed() + // OnAllPendingProcessed is when no more pages are returned from Next() with + // whether any documents were retrieved during this timer iteration + OnAllPendingProcessed(bool) // Lock is called before a page is processed Lock() // Unlock is called after a page is processed @@ -42,6 +43,7 @@ func RunChangefeed[F any, X api.DocumentList[F]]( defer t.Stop() for { + documentsRetrieved := false successful := true for { docs, err := iterator.Next(ctx, changefeedBatchSize) @@ -61,10 +63,11 @@ func RunChangefeed[F any, X api.DocumentList[F]]( responder.OnDoc(doc) } responder.Unlock() + documentsRetrieved = true } if successful { - responder.OnAllPendingProcessed() + responder.OnAllPendingProcessed(documentsRetrieved) } select { diff --git a/pkg/util/changefeed/changefeed_test.go b/pkg/util/changefeed/changefeed_test.go index a601f0e5588..218718216dd 100644 --- a/pkg/util/changefeed/changefeed_test.go +++ b/pkg/util/changefeed/changefeed_test.go @@ -51,7 +51,7 @@ type fakeResponder struct { unlocks int } -func (f *fakeResponder) OnAllPendingProcessed() { +func (f *fakeResponder) OnAllPendingProcessed(gotAny bool) { f.allPendingProcessed += 1 } func (f *fakeResponder) Lock() { f.locks += 1 } diff --git a/pkg/util/changefeed/subscriptioncache.go b/pkg/util/changefeed/subscriptioncache.go index 1ebd2df4f27..c5563cbcd7d 100644 --- a/pkg/util/changefeed/subscriptioncache.go +++ b/pkg/util/changefeed/subscriptioncache.go @@ -29,6 +29,7 @@ type SubscriptionsCache interface { GetCacheSize() int GetSubscription(string) (subscriptionInfo, bool) GetLastProcessed() (time.Time, bool) + GetLastDataUpdate() (time.Time, bool) WaitForInitialPopulation() } @@ -47,6 +48,7 @@ type subscriptionsChangeFeedResponder struct { // Do we want to only include valid (i.e. not suspended) subscriptions? onlyValidSubscriptions bool + lastChangefeedDataUpdate atomic.Value // time.Time lastChangefeedProcessed atomic.Value // time.Time initialPopulationWaitGroup *sync.WaitGroup @@ -73,6 +75,11 @@ func (c *subscriptionsChangeFeedResponder) GetLastProcessed() (time.Time, bool) return t, ok } +func (c *subscriptionsChangeFeedResponder) GetLastDataUpdate() (time.Time, bool) { + t, ok := c.lastChangefeedDataUpdate.Load().(time.Time) + return t, ok +} + // we don't use a mutex internally, we use a xsync.Map, so Lock/Unlock are // no-ops func (c *subscriptionsChangeFeedResponder) Lock() {} @@ -106,8 +113,12 @@ func (r *subscriptionsChangeFeedResponder) OnDoc(sub *api.SubscriptionDocument) }) } -func (c *subscriptionsChangeFeedResponder) OnAllPendingProcessed() { - old := c.lastChangefeedProcessed.Swap(time.Now()) +func (c *subscriptionsChangeFeedResponder) OnAllPendingProcessed(gotAny bool) { + now := time.Now() + old := c.lastChangefeedProcessed.Swap(now) + if gotAny { + c.lastChangefeedDataUpdate.Store(now) + } // we've consumed the initial documents, unlock the waitgroup if old == nil { c.initialPopulationWaitGroup.Done() diff --git a/pkg/util/changefeed/subscriptioncache_test.go b/pkg/util/changefeed/subscriptioncache_test.go index 4e35cdd884c..d0eee013899 100644 --- a/pkg/util/changefeed/subscriptioncache_test.go +++ b/pkg/util/changefeed/subscriptioncache_test.go @@ -70,7 +70,7 @@ func TestSubscriptionChangefeed(t *testing.T) { for _, tC := range testCases { t.Run(tC.desc, func(t *testing.T) { startedTime := time.Now().UnixNano() - subscriptionsDB, subscriptionsClient := testdatabase.NewFakeSubscriptions() + subscriptionsDB, _ := testdatabase.NewFakeSubscriptions() _, log := testlog.LogForTesting(t) // need to register the changefeed before making documents @@ -139,6 +139,7 @@ func TestSubscriptionChangefeed(t *testing.T) { ) require.NoError(t, fixtures.Create()) + var lastProcessed time.Time cache := NewSubscriptionsChangefeedCache(tC.validOnly) stop := make(chan struct{}) @@ -147,7 +148,10 @@ func TestSubscriptionChangefeed(t *testing.T) { go RunChangefeed(t.Context(), log, subscriptionChangefeed, 100*time.Microsecond, 1, cache, stop) cache.WaitForInitialPopulation() - assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond) + assert.Eventually(t, func() bool { + return cache.lastChangefeedDataUpdate.Load() != lastProcessed && cache.lastChangefeedDataUpdate.Load() != cache.lastChangefeedProcessed.Load() + }, time.Second, 1*time.Millisecond) + lastProcessed = cache.lastChangefeedDataUpdate.Load().(time.Time) // Create some after initially populated _, err := subscriptionsDB.Create(t.Context(), &api.SubscriptionDocument{ @@ -185,7 +189,10 @@ func TestSubscriptionChangefeed(t *testing.T) { }, }) require.NoError(t, err) - assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond) + assert.Eventually(t, func() bool { + return cache.lastChangefeedDataUpdate.Load() != lastProcessed && cache.lastChangefeedDataUpdate.Load() != cache.lastChangefeedProcessed.Load() + }, time.Second, 1*time.Millisecond) + lastProcessed = cache.lastChangefeedDataUpdate.Load().(time.Time) // Switch a registered to suspended old2, err := subscriptionsDB.Get(t.Context(), "8c90b62a-3783-4ea6-a8c8-cbaee4667ffd") @@ -201,7 +208,10 @@ func TestSubscriptionChangefeed(t *testing.T) { }, }) require.NoError(t, err) - assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond) + assert.Eventually(t, func() bool { + return cache.lastChangefeedDataUpdate.Load() != lastProcessed && cache.lastChangefeedDataUpdate.Load() != cache.lastChangefeedProcessed.Load() + }, time.Second, 1*time.Millisecond) + lastProcessed = cache.lastChangefeedDataUpdate.Load().(time.Time) // Switch a registered to deleted old3, err := subscriptionsDB.Get(t.Context(), "4e07b0f5-c789-4817-9079-94012b04e1c9") @@ -217,7 +227,9 @@ func TestSubscriptionChangefeed(t *testing.T) { }, }) require.NoError(t, err) - assert.Eventually(t, subscriptionsClient.AllIteratorsConsumed, time.Second, 1*time.Millisecond) + assert.Eventually(t, func() bool { + return cache.lastChangefeedDataUpdate.Load() != lastProcessed && cache.lastChangefeedDataUpdate.Load() != cache.lastChangefeedProcessed.Load() + }, time.Second, 1*time.Millisecond) // Validate the expected cache contents assert.Equal(t, tC.expected, maps.Collect(cache.subs.All()))