Skip to content

Commit afb6f8a

Browse files
authored
CBG-3748: Support legacy RevTree IDs for Delta Sync (#7718)
1 parent 03b1fe7 commit afb6f8a

15 files changed

+261
-57
lines changed

db/blip_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -914,7 +914,7 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
914914
func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sender, docID, revID string, deltaSrcRevID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int) error {
915915
bsc.replicationStats.SendRevDeltaRequestedCount.Add(1)
916916

917-
revDelta, redactedRev, err := handleChangesResponseCollection.GetDelta(ctx, docID, deltaSrcRevID, revID, bsc.useHLV())
917+
revDelta, redactedRev, err := handleChangesResponseCollection.GetDelta(ctx, docID, deltaSrcRevID, revID)
918918
if err == ErrForbidden { // nolint: gocritic // can't convert if/else if to switch since base.IsFleeceDeltaError is not switchable
919919
return err
920920
} else if base.IsFleeceDeltaError(err) {

db/crud.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -449,14 +449,15 @@ func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, c
449449

450450
// GetDelta attempts to return the delta between fromRev and toRev (each either a RevTree ID or a CV). If the delta can't be generated,
451451
// returns nil.
452-
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRev, toRev string, useCVRevCache bool) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
452+
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRev, toRev string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
453453

454454
if docID == "" || fromRev == "" || toRev == "" {
455455
return nil, nil, nil
456456
}
457457
var fromRevision DocumentRevision
458458
var fromRevVrs Version
459-
if useCVRevCache {
459+
fromRevIsCV := !base.IsRevTreeID(fromRev)
460+
if fromRevIsCV {
460461
fromRevVrs, err = ParseVersion(fromRev)
461462
if err != nil {
462463
return nil, nil, err
@@ -510,7 +511,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
510511

511512
db.dbStats().DeltaSync().DeltaCacheMiss.Add(1)
512513
var toRevision DocumentRevision
513-
if useCVRevCache {
514+
if !base.IsRevTreeID(toRev) {
514515
cv, err := ParseVersion(toRev)
515516
if err != nil {
516517
return nil, nil, err
@@ -539,7 +540,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
539540
// If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta
540541
if deleted {
541542
revCacheDelta := newRevCacheDelta([]byte(base.EmptyDocument), fromRev, toRevision, deleted, nil)
542-
if useCVRevCache {
543+
if fromRevIsCV {
543544
db.revisionCache.UpdateDeltaCV(ctx, docID, &fromRevVrs, revCacheDelta)
544545
} else {
545546
db.revisionCache.UpdateDelta(ctx, docID, fromRev, revCacheDelta)
@@ -583,7 +584,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
583584
revCacheDelta := newRevCacheDelta(deltaBytes, fromRev, toRevision, deleted, toRevAttStorageMeta)
584585

585586
// Write the newly calculated delta back into the cache before returning
586-
if useCVRevCache {
587+
if fromRevIsCV {
587588
db.revisionCache.UpdateDeltaCV(ctx, docID, &fromRevVrs, revCacheDelta)
588589
} else {
589590
db.revisionCache.UpdateDelta(ctx, docID, fromRev, revCacheDelta)
@@ -2044,7 +2045,7 @@ func (db *DatabaseCollectionWithUser) tombstoneActiveRevision(ctx context.Contex
20442045
// Backup previous revision body, then remove the current body from the doc
20452046
bodyBytes, err := doc.BodyBytes(ctx)
20462047
if err == nil {
2047-
_ = db.setOldRevisionJSON(ctx, doc.ID, revID, bodyBytes, db.oldRevExpirySeconds())
2048+
_ = db.setOldRevisionJSONBody(ctx, doc.ID, revID, bodyBytes, db.oldRevExpirySeconds())
20482049
}
20492050
doc.RemoveBody()
20502051

@@ -2872,7 +2873,11 @@ func (db *DatabaseCollectionWithUser) postWriteUpdateHLV(ctx context.Context, do
28722873
}
28732874
}
28742875
revHash := base.Crc32cHashString([]byte(doc.HLV.GetCurrentVersionString()))
2875-
_ = db.setOldRevisionJSON(ctx, doc.ID, revHash, newBodyWithAtts, db.deltaSyncRevMaxAgeSeconds())
2876+
_ = db.setOldRevisionJSONBody(ctx, doc.ID, revHash, newBodyWithAtts, db.deltaSyncRevMaxAgeSeconds())
2877+
// Optionally store a lookup document to find the CV-based revHash by legacy RevTree ID
2878+
if db.storeLegacyRevTreeData() {
2879+
_ = db.setOldRevisionJSONPtr(ctx, doc, db.deltaSyncRevMaxAgeSeconds())
2880+
}
28762881
}
28772882
return doc
28782883
}

db/database.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ const (
7676
var (
7777
DefaultDeltaSyncEnabled = false
7878
DefaultDeltaSyncRevMaxAge = uint32(60 * 60 * 24) // 24 hours in seconds
79+
DefaultStoreLegacyRevs = true // for 4.0, this is opt-out - we can switch to opt-in at a later date
7980
)
8081

8182
var (
@@ -240,6 +241,7 @@ type UserViewsOptions struct {
240241
type DeltaSyncOptions struct {
241242
Enabled bool // Whether delta sync is enabled (EE only)
242243
RevMaxAgeSeconds uint32 // The number of seconds deltas for old revs are available for
244+
StoreLegacyRevs bool // Whether to store additional data to allow legacy RevTree ID support for delta sync
243245
}
244246

245247
type APIEndpoints struct {

db/database_collection.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@ func (c *DatabaseCollection) deltaSyncRevMaxAgeSeconds() uint32 {
196196
return c.dbCtx.Options.DeltaSyncOptions.RevMaxAgeSeconds
197197
}
198198

199+
// storeLegacyRevTreeData returns true if legacy revision tree pointer data is stored. This is controlled at the database level.
200+
func (c *DatabaseCollection) storeLegacyRevTreeData() bool {
201+
return c.dbCtx.Options.DeltaSyncOptions.StoreLegacyRevs
202+
}
203+
199204
// eventMgr returns the EventManager that manages notification events. This is controlled at the database level.
200205
func (c *DatabaseCollection) eventMgr() *EventManager {
201206
return c.dbCtx.EventMgr

db/database_test.go

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -967,17 +967,50 @@ func TestGetRemovalMultiChannel(t *testing.T) {
967967
require.Equal(t, bodyExpected, body)
968968
}
969969

970+
func TestDeltaSyncWhenFromRevIsLegacyRevTreeID(t *testing.T) {
971+
base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)
972+
973+
if !base.IsEnterpriseEdition() {
974+
t.Skip("Delta sync only supported in EE")
975+
}
976+
977+
db, ctx := setupTestDB(t)
978+
db.Options.DeltaSyncOptions = DeltaSyncOptions{
979+
Enabled: true,
980+
RevMaxAgeSeconds: 300,
981+
StoreLegacyRevs: true,
982+
}
983+
984+
defer db.Close(ctx)
985+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
986+
require.NoError(t, db.DbStats.InitDeltaSyncStats())
987+
988+
rev1, _, err := collection.Put(ctx, "doc1", Body{"foo": "bar", "bar": "buzz", "quux": "quax"})
989+
require.NoError(t, err, "Error creating doc1")
990+
rev2, _, err := collection.Put(ctx, "doc1", Body{"foo": "bar", "quux": "fuzz", BodyRev: rev1})
991+
require.NoError(t, err, "Error updating doc1")
992+
993+
// force retrieval from backing store
994+
db.FlushRevisionCacheForTest()
995+
996+
// get delta using legacy RevTree IDs - this should force a lookup for CV1 via the pointer backup revision doc
997+
delta, _, err := collection.GetDelta(ctx, "doc1", rev1, rev2)
998+
require.NoErrorf(t, err, "Error getting delta for doc %q from rev %q to %q", "doc1", rev1, rev2)
999+
require.NotNil(t, delta)
1000+
assert.Equal(t, rev2, delta.ToRevID)
1001+
assert.Equal(t, []byte(`{"bar":[],"quux":"fuzz"}`), delta.DeltaBytes)
1002+
}
1003+
9701004
// Test delta sync behavior when the fromRevision is a channel removal.
9711005
func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
9721006
testCases := []struct {
9731007
name string
9741008
versionVector bool
9751009
}{
976-
// Revs are backed up by hash of CV now, now way to fetch backup revs by revID till CBG-3748 (backwards compatibility for revID)
977-
//{
978-
// name: "revTree test",
979-
// versionVector: false,
980-
//},
1010+
{
1011+
name: "revTree test",
1012+
versionVector: false,
1013+
},
9811014
{
9821015
name: "versionVector test",
9831016
versionVector: true,
@@ -1024,7 +1057,9 @@ func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
10241057
require.NoError(t, err, "Error purging old revision JSON")
10251058
} else {
10261059
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2ID)
1027-
require.NoError(t, err, "Error purging old revision JSON")
1060+
_ = err
1061+
// TODO: CBG-4840 - Requires restoration of RevTree ID-based old revision storage
1062+
//require.NoError(t, err, "Error purging old revision JSON")
10281063
}
10291064

10301065
// Request delta between rev2ID and rev3ID (fromRevision "rev2ID" is channel removal)
@@ -1039,12 +1074,12 @@ func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
10391074
if testCase.versionVector {
10401075
rev2 := docRev2.HLV.ExtractCurrentVersionFromHLV()
10411076
rev3 := docRev3.HLV.ExtractCurrentVersionFromHLV()
1042-
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String(), true)
1077+
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String())
10431078
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
10441079
assert.Nil(t, delta)
10451080
assert.Nil(t, redactedRev)
10461081
} else {
1047-
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2ID, rev3ID, false)
1082+
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2ID, rev3ID)
10481083
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
10491084
assert.Nil(t, delta)
10501085
assert.Nil(t, redactedRev)
@@ -1061,12 +1096,12 @@ func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
10611096
if testCase.versionVector {
10621097
rev2 := docRev2.HLV.ExtractCurrentVersionFromHLV()
10631098
rev3 := docRev3.HLV.ExtractCurrentVersionFromHLV()
1064-
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String(), true)
1099+
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String())
10651100
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
10661101
assert.Nil(t, delta)
10671102
assert.Nil(t, redactedRev)
10681103
} else {
1069-
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2ID, rev3ID, false)
1104+
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2ID, rev3ID)
10701105
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
10711106
assert.Nil(t, delta)
10721107
assert.Nil(t, redactedRev)
@@ -1142,12 +1177,12 @@ func TestDeltaSyncWhenToRevIsChannelRemoval(t *testing.T) {
11421177
if testCase.versionVector {
11431178
rev2 := docRev2.HLV.ExtractCurrentVersionFromHLV()
11441179
rev3 := docRev3.HLV.ExtractCurrentVersionFromHLV()
1145-
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String(), true)
1180+
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String())
11461181
require.NoError(t, err)
11471182
assert.Nil(t, delta)
11481183
assert.Equal(t, `{"_removed":true}`, string(redactedRev.BodyBytes))
11491184
} else {
1150-
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev1ID, rev2ID, false)
1185+
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev1ID, rev2ID)
11511186
require.NoError(t, err)
11521187
assert.Nil(t, delta)
11531188
assert.Equal(t, `{"_removed":true}`, string(redactedRev.BodyBytes))
@@ -1163,12 +1198,12 @@ func TestDeltaSyncWhenToRevIsChannelRemoval(t *testing.T) {
11631198
if testCase.versionVector {
11641199
rev2 := docRev2.HLV.ExtractCurrentVersionFromHLV()
11651200
rev3 := docRev3.HLV.ExtractCurrentVersionFromHLV()
1166-
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String(), true)
1201+
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String())
11671202
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
11681203
assert.Nil(t, delta)
11691204
assert.Nil(t, redactedRev)
11701205
} else {
1171-
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev1ID, rev2ID, false)
1206+
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev1ID, rev2ID)
11721207
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
11731208
assert.Nil(t, delta)
11741209
assert.Nil(t, redactedRev)

db/import.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ func (db *DatabaseCollectionWithUser) backupPreImportRevision(ctx context.Contex
478478
return err
479479
}
480480

481-
setOldRevErr := db.setOldRevisionJSON(ctx, docid, revid, oldRevJSON, db.oldRevExpirySeconds())
481+
setOldRevErr := db.setOldRevisionJSONBody(ctx, docid, revid, oldRevJSON, db.oldRevExpirySeconds())
482482
if setOldRevErr != nil {
483483
return fmt.Errorf("Persistence error: %v", setOldRevErr)
484484
}

db/revision.go

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -229,25 +229,28 @@ func (body Body) getExpiry() (uint32, bool, error) {
229229
return *expiry, true, err
230230
}
231231

232-
// nonJSONPrefix is used to ensure old revision bodies aren't hidden from N1QL/Views.
233-
const nonJSONPrefix = byte(1)
234-
235-
// Looks up the raw JSON data of a revision that's been archived to a separate doc.
232+
// getOldRevisionJSON looks up the raw JSON data of a revision that's been archived to a separate doc.
236233
// If the revision isn't found (e.g. has been deleted by compaction) returns 404 error.
237-
func (c *DatabaseCollection) getOldRevisionJSON(ctx context.Context, docid string, rev string) ([]byte, error) {
238-
data, _, err := c.dataStore.GetRaw(oldRevisionKey(docid, rev))
234+
func (c *DatabaseCollection) getOldRevisionJSON(ctx context.Context, docid string, revOrCV string) ([]byte, error) {
235+
data, _, err := c.dataStore.GetRaw(oldRevisionKey(docid, revOrCV))
239236
if base.IsDocNotFoundError(err) {
240-
base.DebugfCtx(ctx, base.KeyCRUD, "No old revision %q / %q", base.UD(docid), rev)
241-
err = ErrMissing
242-
}
243-
if data != nil {
244-
// Strip out the non-JSON prefix
245-
if len(data) > 0 && data[0] == nonJSONPrefix {
246-
data = data[1:]
247-
}
248-
base.DebugfCtx(ctx, base.KeyCRUD, "Got old revision %q / %q --> %d bytes", base.UD(docid), rev, len(data))
237+
base.DebugfCtx(ctx, base.KeyCRUD, "No old revision %q / %q", base.UD(docid), revOrCV)
238+
return nil, ErrMissing
239+
} else if err != nil {
240+
return nil, err
241+
}
242+
kind, revData := stripNonJSONPrefix(data)
243+
switch kind {
244+
case nonJSONPrefixKindRevBody:
245+
base.DebugfCtx(ctx, base.KeyCRUD, "Got old revision %q / %q --> %d bytes", base.UD(docid), revOrCV, len(revData))
246+
return revData, nil
247+
case nonJSONPrefixKindRevPtr:
248+
// pointer to a CV-keyed backup revision - do another fetch by CV for the body
249+
base.DebugfCtx(ctx, base.KeyCRUD, "Found old revision pointer %q -> %q / %q", revOrCV, base.UD(docid), string(revData))
250+
return c.getOldRevisionJSON(ctx, docid, string(revData))
251+
default:
252+
return nil, fmt.Errorf("unexpected prefix %b for old revision doc %q / %q", kind, base.UD(docid).String(), revOrCV)
249253
}
250-
return data, err
251254
}
252255

253256
// Makes a backup of revision body for use by delta sync, and in-flight replications requesting an old revision.
@@ -260,12 +263,11 @@ func (c *DatabaseCollection) getOldRevisionJSON(ctx context.Context, docid strin
260263
// delta=true && shared_bucket_access=false
261264
// - old revision stored, with expiry rev_max_age_seconds
262265
func (db *DatabaseCollectionWithUser) backupRevisionJSON(ctx context.Context, docId, oldRev string, oldBody []byte) {
263-
264266
// Without delta sync, store the old rev for in-flight replication purposes
265267
if !db.deltaSyncEnabled() || db.deltaSyncRevMaxAgeSeconds() == 0 {
266268
if len(oldBody) > 0 {
267269
oldRevHash := base.Crc32cHashString([]byte(oldRev))
268-
_ = db.setOldRevisionJSON(ctx, docId, oldRevHash, oldBody, db.oldRevExpirySeconds())
270+
_ = db.setOldRevisionJSONBody(ctx, docId, oldRevHash, oldBody, db.oldRevExpirySeconds())
269271
}
270272
return
271273
}
@@ -276,41 +278,37 @@ func (db *DatabaseCollectionWithUser) backupRevisionJSON(ctx context.Context, do
276278
if db.UseXattrs() {
277279
// Refresh the expiry on the previous revision backup
278280
oldRevHash := base.Crc32cHashString([]byte(oldRev))
279-
_ = db.refreshPreviousRevisionBackup(ctx, docId, oldRevHash, oldBody, db.deltaSyncRevMaxAgeSeconds())
281+
_ = db.refreshOldRevisionJSON(ctx, docId, oldRevHash, oldBody, db.deltaSyncRevMaxAgeSeconds())
280282
return
281283
}
282284

283285
// Non-xattr only need to store the previous revision, as all writes come through SG
284286
if len(oldBody) > 0 {
285287
oldRevHash := base.Crc32cHashString([]byte(oldRev))
286-
_ = db.setOldRevisionJSON(ctx, docId, oldRevHash, oldBody, db.deltaSyncRevMaxAgeSeconds())
288+
_ = db.setOldRevisionJSONBody(ctx, docId, oldRevHash, oldBody, db.deltaSyncRevMaxAgeSeconds())
287289
}
288290
}
289291

290-
func (db *DatabaseCollectionWithUser) setOldRevisionJSON(ctx context.Context, docid string, rev string, body []byte, expiry uint32) error {
291-
292+
// setOldRevisionJSONBody stores the raw JSON body of a revision that has been archived to a separate doc.
293+
func (db *DatabaseCollectionWithUser) setOldRevisionJSONBody(ctx context.Context, docid string, rev string, body []byte, expiry uint32) error {
292294
// Setting the binary flag isn't sufficient to make N1QL ignore the doc - the binary flag is only used by the SDKs.
293295
// To ensure it's not available via N1QL, need to prefix the raw bytes with non-JSON data.
294-
// Copying byte slice to make sure we don't modify the version stored in the revcache.
295-
nonJSONBytes := make([]byte, 1, len(body)+1)
296-
nonJSONBytes[0] = nonJSONPrefix
297-
nonJSONBytes = append(nonJSONBytes, body...)
296+
nonJSONBytes := withNonJSONPrefix(nonJSONPrefixKindRevBody, body)
298297
err := db.dataStore.SetRaw(oldRevisionKey(docid, rev), expiry, nil, nonJSONBytes)
299298
if err == nil {
300299
base.DebugfCtx(ctx, base.KeyCRUD, "Backed up revision body %q/%q (%d bytes, ttl:%d)", base.UD(docid), rev, len(body), expiry)
301300
} else {
302-
base.WarnfCtx(ctx, "setOldRevisionJSON failed: doc=%q rev=%q err=%v", base.UD(docid), rev, err)
301+
base.WarnfCtx(ctx, "setOldRevisionJSONBody failed: doc=%q rev=%q err=%v", base.UD(docid), rev, err)
303302
}
304303
return err
305304
}
306305

307306
// refreshOldRevisionJSON extends the expiry on a revision backup. If this fails with key not found, it will attempt to
308307
// recreate the revision backup when body is non-empty.
309-
func (db *DatabaseCollectionWithUser) refreshPreviousRevisionBackup(ctx context.Context, docid string, revid string, body []byte, expiry uint32) error {
310-
308+
func (db *DatabaseCollectionWithUser) refreshOldRevisionJSON(ctx context.Context, docid string, revid string, body []byte, expiry uint32) error {
311309
_, err := db.dataStore.Touch(oldRevisionKey(docid, revid), expiry)
312310
if base.IsDocNotFoundError(err) && len(body) > 0 {
313-
return db.setOldRevisionJSON(ctx, docid, revid, body, expiry)
311+
return db.setOldRevisionJSONBody(ctx, docid, revid, body, expiry)
314312
}
315313
return err
316314
}
@@ -323,6 +321,7 @@ func (c *DatabaseCollection) PurgeOldRevisionJSON(ctx context.Context, docid str
323321

324322
// ////// UTILITY FUNCTIONS:
325323

324+
// oldRevisionKey returns the key for the document that stores a copy of the document's body for a given old revision.
326325
func oldRevisionKey(docid string, rev string) string {
327326
return fmt.Sprintf("%s%s:%d:%s", base.RevPrefix, docid, len(rev), rev)
328327
}

0 commit comments

Comments
 (0)