Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions pkg/database/cosmosdb/openshiftcluster_ext.go

This file was deleted.

17 changes: 0 additions & 17 deletions pkg/database/cosmosdb/subscriptions_ext.go

This file was deleted.

44 changes: 16 additions & 28 deletions pkg/monitor/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,22 @@ type cacheDoc struct {

// deleteDoc deletes the given document from mon.docs, signalling the associated
// monitoring goroutine to stop if it exists. Caller must hold mon.mu.Lock.
func (mon *monitor) deleteDoc(doc *api.OpenShiftClusterDocument) {
v := mon.docs[doc.ID]

if v != nil {
if v.stop != nil {
close(mon.docs[doc.ID].stop)
}

delete(mon.docs, doc.ID)
func (mon *clusterChangeFeedResponder) deleteDoc(doc *api.OpenShiftClusterDocument) {
v, loaded := mon.docs.LoadAndDelete(doc.ID)
if loaded {
close(v.stop)
}
}

// upsertDoc inserts or updates the given document into mon.docs, starting an
// associated monitoring goroutine if the document is in a bucket owned by us.
// Caller must hold mon.mu.Lock.
func (mon *monitor) upsertDoc(doc *api.OpenShiftClusterDocument) {
v := mon.docs[doc.ID]

if v == nil {
v = &cacheDoc{}
mon.docs[doc.ID] = v
func (mon *clusterChangeFeedResponder) upsertDoc(doc *api.OpenShiftClusterDocument) {
v, loaded := mon.docs.LoadOrStore(doc.ID, &cacheDoc{doc: stripUnusedFields(doc)})
if loaded {
v.doc = stripUnusedFields(doc)
}

// Strip unused fields to reduce memory usage. The monitor only needs
// a subset of the document fields for monitoring operations.
v.doc = stripUnusedFields(doc)
mon.fixDoc(v.doc)
mon.fixDoc(v)
}

// stripUnusedFields creates a copy of the document with only the fields
Expand Down Expand Up @@ -149,17 +138,16 @@ func stripUnusedFields(doc *api.OpenShiftClusterDocument) *api.OpenShiftClusterD
}

// fixDocs ensures that there is a monitoring goroutine for all documents in all
// buckets owned by us. Caller must hold mon.mu.Lock.
func (mon *monitor) fixDocs() {
for _, v := range mon.docs {
mon.fixDoc(v.doc)
// buckets owned by us.
func (mon *clusterChangeFeedResponder) fixDocs() {
for _, v := range mon.docs.All() {
mon.fixDoc(v)
}
}

// fixDoc ensures that there is a monitoring goroutine for the given document
// iff it is in a bucket owned by us. Caller must hold mon.mu.Lock.
func (mon *monitor) fixDoc(doc *api.OpenShiftClusterDocument) {
v := mon.docs[doc.ID]
// iff it is in a bucket owned by us.
func (mon *clusterChangeFeedResponder) fixDoc(v *cacheDoc) {
_, ours := mon.buckets[v.doc.Bucket]

if !ours && v.stop != nil {
Expand All @@ -168,6 +156,6 @@ func (mon *monitor) fixDoc(doc *api.OpenShiftClusterDocument) {
} else if ours && v.stop == nil {
ch := make(chan struct{})
v.stop = ch
go mon.worker(ch, doc.ID)
go mon.newWorker(ch, v.doc.ID)
}
}
81 changes: 41 additions & 40 deletions pkg/monitor/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ func TestUpsertAndDelete(t *testing.T) {
testMon := env.CreateTestMonitor("test-cache")
// Set owned buckets for the entire test sequence
ownedBuckets := []int{1, 2, 5}
for _, bucket := range ownedBuckets {
testMon.buckets[bucket] = struct{}{}
}
testMon.clusters.UpdateBuckets(ownedBuckets)

type operation struct {
name string
Expand All @@ -48,10 +46,10 @@ func TestUpsertAndDelete(t *testing.T) {
bucket: 5,
state: api.ProvisioningStateSucceeded,
validate: func(t *testing.T, stepName string, mon *monitor) {
if len(mon.docs) != 1 {
t.Errorf("%s: expected 1 document, got %d", stepName, len(mon.docs))
if mon.clusters.GetCacheSize() != 1 {
t.Errorf("%s: expected 1 document, got %d", stepName, mon.clusters.GetCacheSize())
}
cacheDoc, exists := mon.docs["cluster-1"]
cacheDoc, exists := mon.clusters.docs.Load("cluster-1")
if !exists {
t.Fatalf("%s: document was not added to cache", stepName)
}
Expand All @@ -70,10 +68,10 @@ func TestUpsertAndDelete(t *testing.T) {
bucket: 10, // not in owned buckets
state: api.ProvisioningStateSucceeded,
validate: func(t *testing.T, stepName string, mon *monitor) {
if len(mon.docs) != 2 {
t.Errorf("%s: expected 2 documents, got %d", stepName, len(mon.docs))
if mon.clusters.GetCacheSize() != 2 {
t.Errorf("%s: expected 2 documents, got %d", stepName, mon.clusters.GetCacheSize())
}
cacheDoc, exists := mon.docs["cluster-2"]
cacheDoc, exists := mon.clusters.docs.Load("cluster-2")
if !exists {
t.Fatalf("%s: document was not added to cache", stepName)
}
Expand All @@ -89,10 +87,10 @@ func TestUpsertAndDelete(t *testing.T) {
bucket: 5,
state: api.ProvisioningStateUpdating,
validate: func(t *testing.T, stepName string, mon *monitor) {
if len(mon.docs) != 2 {
t.Errorf("%s: expected 2 documents, got %d", stepName, len(mon.docs))
if mon.clusters.GetCacheSize() != 2 {
t.Errorf("%s: expected 2 documents, got %d", stepName, mon.clusters.GetCacheSize())
}
cacheDoc, exists := mon.docs["cluster-1"]
cacheDoc, exists := mon.clusters.docs.Load("cluster-1")
if !exists {
t.Fatalf("%s: document not found in cache", stepName)
}
Expand All @@ -111,10 +109,10 @@ func TestUpsertAndDelete(t *testing.T) {
bucket: 2,
state: api.ProvisioningStateSucceeded,
validate: func(t *testing.T, stepName string, mon *monitor) {
if len(mon.docs) != 3 {
t.Errorf("%s: expected 3 documents, got %d", stepName, len(mon.docs))
if mon.clusters.GetCacheSize() != 3 {
t.Errorf("%s: expected 3 documents, got %d", stepName, mon.clusters.GetCacheSize())
}
cacheDoc, exists := mon.docs["cluster-3"]
cacheDoc, exists := mon.clusters.docs.Load("cluster-3")
if !exists {
t.Fatalf("%s: document was not added to cache", stepName)
}
Expand All @@ -130,17 +128,17 @@ func TestUpsertAndDelete(t *testing.T) {
bucket: 5,
state: api.ProvisioningStateDeleting,
validate: func(t *testing.T, stepName string, mon *monitor) {
if len(mon.docs) != 2 {
t.Errorf("%s: expected 2 documents after deletion, got %d", stepName, len(mon.docs))
if mon.clusters.GetCacheSize() != 2 {
t.Errorf("%s: expected 2 documents after deletion, got %d", stepName, mon.clusters.GetCacheSize())
}
if _, exists := mon.docs["cluster-1"]; exists {
if _, exists := mon.clusters.GetCluster("cluster-1"); exists {
t.Errorf("%s: document should have been deleted from cache", stepName)
}
// Verify other documents still exist
if _, exists := mon.docs["cluster-2"]; !exists {
if _, exists := mon.clusters.GetCluster("cluster-2"); !exists {
t.Errorf("%s: cluster-2 should not be affected by deletion", stepName)
}
if _, exists := mon.docs["cluster-3"]; !exists {
if _, exists := mon.clusters.GetCluster("cluster-3"); !exists {
t.Errorf("%s: cluster-3 should not be affected by deletion", stepName)
}
},
Expand All @@ -152,14 +150,14 @@ func TestUpsertAndDelete(t *testing.T) {
bucket: 5,
state: api.ProvisioningStateDeleting,
validate: func(t *testing.T, stepName string, mon *monitor) {
if len(mon.docs) != 2 {
t.Errorf("%s: expected 2 documents (no change), got %d", stepName, len(mon.docs))
if mon.clusters.GetCacheSize() != 2 {
t.Errorf("%s: expected 2 documents (no change), got %d", stepName, mon.clusters.GetCacheSize())
}
// Verify existing documents are unaffected
if _, exists := mon.docs["cluster-2"]; !exists {
if _, exists := mon.clusters.GetCluster("cluster-2"); !exists {
t.Errorf("%s: existing documents should not be affected", stepName)
}
if _, exists := mon.docs["cluster-3"]; !exists {
if _, exists := mon.clusters.GetCluster("cluster-3"); !exists {
t.Errorf("%s: existing documents should not be affected", stepName)
}
},
Expand All @@ -172,19 +170,21 @@ func TestUpsertAndDelete(t *testing.T) {
state: api.ProvisioningStateSucceeded,
validate: func(t *testing.T, stepName string, mon *monitor) {
// First verify worker exists
if mon.docs["cluster-3"].stop == nil {
cl, _ := mon.clusters.docs.Load("cluster-3")
if cl.stop == nil {
t.Fatalf("%s: worker should exist before ownership change", stepName)
}

// Remove bucket ownership
delete(mon.buckets, 2)
delete(mon.clusters.buckets, 2)

// Call fixDoc
// Call upsertDoc
doc := createMockClusterDoc("cluster-3", 2, api.ProvisioningStateSucceeded)
mon.fixDoc(doc)
mon.clusters.upsertDoc(doc)

// Verify worker was stopped
if mon.docs["cluster-3"].stop != nil {
cl, _ = mon.clusters.docs.Load("cluster-3")
if cl.stop != nil {
t.Errorf("%s: worker should be stopped when bucket no longer owned", stepName)
}
},
Expand All @@ -199,9 +199,9 @@ func TestUpsertAndDelete(t *testing.T) {

switch op.action {
case Upsert:
testMon.upsertDoc(doc)
testMon.clusters.upsertDoc(doc)
case Delete:
testMon.deleteDoc(doc)
testMon.clusters.deleteDoc(doc)
case NoOp:
// Do nothing, we don't need to call any func for the test to run
default:
Expand All @@ -222,7 +222,7 @@ func TestConcurrentUpsert(t *testing.T) {

doc := createMockClusterDoc("cluster-concurrent", 1, api.ProvisioningStateSucceeded)
mon := env.CreateTestMonitor("cluster-concurrent")
mon.buckets[1] = struct{}{}
mon.clusters.buckets[1] = struct{}{}
wg := sync.WaitGroup{}
for i := 0; i < 3; i++ {
wg.Add(1)
Expand All @@ -231,14 +231,14 @@ func TestConcurrentUpsert(t *testing.T) {
// Holding the lock for a random duration, up to a second
// As we're upserting the same doc over and over, length should be 1
time.Sleep(time.Duration(rand.Intn(int(time.Second))))
mon.upsertDoc(doc)
mon.clusters.upsertDoc(doc)
mon.mu.Unlock()
wg.Done()
}()
}
wg.Wait()
if len(mon.docs) != 1 {
t.Errorf("Expected 1 doc after the same concurrent upsert, found %d", len(mon.docs))
if mon.clusters.GetCacheSize() != 1 {
t.Errorf("Expected 1 doc after the same concurrent upsert, found %d", mon.clusters.GetCacheSize())
}
}

Expand All @@ -248,15 +248,16 @@ func TestConcurrentDeleteChannelCloseSafety(t *testing.T) {

mon := env.CreateTestMonitor("test-channel-safety")

mon.buckets[5] = struct{}{}
mon.clusters.buckets[5] = struct{}{}

doc := createMockClusterDoc("cluster-1", 5, api.ProvisioningStateSucceeded)
mon.mu.Lock()
mon.upsertDoc(doc)
mon.clusters.upsertDoc(doc)
mon.mu.Unlock()

mon.mu.Lock()
if mon.docs["cluster-1"].stop == nil {
cl, _ := mon.clusters.docs.Load("cluster-1")
if cl.stop == nil {
t.Fatal("worker should have been created")
}
mon.mu.Unlock()
Expand All @@ -278,7 +279,7 @@ func TestConcurrentDeleteChannelCloseSafety(t *testing.T) {
}()

mon.mu.Lock()
mon.deleteDoc(doc)
mon.clusters.deleteDoc(doc)
mon.mu.Unlock()
}()
}
Expand All @@ -298,7 +299,7 @@ func TestConcurrentDeleteChannelCloseSafety(t *testing.T) {
}

mon.mu.Lock()
if _, exists := mon.docs["cluster-1"]; exists {
if _, exists := mon.clusters.GetCluster("cluster-1"); exists {
t.Error("document should have been deleted")
}
mon.mu.Unlock()
Expand Down
Loading
Loading