Skip to content

Commit 1db226e

Browse files
committed
Additional wait handling when backfill is deferred
1 parent d83def0 commit 1db226e

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

db/change_cache_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ func TestLowSequenceHandlingWithAccessGrant(t *testing.T) {
617617
// Go-routine to work the feed channel and write to an array for use by assertions
618618
var changes = make([]*ChangeEntry, 0, 50)
619619

620-
time.Sleep(50 * time.Millisecond)
620+
time.Sleep(500 * time.Millisecond)
621621

622622
// Validate the initial sequences arrive as expected
623623
err = appendFromFeed(&changes, feed, 3)
@@ -637,7 +637,7 @@ func TestLowSequenceHandlingWithAccessGrant(t *testing.T) {
637637

638638
db.changeCache.waitForSequence(9)
639639

640-
time.Sleep(50 * time.Millisecond)
640+
time.Sleep(500 * time.Millisecond)
641641
err = appendFromFeed(&changes, feed, 4)
642642
assert.True(t, err == nil)
643643
assert.Equals(t, len(changes), 7)

db/changes.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"errors"
1515
"fmt"
1616
"sort"
17+
"time"
1718

1819
"github.com/couchbase/sync_gateway/base"
1920
"github.com/couchbase/sync_gateway/channels"
@@ -277,6 +278,7 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption
277278
var userCounter uint64 // Wait counter used to identify changes to the user document
278279
var addedChannels base.Set // Tracks channels added to the user during changes processing.
279280
var userChanged bool // Whether the user document has changed in a given iteration loop
281+
var deferredBackfill bool // Whether there's a backfill identified in the user doc that's deferred while the SG cache catches up
280282

281283
// lowSequence is used to send composite keys to clients, so that they can obtain any currently
282284
// skipped sequences in a future iteration or request.
@@ -357,6 +359,7 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption
357359
// could be written to channel X during one iteration, and channel Y during another. Users
358360
// with access to both channels would see two versions on the feed.
359361

362+
deferredBackfill = false
360363
for name, vbSeqAddedAt := range channelsSince {
361364
chanOpts := options
362365
seqAddedAt := vbSeqAddedAt.Sequence
@@ -378,6 +381,9 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption
378381

379382
// Backfill required when seqAddedAt is before current sequence
380383
backfillRequired := seqAddedAt > 1 && options.Since.Before(SequenceID{Seq: seqAddedAt}) && seqAddedAt <= currentCachedSequence
384+
if seqAddedAt > currentCachedSequence {
385+
deferredBackfill = true
386+
}
381387

382388
// Ensure backfill isn't already in progress for this seqAddedAt
383389
backfillPending := options.Since.TriggeredBy == 0 || options.Since.TriggeredBy < seqAddedAt
@@ -530,6 +536,14 @@ func (db *Database) SimpleMultiChangesFeed(chans base.Set, options ChangesOption
530536
output <- nil
531537
waitForChanges:
532538
for {
539+
// If we're in a deferred Backfill, the user may not get notification when the cache catches up to the backfill (e.g. when the granting doc isn't
540+
// visible to the user), and so changeWaiter.Wait() would block until the next user-visible doc arrives. Use a hardcoded wait instead
541+
if deferredBackfill {
542+
time.Sleep(250 * time.Millisecond)
543+
if db.changeCache.GetStableSequence("").Seq != currentCachedSequence {
544+
break waitForChanges
545+
}
546+
}
533547
waitResponse := changeWaiter.Wait()
534548
if waitResponse == WaiterClosed {
535549
break outer

0 commit comments

Comments
 (0)