Skip to content

Commit d469545

Browse files
authored
CBG-4746: Deduplicate channel set outside processEntry (#7861)
1 parent 2d69700 commit d469545

File tree

3 files changed

+72
-57
lines changed

3 files changed

+72
-57
lines changed

db/change_cache.go

Lines changed: 55 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type changeCache struct {
6868
initialSequence uint64 // DB's current sequence at startup time.
6969
receivedSeqs map[uint64]struct{} // Set of all sequences received
7070
pendingLogs LogPriorityQueue // Out-of-sequence entries waiting to be cached
71-
notifyChange func(context.Context, channels.Set) // Client callback that notifies of channel changes
71+
notifyChangeFunc func(context.Context, channels.Set) // Client callback that notifies of channel changes
7272
started base.AtomicBool // Set by the Start method
7373
stopped base.AtomicBool // Set by the Stop method
7474
skippedSeqs *SkippedSequenceSkiplist // Skipped sequences still pending on the DCP caching feed
@@ -145,15 +145,15 @@ func DefaultCacheOptions() CacheOptions {
145145

146146
// Initializes a new changeCache.
147147
// lastSequence is the last known database sequence assigned.
148-
// notifyChange is an optional function that will be called to notify of channel changes.
148+
// notifyChange is an optional function that will be called to notify of channel changes; it is stored as c.notifyChangeFunc.
149149
// After calling Init(), you must call .Start() to start using the cache, otherwise it will be in a locked state
150150
// and callers will block on trying to obtain the lock.
151151

152152
func (c *changeCache) Init(ctx context.Context, dbContext *DatabaseContext, channelCache ChannelCache, notifyChange func(context.Context, channels.Set), options *CacheOptions, metaKeys *base.MetadataKeys) error {
153153
c.db = dbContext
154154
c.logCtx = ctx
155155

156-
c.notifyChange = notifyChange
156+
c.notifyChangeFunc = notifyChange
157157
c.receivedSeqs = make(map[uint64]struct{})
158158
c.terminator = make(chan bool)
159159
c.initTime = time.Now()
@@ -277,11 +277,10 @@ func (c *changeCache) InsertPendingEntries(ctx context.Context) error {
277277
// Trigger _addPendingLogs to process any entries that have been pending too long:
278278
c.lock.Lock()
279279
changedChannels := c._addPendingLogs(ctx)
280-
if c.notifyChange != nil && len(changedChannels) > 0 {
281-
c.notifyChange(ctx, changedChannels)
282-
}
283280
c.lock.Unlock()
284281

282+
c.notifyChange(ctx, changedChannels)
283+
285284
return nil
286285
}
287286

@@ -450,7 +449,8 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent, docType DocumentType)
450449
UnusedSequence: true,
451450
}
452451
changedChannels := c.processEntry(ctx, change)
453-
changedChannelsCombined = changedChannelsCombined.Update(changedChannels)
452+
channelSet := channels.SetFromArrayNoValidate(changedChannels)
453+
changedChannelsCombined = changedChannelsCombined.Update(channelSet)
454454
}
455455
base.DebugfCtx(ctx, base.KeyCache, "Received unused sequences in unused_sequences property for (%q / %q): %v", base.UD(docID), syncData.GetRevTreeID(), syncData.UnusedSequences)
456456
}
@@ -498,7 +498,8 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent, docType DocumentType)
498498
}
499499

500500
changedChannels := c.processEntry(ctx, change)
501-
changedChannelsCombined = changedChannelsCombined.Update(changedChannels)
501+
channelSet := channels.SetFromArrayNoValidate(changedChannels)
502+
changedChannelsCombined = changedChannelsCombined.Update(channelSet)
502503
}
503504
}
504505
if len(seqsCached) > 0 {
@@ -544,11 +545,12 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent, docType DocumentType)
544545
}
545546

546547
changedChannels := c.processEntry(ctx, change)
547-
changedChannelsCombined = changedChannelsCombined.Update(changedChannels)
548+
channelSet := channels.SetFromArrayNoValidate(changedChannels)
549+
changedChannelsCombined = changedChannelsCombined.Update(channelSet)
548550

549551
// Notify change listeners for all of the changed channels
550-
if c.notifyChange != nil && len(changedChannelsCombined) > 0 {
551-
c.notifyChange(ctx, changedChannelsCombined)
552+
if c.notifyChangeFunc != nil && len(changedChannelsCombined) > 0 {
553+
c.notifyChangeFunc(ctx, changedChannelsCombined)
552554
}
553555

554556
}
@@ -559,6 +561,13 @@ type cachePrincipal struct {
559561
Sequence uint64 `json:"sequence"`
560562
}
561563

564+
// notifyChange forwards the changed channels to the registered notifyChangeFunc callback.
// It is a no-op when no callback is registered or when chs is empty. The slice is converted
// to a channels.Set with SetFromArrayNoValidate before the callback is invoked (presumably
// collapsing any duplicate channel IDs in chs - confirm against channels.Set).
func (c *changeCache) notifyChange(ctx context.Context, chs []channels.ID) {
565+
if c.notifyChangeFunc == nil || len(chs) == 0 {
566+
return
567+
}
568+
c.notifyChangeFunc(ctx, channels.SetFromArrayNoValidate(chs))
569+
}
570+
562571
func (c *changeCache) Remove(ctx context.Context, collectionID uint32, docIDs []string, startTime time.Time) (count int) {
563572
return c.channelCache.Remove(ctx, collectionID, docIDs, startTime)
564573
}
@@ -592,14 +601,16 @@ func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64
592601

593602
// Since processEntry may unblock pending sequences, if there were any changed channels we need
594603
// to notify any change listeners that are working changes feeds for these channels
604+
var channelSet channels.Set
595605
changedChannels := c.processEntry(ctx, change)
596606
if changedChannels == nil {
597-
changedChannels = channels.SetOfNoValidate(unusedSeqChannelID)
607+
channelSet = channels.SetOfNoValidate(unusedSeqChannelID)
598608
} else {
599-
changedChannels.Add(unusedSeqChannelID)
609+
channelSet = channels.SetFromArrayNoValidate(changedChannels)
610+
channelSet.Add(unusedSeqChannelID)
600611
}
601-
if c.notifyChange != nil && len(changedChannels) > 0 {
602-
c.notifyChange(ctx, changedChannels)
612+
if c.notifyChangeFunc != nil && len(channelSet) > 0 {
613+
c.notifyChangeFunc(ctx, channelSet)
603614
}
604615
}
605616

@@ -619,36 +630,38 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen
619630
UnusedSequence: true,
620631
}
621632
changedChannels := c.processEntry(ctx, change)
622-
allChangedChannels = allChangedChannels.Update(changedChannels)
623-
if c.notifyChange != nil {
624-
c.notifyChange(ctx, allChangedChannels)
633+
channelSet := channels.SetFromArrayNoValidate(changedChannels)
634+
allChangedChannels = allChangedChannels.Update(channelSet)
635+
if c.notifyChangeFunc != nil {
636+
c.notifyChangeFunc(ctx, allChangedChannels)
625637
}
626638
return
627639
}
628640

629641
// push unused range to either pending or skipped lists based on current state of the change cache
630-
allChangedChannels = c.processUnusedRange(ctx, fromSequence, toSequence, allChangedChannels, timeReceived)
642+
changedChannels := c.processUnusedRange(ctx, fromSequence, toSequence, timeReceived)
643+
// Keep the set returned by Update (as every other call site does) instead of relying on in-place mutation of the receiver.
allChangedChannels = allChangedChannels.Update(channels.SetFromArrayNoValidate(changedChannels))
631644

632-
if c.notifyChange != nil {
633-
c.notifyChange(ctx, allChangedChannels)
645+
if c.notifyChangeFunc != nil {
646+
c.notifyChangeFunc(ctx, allChangedChannels)
634647
}
635648
}
636649

637650
// processUnusedRange handles pushing unused range to pending or skipped lists
638-
func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSequence uint64, allChangedChannels channels.Set, timeReceived channels.FeedTimestamp) channels.Set {
651+
func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSequence uint64, timeReceived channels.FeedTimestamp) []channels.ID {
639652
c.lock.Lock()
640653
defer c.lock.Unlock()
641654

642655
var numSkipped int64
656+
var changedChannels []channels.ID
643657
if toSequence < c.nextSequence {
644658
// batch remove from skipped
645659
numSkipped = c.skippedSeqs.processUnusedSequenceRangeAtSkipped(ctx, fromSequence, toSequence)
646660
} else if fromSequence >= c.nextSequence {
647661
// whole range to pending
648662
c._pushRangeToPending(fromSequence, toSequence, timeReceived)
649663
// unblock any pending sequences we can after new range(s) have been pushed to pending
650-
changedChannels := c._addPendingLogs(ctx)
651-
allChangedChannels = allChangedChannels.Update(changedChannels)
664+
changedChannels = append(changedChannels, c._addPendingLogs(ctx)...)
652665
c.internalStats.pendingSeqLen = len(c.pendingLogs)
653666
} else {
654667
// An unused sequence range that includes c.nextSequence in the middle of the range
@@ -662,7 +675,7 @@ func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSe
662675
if numSkipped == 0 {
663676
c.db.BroadcastSlowMode.CompareAndSwap(true, false)
664677
}
665-
return allChangedChannels
678+
return changedChannels
666679
}
667680

668681
// _pushRangeToPending will push an unused sequence range to pendingLogs
@@ -732,13 +745,14 @@ func (c *changeCache) processPrincipalDoc(ctx context.Context, docID string, doc
732745
base.InfofCtx(ctx, base.KeyChanges, "Received #%d (%q)", change.Sequence, base.UD(change.DocID))
733746

734747
changedChannels := c.processEntry(ctx, change)
735-
if c.notifyChange != nil && len(changedChannels) > 0 {
736-
c.notifyChange(ctx, changedChannels)
737-
}
748+
749+
c.notifyChange(ctx, changedChannels)
738750
}
739751

740-
// Handles a newly-arrived LogEntry.
741-
func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) channels.Set {
752+
// processEntry handles a newly-arrived LogEntry and returns the changed channels from this revision.
753+
// These can be any existing, removed or newly added channels. It's possible for the returned channels slice to have duplicates
754+
// in it. It is the caller's responsibility to de-duplicate before notifying any changes.
755+
func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) []channels.ID {
742756
c.lock.Lock()
743757
defer c.lock.Unlock()
744758
if c.logsDisabled {
@@ -773,12 +787,12 @@ func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) channe
773787
}
774788
c.receivedSeqs[sequence] = struct{}{}
775789

776-
var changedChannels channels.Set
790+
var changedChannels []channels.ID
777791
if sequence == c.nextSequence || c.nextSequence == 0 {
778792
// This is the expected next sequence so we can add it now:
779793
changedChannels = c._addToCache(ctx, change)
780794
// Also add any pending sequences that are now contiguous:
781-
changedChannels = changedChannels.Update(c._addPendingLogs(ctx))
795+
changedChannels = append(changedChannels, c._addPendingLogs(ctx)...)
782796
} else if sequence > c.nextSequence {
783797
// There's a missing sequence (or several), so put this one on ice until it arrives:
784798
heap.Push(&c.pendingLogs, change)
@@ -795,7 +809,7 @@ func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) channe
795809

796810
if numPending > c.options.CachePendingSeqMaxNum {
797811
// Too many pending; add the oldest one:
798-
changedChannels = c._addPendingLogs(ctx)
812+
changedChannels = append(changedChannels, c._addPendingLogs(ctx)...)
799813
}
800814
} else if sequence > c.initialSequence {
801815
// Out-of-order sequence received!
@@ -807,7 +821,7 @@ func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) channe
807821
base.DebugfCtx(ctx, base.KeyCache, " Received previously skipped out-of-order change (seq %d, expecting %d) doc %q / %q ", sequence, c.nextSequence, base.UD(change.DocID), change.RevID)
808822
}
809823

810-
changedChannels = changedChannels.Update(c._addToCache(ctx, change))
824+
changedChannels = append(changedChannels, c._addToCache(ctx, change)...)
811825
// Add to cache before removing from skipped, to ensure lowSequence doesn't get incremented until results are available
812826
// in cache
813827
err := c.RemoveSkipped(sequence)
@@ -820,7 +834,7 @@ func (c *changeCache) processEntry(ctx context.Context, change *LogEntry) channe
820834

821835
// Adds an entry to the appropriate channels' caches, returning the affected channels. lateSequence
822836
// flag indicates whether it was a change arriving out of sequence
823-
func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) channels.Set {
837+
func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) []channels.ID {
824838

825839
if change.Sequence >= c.nextSequence {
826840
c.nextSequence = change.Sequence + 1
@@ -857,11 +871,12 @@ func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) channel
857871
return updatedChannels
858872
}
859873

860-
// Add the first change(s) from pendingLogs if they're the next sequence. If not, and we've been
874+
// _addPendingLogs adds the first change(s) from pendingLogs if they're the next sequence. If not, and we've been
861875
// waiting too long for nextSequence, move nextSequence to skipped queue.
862-
// Returns the channels that changed.
863-
func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
864-
var changedChannels channels.Set
876+
// Returns the channels that changed. This may return the same channel more than once, channels should be deduplicated
877+
// before notifying the changes.
878+
func (c *changeCache) _addPendingLogs(ctx context.Context) []channels.ID {
879+
var changedChannels []channels.ID
865880
var isNext bool
866881

867882
for len(c.pendingLogs) > 0 {
@@ -870,7 +885,7 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
870885

871886
if isNext {
872887
oldestPending = c._popPendingLog(ctx)
873-
changedChannels = changedChannels.Update(c._addToCache(ctx, oldestPending))
888+
changedChannels = append(changedChannels, c._addToCache(ctx, oldestPending)...)
874889
} else if oldestPending.Sequence < c.nextSequence {
875890
// oldest pending is lower than next sequence, should be ignored
876891
base.InfofCtx(ctx, base.KeyCache, "Oldest entry in pending logs %v (%d, %d) is earlier than cache next sequence (%d), ignoring as sequence has already been cached", base.UD(oldestPending.DocID), oldestPending.Sequence, oldestPending.EndSequence, c.nextSequence)

db/change_cache_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1338,10 +1338,10 @@ func readNextFromFeed(feed <-chan (*ChangeEntry), timeout time.Duration) (*Chang
13381338
//
13391339
// Create doc1 w/ unused sequences 1, actual sequence 3.
13401340
// Create doc2 w/ sequence 2, channel ABC
1341-
// Send feed event for doc2. This won't trigger notifyChange, as buffering is waiting for seq 1
1342-
// Send feed event for doc1. This should trigger caching for doc2, and trigger notifyChange for channel ABC.
1341+
// Send feed event for doc2. This won't trigger notifyChangeFunc, as buffering is waiting for seq 1
1342+
// Send feed event for doc1. This should trigger caching for doc2, and trigger notifyChangeFunc for channel ABC.
13431343
//
1344-
// Verify that notifyChange for channel ABC was sent.
1344+
// Verify that notifyChangeFunc for channel ABC was sent.
13451345
func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
13461346
base.LongRunningTest(t)
13471347

@@ -1359,12 +1359,12 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
13591359
collection := GetSingleDatabaseCollection(t, db.DatabaseContext)
13601360
collectionID := collection.GetCollectionID()
13611361

1362-
// -------- Setup notifyChange callback ----------------
1362+
// -------- Setup notifyChangeFunc callback ----------------
13631363

1364-
// Detect whether the 2nd was ignored using an notifyChange listener callback and make sure it was not added to the ABC channel
1364+
// Detect whether the 2nd was ignored using a notifyChangeFunc listener callback and make sure it was not added to the ABC channel
13651365
waitForOnChangeCallback := sync.WaitGroup{}
13661366
waitForOnChangeCallback.Add(1)
1367-
db.changeCache.notifyChange = func(_ context.Context, chans channels.Set) {
1367+
db.changeCache.notifyChangeFunc = func(_ context.Context, chans channels.Set) {
13681368
expectedChan := channels.NewID("ABC", collectionID)
13691369
for ch := range chans {
13701370
if ch == expectedChan {
@@ -1445,7 +1445,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
14451445
require.NoError(t, err)
14461446
}
14471447

1448-
// Send feed event for doc2. This won't trigger notifyChange, as buffering is waiting for seq 1
1448+
// Send feed event for doc2. This won't trigger notifyChangeFunc, as buffering is waiting for seq 1
14491449
feedEventDoc2 := sgbucket.FeedEvent{
14501450
Synchronous: true,
14511451
Key: []byte(doc2Id),
@@ -1455,7 +1455,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
14551455
}
14561456
db.changeCache.DocChanged(feedEventDoc2, DocTypeDocument)
14571457

1458-
// Send feed event for doc1. This should trigger caching for doc2, and trigger notifyChange for channel ABC.
1458+
// Send feed event for doc1. This should trigger caching for doc2, and trigger notifyChangeFunc for channel ABC.
14591459
feedEventDoc1 := sgbucket.FeedEvent{
14601460
Synchronous: true,
14611461
Key: []byte(doc1Id),
@@ -1466,7 +1466,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
14661466

14671467
// -------- Wait for waitgroup ----------------
14681468

1469-
// Block until the notifyChange callback was invoked with the expected channels.
1469+
// Block until the notifyChangeFunc callback was invoked with the expected channels.
14701470
// If the callback is never called back with expected, will block forever.
14711471
waitForOnChangeCallback.Wait()
14721472

@@ -1620,7 +1620,7 @@ func TestInitializeCacheUnderLoad(t *testing.T) {
16201620

16211621
}
16221622

1623-
// Verify that notifyChange for channel zero is sent even when the channel isn't active in the cache.
1623+
// Verify that notifyChangeFunc for channel zero is sent even when the channel isn't active in the cache.
16241624
func TestNotifyForInactiveChannel(t *testing.T) {
16251625

16261626
// Enable relevant logging
@@ -1633,10 +1633,10 @@ func TestNotifyForInactiveChannel(t *testing.T) {
16331633
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
16341634
collectionID := collection.GetCollectionID()
16351635

1636-
// -------- Setup notifyChange callback ----------------
1636+
// -------- Setup notifyChangeFunc callback ----------------
16371637

16381638
notifyChannel := make(chan struct{})
1639-
db.changeCache.notifyChange = func(_ context.Context, chans channels.Set) {
1639+
db.changeCache.notifyChangeFunc = func(_ context.Context, chans channels.Set) {
16401640
expectedChan := channels.NewID("zero", collectionID)
16411641
if chans.Contains(expectedChan) {
16421642
notifyChannel <- struct{}{}

db/channel_cache.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type ChannelCache interface {
3939
Init(initialSequence uint64)
4040

4141
// Adds an entry to the cache, returns set of channels it was added to
42-
AddToCache(ctx context.Context, change *LogEntry) channels.Set
42+
AddToCache(ctx context.Context, change *LogEntry) []channels.ID
4343

4444
// Notifies the cache of a principal update. Updates the cache's high sequence
4545
AddPrincipal(change *LogEntry)
@@ -197,14 +197,14 @@ func (c *channelCacheImpl) AddUnusedSequence(change *LogEntry) {
197197

198198
// Adds an entry to the appropriate channels' caches, returning the affected channels. lateSequence
199199
// flag indicates whether it was a change arriving out of sequence
200-
func (c *channelCacheImpl) AddToCache(ctx context.Context, change *LogEntry) channels.Set {
200+
func (c *channelCacheImpl) AddToCache(ctx context.Context, change *LogEntry) []channels.ID {
201201

202202
ch := change.Channels
203203
change.Channels = nil // not needed anymore, so free some memory
204204

205205
// updatedChannels tracks the set of channels that should be notified of the change. This includes
206206
// the change's active channels, as well as any channel removals for the active revision.
207-
updatedChannels := make(channels.Set, len(ch)+1) // +1 for the star channel
207+
updatedChannels := make([]channels.ID, 0, len(ch)+1) // +1 for the star channel
208208

209209
// If it's a late sequence, we want to add to all channel late queues within a single write lock,
210210
// to avoid a changes feed seeing the same late sequence in different iteration loops (and sending
@@ -234,7 +234,7 @@ func (c *channelCacheImpl) AddToCache(ctx context.Context, change *LogEntry) cha
234234
}
235235
}
236236
// Need to notify even if channel isn't active, for case where number of connected changes channels exceeds cache capacity
237-
updatedChannels.Add(channelID)
237+
updatedChannels = append(updatedChannels, channelID)
238238
}
239239
}
240240

@@ -247,7 +247,7 @@ func (c *channelCacheImpl) AddToCache(ctx context.Context, change *LogEntry) cha
247247
channelCache.AddLateSequence(change)
248248
}
249249
}
250-
updatedChannels.Add(starChannelID)
250+
updatedChannels = append(updatedChannels, starChannelID)
251251
}
252252

253253
c.updateHighCacheSequence(change.Sequence)

0 commit comments

Comments
 (0)