Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
func (bsc *BlipSyncContext) sendRevAsDelta(ctx context.Context, sender *blip.Sender, docID, revID string, deltaSrcRevID string, seq SequenceID, knownRevs map[string]bool, maxHistory int, handleChangesResponseCollection *DatabaseCollectionWithUser, collectionIdx *int) error {
bsc.replicationStats.SendRevDeltaRequestedCount.Add(1)

revDelta, redactedRev, err := handleChangesResponseCollection.GetDelta(ctx, docID, deltaSrcRevID, revID, bsc.useHLV())
revDelta, redactedRev, err := handleChangesResponseCollection.GetDelta(ctx, docID, deltaSrcRevID, revID)
if err == ErrForbidden { // nolint: gocritic // can't convert if/else if to switch since base.IsFleeceDeltaError is not switchable
return err
} else if base.IsFleeceDeltaError(err) {
Expand Down
19 changes: 12 additions & 7 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,14 +449,15 @@ func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, c

// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
// returns nil.
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRev, toRev string, useCVRevCache bool) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRev, toRev string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {

if docID == "" || fromRev == "" || toRev == "" {
return nil, nil, nil
}
var fromRevision DocumentRevision
var fromRevVrs Version
if useCVRevCache {
fromRevIsCV := !base.IsRevTreeID(fromRev)
if fromRevIsCV {
fromRevVrs, err = ParseVersion(fromRev)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -510,7 +511,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR

db.dbStats().DeltaSync().DeltaCacheMiss.Add(1)
var toRevision DocumentRevision
if useCVRevCache {
if !base.IsRevTreeID(toRev) {
cv, err := ParseVersion(toRev)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -539,7 +540,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
// If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta
if deleted {
revCacheDelta := newRevCacheDelta([]byte(base.EmptyDocument), fromRev, toRevision, deleted, nil)
if useCVRevCache {
if fromRevIsCV {
db.revisionCache.UpdateDeltaCV(ctx, docID, &fromRevVrs, revCacheDelta)
} else {
db.revisionCache.UpdateDelta(ctx, docID, fromRev, revCacheDelta)
Expand Down Expand Up @@ -583,7 +584,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
revCacheDelta := newRevCacheDelta(deltaBytes, fromRev, toRevision, deleted, toRevAttStorageMeta)

// Write the newly calculated delta back into the cache before returning
if useCVRevCache {
if fromRevIsCV {
db.revisionCache.UpdateDeltaCV(ctx, docID, &fromRevVrs, revCacheDelta)
} else {
db.revisionCache.UpdateDelta(ctx, docID, fromRev, revCacheDelta)
Expand Down Expand Up @@ -2015,7 +2016,7 @@ func (db *DatabaseCollectionWithUser) tombstoneActiveRevision(ctx context.Contex
// Backup previous revision body, then remove the current body from the doc
bodyBytes, err := doc.BodyBytes(ctx)
if err == nil {
_ = db.setOldRevisionJSON(ctx, doc.ID, revID, bodyBytes, db.oldRevExpirySeconds())
_ = db.setOldRevisionJSONBody(ctx, doc.ID, revID, bodyBytes, db.oldRevExpirySeconds())
}
doc.RemoveBody()

Expand Down Expand Up @@ -2843,7 +2844,11 @@ func (db *DatabaseCollectionWithUser) postWriteUpdateHLV(ctx context.Context, do
}
}
revHash := base.Crc32cHashString([]byte(doc.HLV.GetCurrentVersionString()))
_ = db.setOldRevisionJSON(ctx, doc.ID, revHash, newBodyWithAtts, db.deltaSyncRevMaxAgeSeconds())
_ = db.setOldRevisionJSONBody(ctx, doc.ID, revHash, newBodyWithAtts, db.deltaSyncRevMaxAgeSeconds())
// Optionally store a lookup document to find the CV-based revHash by legacy RevTree ID
if db.storeLegacyRevTreeData() {
_ = db.setOldRevisionJSONPtr(ctx, doc, db.deltaSyncRevMaxAgeSeconds())
}
}
return doc
}
Expand Down
2 changes: 2 additions & 0 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const (
var (
DefaultDeltaSyncEnabled = false
DefaultDeltaSyncRevMaxAge = uint32(60 * 60 * 24) // 24 hours in seconds
DefaultStoreLegacyRevs = true // for 4.0, this is opt-out - we can switch to opt-in at a later date
)

var (
Expand Down Expand Up @@ -240,6 +241,7 @@ type UserViewsOptions struct {
type DeltaSyncOptions struct {
Enabled bool // Whether delta sync is enabled (EE only)
RevMaxAgeSeconds uint32 // The number of seconds deltas for old revs are available for
StoreLegacyRevs bool // Whether to store additional data to allow legacy RevTree ID support for delta sync
}

type APIEndpoints struct {
Expand Down
5 changes: 5 additions & 0 deletions db/database_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ func (c *DatabaseCollection) deltaSyncRevMaxAgeSeconds() uint32 {
return c.dbCtx.Options.DeltaSyncOptions.RevMaxAgeSeconds
}

// storeLegacyRevTreeData returns true if legacy revision tree pointer data is stored. This is controlled at the database level.
func (c *DatabaseCollection) storeLegacyRevTreeData() bool {
return c.dbCtx.Options.DeltaSyncOptions.StoreLegacyRevs
}

// eventMgr manages nofication events. This is controlled at database level.
func (c *DatabaseCollection) eventMgr() *EventManager {
return c.dbCtx.EventMgr
Expand Down
63 changes: 49 additions & 14 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,17 +967,50 @@ func TestGetRemovalMultiChannel(t *testing.T) {
require.Equal(t, bodyExpected, body)
}

func TestDeltaSyncWhenFromRevIsLegacyRevTreeID(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)

if !base.IsEnterpriseEdition() {
t.Skip("Delta sync only supported in EE")
}

db, ctx := setupTestDB(t)
db.Options.DeltaSyncOptions = DeltaSyncOptions{
Enabled: true,
RevMaxAgeSeconds: 300,
StoreLegacyRevs: true,
}

defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
require.NoError(t, db.DbStats.InitDeltaSyncStats())

rev1, _, err := collection.Put(ctx, "doc1", Body{"foo": "bar", "bar": "buzz", "quux": "quax"})
require.NoError(t, err, "Error creating doc1")
rev2, _, err := collection.Put(ctx, "doc1", Body{"foo": "bar", "quux": "fuzz", BodyRev: rev1})
require.NoError(t, err, "Error updating doc1")

// force retrieval from backing store
db.FlushRevisionCacheForTest()

// get delta using legacy RevTree IDs - this should force a lookup for CV1 via the pointer backup revision doc
delta, _, err := collection.GetDelta(ctx, "doc1", rev1, rev2)
require.NoErrorf(t, err, "Error getting delta for doc %q from rev %q to %q", "doc1", rev1, rev2)
require.NotNil(t, delta)
assert.Equal(t, rev2, delta.ToRevID)
assert.Equal(t, []byte(`{"bar":[],"quux":"fuzz"}`), delta.DeltaBytes)
}

// Test delta sync behavior when the fromRevision is a channel removal.
func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
testCases := []struct {
name string
versionVector bool
}{
// Revs are backed up by hash of CV now, now way to fetch backup revs by revID till CBG-3748 (backwards compatibility for revID)
//{
// name: "revTree test",
// versionVector: false,
//},
{
name: "revTree test",
versionVector: false,
},
{
name: "versionVector test",
versionVector: true,
Expand Down Expand Up @@ -1024,7 +1057,9 @@ func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
require.NoError(t, err, "Error purging old revision JSON")
} else {
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2ID)
require.NoError(t, err, "Error purging old revision JSON")
_ = err
// TODO: CBG-4840 - Requires restoration of RevTree ID-based old revision storage
//require.NoError(t, err, "Error purging old revision JSON")
}

// Request delta between rev2ID and rev3ID (toRevision "rev2ID" is channel removal)
Expand All @@ -1039,12 +1074,12 @@ func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
if testCase.versionVector {
rev2 := docRev2.HLV.ExtractCurrentVersionFromHLV()
rev3 := docRev3.HLV.ExtractCurrentVersionFromHLV()
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String(), true)
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String())
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
assert.Nil(t, delta)
assert.Nil(t, redactedRev)
} else {
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2ID, rev3ID, false)
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2ID, rev3ID)
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
assert.Nil(t, delta)
assert.Nil(t, redactedRev)
Expand All @@ -1061,12 +1096,12 @@ func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
if testCase.versionVector {
rev2 := docRev2.HLV.ExtractCurrentVersionFromHLV()
rev3 := docRev3.HLV.ExtractCurrentVersionFromHLV()
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String(), true)
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String())
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
assert.Nil(t, delta)
assert.Nil(t, redactedRev)
} else {
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2ID, rev3ID, false)
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2ID, rev3ID)
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
assert.Nil(t, delta)
assert.Nil(t, redactedRev)
Expand Down Expand Up @@ -1142,12 +1177,12 @@ func TestDeltaSyncWhenToRevIsChannelRemoval(t *testing.T) {
if testCase.versionVector {
rev2 := docRev2.HLV.ExtractCurrentVersionFromHLV()
rev3 := docRev3.HLV.ExtractCurrentVersionFromHLV()
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String(), true)
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String())
require.NoError(t, err)
assert.Nil(t, delta)
assert.Equal(t, `{"_removed":true}`, string(redactedRev.BodyBytes))
} else {
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev1ID, rev2ID, false)
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev1ID, rev2ID)
require.NoError(t, err)
assert.Nil(t, delta)
assert.Equal(t, `{"_removed":true}`, string(redactedRev.BodyBytes))
Expand All @@ -1163,12 +1198,12 @@ func TestDeltaSyncWhenToRevIsChannelRemoval(t *testing.T) {
if testCase.versionVector {
rev2 := docRev2.HLV.ExtractCurrentVersionFromHLV()
rev3 := docRev3.HLV.ExtractCurrentVersionFromHLV()
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String(), true)
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev2.String(), rev3.String())
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
assert.Nil(t, delta)
assert.Nil(t, redactedRev)
} else {
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev1ID, rev2ID, false)
delta, redactedRev, err := collection.GetDelta(ctx, "doc1", rev1ID, rev2ID)
require.Equal(t, base.HTTPErrorf(404, "missing"), err)
assert.Nil(t, delta)
assert.Nil(t, redactedRev)
Expand Down
2 changes: 1 addition & 1 deletion db/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (db *DatabaseCollectionWithUser) backupPreImportRevision(ctx context.Contex
return err
}

setOldRevErr := db.setOldRevisionJSON(ctx, docid, revid, oldRevJSON, db.oldRevExpirySeconds())
setOldRevErr := db.setOldRevisionJSONBody(ctx, docid, revid, oldRevJSON, db.oldRevExpirySeconds())
if setOldRevErr != nil {
return fmt.Errorf("Persistence error: %v", setOldRevErr)
}
Expand Down
59 changes: 29 additions & 30 deletions db/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,25 +229,28 @@ func (body Body) getExpiry() (uint32, bool, error) {
return *expiry, true, err
}

// nonJSONPrefix is used to ensure old revision bodies aren't hidden from N1QL/Views.
const nonJSONPrefix = byte(1)

// Looks up the raw JSON data of a revision that's been archived to a separate doc.
// getOldRevisionJSON looks up the raw JSON data of a revision that's been archived to a separate doc.
// If the revision isn't found (e.g. has been deleted by compaction) returns 404 error.
func (c *DatabaseCollection) getOldRevisionJSON(ctx context.Context, docid string, rev string) ([]byte, error) {
data, _, err := c.dataStore.GetRaw(oldRevisionKey(docid, rev))
func (c *DatabaseCollection) getOldRevisionJSON(ctx context.Context, docid string, revOrCV string) ([]byte, error) {
data, _, err := c.dataStore.GetRaw(oldRevisionKey(docid, revOrCV))
if base.IsDocNotFoundError(err) {
base.DebugfCtx(ctx, base.KeyCRUD, "No old revision %q / %q", base.UD(docid), rev)
err = ErrMissing
}
if data != nil {
// Strip out the non-JSON prefix
if len(data) > 0 && data[0] == nonJSONPrefix {
data = data[1:]
}
base.DebugfCtx(ctx, base.KeyCRUD, "Got old revision %q / %q --> %d bytes", base.UD(docid), rev, len(data))
base.DebugfCtx(ctx, base.KeyCRUD, "No old revision %q / %q", base.UD(docid), revOrCV)
return nil, ErrMissing
} else if err != nil {
return nil, err
}
kind, revData := stripNonJSONPrefix(data)
switch kind {
case nonJSONPrefixKindRevBody:
base.DebugfCtx(ctx, base.KeyCRUD, "Got old revision %q / %q --> %d bytes", base.UD(docid), revOrCV, len(revData))
return revData, nil
case nonJSONPrefixKindRevPtr:
// pointer to a CV-keyed backup revision - do another fetch by CV for the body
base.DebugfCtx(ctx, base.KeyCRUD, "Found old revision pointer %q -> %q / %q", revOrCV, base.UD(docid), string(revData))
return c.getOldRevisionJSON(ctx, docid, string(revData))
default:
return nil, fmt.Errorf("unexpected prefix %b for old revision doc %q / %q", kind, base.UD(docid).String(), revOrCV)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to do, or are these equivalent?

Suggested change
return nil, fmt.Errorf("unexpected prefix %b for old revision doc %q / %q", kind, base.UD(docid).String(), revOrCV)
return nil, base.RedactErrorf("unexpected prefix %b for old revision doc %q / %q", kind, base.UD(docid).String(), revOrCV)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're mostly equivalent here. The only difference that can occur when we use RedactErrorf is if we're using the error in a returned HTTP response, where the response gets the unredacted version, and the logging includes the redacted version.

In this case it can do that, but it's not a huge deal and I don't expect this default case to be hit anyway.

}
return data, err
}

// Makes a backup of revision body for use by delta sync, and in-flight replications requesting an old revision.
Expand All @@ -260,12 +263,11 @@ func (c *DatabaseCollection) getOldRevisionJSON(ctx context.Context, docid strin
// delta=true && shared_bucket_access=false
// - old revision stored, with expiry rev_max_age_seconds
func (db *DatabaseCollectionWithUser) backupRevisionJSON(ctx context.Context, docId, oldRev string, oldBody []byte) {

// Without delta sync, store the old rev for in-flight replication purposes
if !db.deltaSyncEnabled() || db.deltaSyncRevMaxAgeSeconds() == 0 {
if len(oldBody) > 0 {
oldRevHash := base.Crc32cHashString([]byte(oldRev))
_ = db.setOldRevisionJSON(ctx, docId, oldRevHash, oldBody, db.oldRevExpirySeconds())
_ = db.setOldRevisionJSONBody(ctx, docId, oldRevHash, oldBody, db.oldRevExpirySeconds())
}
return
}
Expand All @@ -276,41 +278,37 @@ func (db *DatabaseCollectionWithUser) backupRevisionJSON(ctx context.Context, do
if db.UseXattrs() {
// Refresh the expiry on the previous revision backup
oldRevHash := base.Crc32cHashString([]byte(oldRev))
_ = db.refreshPreviousRevisionBackup(ctx, docId, oldRevHash, oldBody, db.deltaSyncRevMaxAgeSeconds())
_ = db.refreshOldRevisionJSON(ctx, docId, oldRevHash, oldBody, db.deltaSyncRevMaxAgeSeconds())
return
}

// Non-xattr only need to store the previous revision, as all writes come through SG
if len(oldBody) > 0 {
oldRevHash := base.Crc32cHashString([]byte(oldRev))
_ = db.setOldRevisionJSON(ctx, docId, oldRevHash, oldBody, db.deltaSyncRevMaxAgeSeconds())
_ = db.setOldRevisionJSONBody(ctx, docId, oldRevHash, oldBody, db.deltaSyncRevMaxAgeSeconds())
}
}

func (db *DatabaseCollectionWithUser) setOldRevisionJSON(ctx context.Context, docid string, rev string, body []byte, expiry uint32) error {

// setOldRevisionJSONBody stores the raw JSON body of a revision that has been archived to a separate doc.
func (db *DatabaseCollectionWithUser) setOldRevisionJSONBody(ctx context.Context, docid string, rev string, body []byte, expiry uint32) error {
// Setting the binary flag isn't sufficient to make N1QL ignore the doc - the binary flag is only used by the SDKs.
// To ensure it's not available via N1QL, need to prefix the raw bytes with non-JSON data.
// Copying byte slice to make sure we don't modify the version stored in the revcache.
nonJSONBytes := make([]byte, 1, len(body)+1)
nonJSONBytes[0] = nonJSONPrefix
nonJSONBytes = append(nonJSONBytes, body...)
nonJSONBytes := withNonJSONPrefix(nonJSONPrefixKindRevBody, body)
err := db.dataStore.SetRaw(oldRevisionKey(docid, rev), expiry, nil, nonJSONBytes)
if err == nil {
base.DebugfCtx(ctx, base.KeyCRUD, "Backed up revision body %q/%q (%d bytes, ttl:%d)", base.UD(docid), rev, len(body), expiry)
} else {
base.WarnfCtx(ctx, "setOldRevisionJSON failed: doc=%q rev=%q err=%v", base.UD(docid), rev, err)
base.WarnfCtx(ctx, "setOldRevisionJSONBody failed: doc=%q rev=%q err=%v", base.UD(docid), rev, err)
}
return err
}

// Extends the expiry on a revision backup. If this fails w/ key not found, will attempt to
// recreate the revision backup when body is non-empty.
func (db *DatabaseCollectionWithUser) refreshPreviousRevisionBackup(ctx context.Context, docid string, revid string, body []byte, expiry uint32) error {

func (db *DatabaseCollectionWithUser) refreshOldRevisionJSON(ctx context.Context, docid string, revid string, body []byte, expiry uint32) error {
_, err := db.dataStore.Touch(oldRevisionKey(docid, revid), expiry)
if base.IsDocNotFoundError(err) && len(body) > 0 {
return db.setOldRevisionJSON(ctx, docid, revid, body, expiry)
return db.setOldRevisionJSONBody(ctx, docid, revid, body, expiry)
}
return err
}
Expand All @@ -323,6 +321,7 @@ func (c *DatabaseCollection) PurgeOldRevisionJSON(ctx context.Context, docid str

// ////// UTILITY FUNCTIONS:

// oldRevisionKey returns the key for the document that stores a copy of the document's body for a given old revision.
func oldRevisionKey(docid string, rev string) string {
return fmt.Sprintf("%s%s:%d:%s", base.RevPrefix, docid, len(rev), rev)
}
Expand Down
Loading
Loading