Skip to content

Commit 4e2f417

Browse files
authored
[4.0.0 backport] CBG-4886: delta sync for legacy rev mutations over ISGR will not send deltas (#7806)
1 parent d64fb97 commit 4e2f417

File tree

5 files changed

+222
-23
lines changed

5 files changed

+222
-23
lines changed

db/blip_handler.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,13 +1112,11 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
11121112
newDoc.HLV = incomingHLV
11131113
}
11141114

1115-
isBlipRevTreeProperty := false
11161115
// if the client is SGW and there are no legacy revs being sent (i.e. doc is not a pre-upgraded doc) check the rev tree property
11171116
if bh.clientType == BLIPClientTypeSGR2 && len(legacyRevList) == 0 {
11181117
revTree, ok := rq.Properties[RevMessageTreeHistory]
11191118
if ok {
11201119
legacyRevList = append(legacyRevList, strings.Split(revTree, ",")...)
1121-
isBlipRevTreeProperty = true
11221120
if len(legacyRevList) > 0 {
11231121
newDoc.RevID = legacyRevList[0]
11241122
}
@@ -1352,7 +1350,7 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
13521350
ExistingDoc: rawBucketDoc,
13531351
NewDocHLV: incomingHLV,
13541352
ConflictResolver: bh.conflictResolver.hlvConflictResolver,
1355-
AlignRevTrees: isBlipRevTreeProperty,
1353+
ISGRWrite: bh.clientType == BLIPClientTypeSGR2,
13561354
}
13571355
_, _, _, err = bh.collection.PutExistingCurrentVersion(bh.loggingCtx, opts)
13581356
} else {

db/blip_sync_context.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -597,11 +597,12 @@ func (bsc *BlipSyncContext) sendDelta(ctx context.Context, sender *blip.Sender,
597597
properties := blipRevMessageProperties(history, revDelta.ToDeleted, seq, "", revTreeProperty)
598598
properties[RevMessageDeltaSrc] = deltaSrcRevID
599599

600-
base.DebugfCtx(ctx, base.KeySync, "Sending rev %q %s as delta. DeltaSrc:%s", base.UD(docID), revDelta.ToRevID, deltaSrcRevID)
601600
if bsc.useHLV() {
601+
base.DebugfCtx(ctx, base.KeySync, "Sending rev %q %s as delta. DeltaSrc:%s", base.UD(docID), revDelta.ToCV, deltaSrcRevID)
602602
return bsc.sendRevisionWithProperties(ctx, sender, docID, revDelta.ToCV, collectionIdx, revDelta.DeltaBytes, revDelta.AttachmentStorageMeta,
603603
properties, seq, resendFullRevisionFunc)
604604
} else {
605+
base.DebugfCtx(ctx, base.KeySync, "Sending rev %q %s as delta. DeltaSrc:%s", base.UD(docID), revDelta.ToRevID, deltaSrcRevID)
605606
return bsc.sendRevisionWithProperties(ctx, sender, docID, revDelta.ToRevID, collectionIdx, revDelta.DeltaBytes, revDelta.AttachmentStorageMeta,
606607
properties, seq, resendFullRevisionFunc)
607608
}
@@ -872,7 +873,7 @@ func (bsc *BlipSyncContext) getKnownRevs(ctx context.Context, docID string, know
872873
// revtree clients. For HLV clients, use the cv as deltaSrc
873874
if bsc.useDeltas && len(knownRevsArray) > 0 {
874875
if revID, ok := knownRevsArray[0].(string); ok {
875-
if bsc.useHLV() {
876+
if bsc.useHLV() && !base.IsRevTreeID(revID) {
876877
// extract cv from the known revs array
877878
msgHLV, _, deltaSrcErr := extractHLVFromBlipString(revID)
878879
if deltaSrcErr != nil {

db/crud.go

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1282,7 +1282,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
12821282
// - NewDocHLV: new incoming doc's HLV
12831283
// - ExistingDoc: existing doc in bucket (if present)
12841284
// - RevTreeHistory: list of revID's from the incoming docs history (including docs current rev).
1285-
// - AlignRevTrees: if this is true then we will align the new write with the incoming docs rev tree. If this is
1285+
// - ISGRWrite: if this is true then we will align the new write with the incoming docs rev tree. If this is
12861286
// false and len(RevTreeHistory) > 0 then this means the local version of this doc does not have an HLV so this parameter
12871287
// will be used to check for conflicts.
12881288
func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, opts PutDocOptions) (doc *Document, cv *Version, newRevID string, err error) {
@@ -1339,7 +1339,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
13391339
// if incoming rev tree list is from a legacy pre upgraded doc, we should have new revID generation based
13401340
// off the previous current rev +1. If we have rev tree list filled from ISGR's rev tree property then we
13411341
// should use the current rev of inc
1342-
if !opts.AlignRevTrees {
1342+
if !opts.ISGRWrite {
13431343
newGeneration = prevGeneration + 1
13441344
} else {
13451345
newGeneration = prevGeneration
@@ -1373,7 +1373,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
13731373
doc.HLV = NewHybridLogicalVector()
13741374
}
13751375
doc.HLV.UpdateWithIncomingHLV(opts.NewDocHLV)
1376-
if opts.AlignRevTrees {
1376+
if opts.ISGRWrite {
13771377
err := doc.alignRevTreeHistoryForHLVWrite(ctx, db, opts.NewDoc, opts.RevTreeHistory, opts.ForceAllowConflictingTombstone)
13781378
if err != nil {
13791379
return nil, nil, false, nil, err
@@ -1393,7 +1393,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
13931393
// update hlv for all newer incoming source version pairs
13941394
doc.HLV.UpdateWithIncomingHLV(opts.NewDocHLV)
13951395
// the new document has a dominating hlv, so we can just update local revtree with incoming revtree
1396-
if !opts.AlignRevTrees {
1396+
if !opts.ISGRWrite {
13971397
previousRevTreeID = doc.GetRevTreeID()
13981398
} else {
13991399
// align rev tree here for ISGR replications
@@ -1404,7 +1404,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
14041404
}
14051405
case HLVConflict:
14061406
// if we have been supplied a rev tree from cbl, perform conflict check on rev tree history
1407-
if len(opts.RevTreeHistory) > 0 && !opts.AlignRevTrees {
1407+
if len(opts.RevTreeHistory) > 0 && !opts.ISGRWrite {
14081408
parent, currentRevIndex, err := db.revTreeConflictCheck(ctx, opts.RevTreeHistory, doc, opts.NewDoc.Deleted)
14091409
if err != nil {
14101410
base.DebugfCtx(ctx, base.KeyCRUD, "conflict detected between the two HLV's for doc %s, and conflict found in rev tree history", base.UD(doc.ID))
@@ -1448,7 +1448,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
14481448
// if we have revtree history available and we are communicating with CBL, we must update the rev tree
14491449
// to include the new to us revisions from the incoming rev tree history skipping history check given conflict
14501450
// check is done at this point.
1451-
if len(opts.RevTreeHistory) > 0 && !opts.AlignRevTrees && !revTreeAlignedForCBL {
1451+
if len(opts.RevTreeHistory) > 0 && !opts.ISGRWrite && !revTreeAlignedForCBL {
14521452
addNewRevErr := doc.alignRevTreeHistoryForHLVWrite(ctx, db, opts.NewDoc, opts.RevTreeHistory, true)
14531453
if addNewRevErr != nil {
14541454
return nil, nil, false, nil, addNewRevErr
@@ -1463,7 +1463,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
14631463

14641464
// generate rev id for new arriving doc
14651465
var newRev string
1466-
if !opts.AlignRevTrees {
1466+
if !opts.ISGRWrite {
14671467
// create a new revID for incoming write
14681468
strippedBody, _ := StripInternalProperties(opts.NewDoc._body)
14691469
encoding, err := base.JSONMarshalCanonical(strippedBody)
@@ -1521,7 +1521,7 @@ type PutDocOptions struct {
15211521
ConflictResolver *ConflictResolver // If provided, will be used to resolve conflicts if NoConflicts is false and a conflict is detected
15221522
ExistingDoc *sgbucket.BucketDocument // optional, prevents fetching the document from the bucket
15231523
NewDocHLV *HybridLogicalVector // incoming doc HLV if known
1524-
AlignRevTrees bool // if true, the rev tree history is from ISGR only revTree replication message property (> v4 protocol only) and will use this to align rev trees
1524+
ISGRWrite bool // if true, the write is from ISGR and will use rev tree history to align rev trees
15251525
}
15261526

15271527
// PutExistingRevWithConflictResolution adds an existing revision to a document along with its history.
@@ -3455,38 +3455,42 @@ func (c *DatabaseCollection) checkForUpgrade(ctx context.Context, key string, un
34553455
func legacyRevToHybridLogicalVector(docID, revID string) (hlv *HybridLogicalVector, err error) {
34563456
version, err := LegacyRevToRevTreeEncodedVersion(revID)
34573457
if err != nil {
3458-
return nil, fmt.Errorf("error parsing legacy revID %q to version for doc %s: %v", revID, base.UD(docID), err)
3458+
return nil, base.RedactErrorf("error parsing legacy revID %q to version for doc %s: %v", revID, base.UD(docID), err)
34593459
}
34603460
hlv = NewHybridLogicalVector()
34613461
err = hlv.AddVersion(version)
34623462
if err != nil {
3463-
return nil, fmt.Errorf("error adding version to hlv for doc %s: %v", base.UD(docID), err)
3463+
return nil, base.RedactErrorf("error adding version to hlv for doc %s: %v", base.UD(docID), err)
34643464
}
34653465
return hlv, nil
34663466
}
34673467

34683468
// parseIncomingChange will parse incoming change version. If the change is legacy rev it will convert the revID to a CV
3469-
// otherwise it will parse the version string.
3470-
func parseIncomingChange(docid, rev string) (cvValue Version, err error) {
3469+
// otherwise it will parse the version string and will mark change as legacy rev.
3470+
func parseIncomingChange(docid, rev string) (cvValue Version, legacyRev bool, err error) {
34713471
if base.IsRevTreeID(rev) {
34723472
cvValue, err = LegacyRevToRevTreeEncodedVersion(rev)
34733473
if err != nil {
3474-
return Version{}, base.RedactErrorf("error parsing legacy revID %q to version for doc %s: %v", base.UD(rev), base.UD(docid), err)
3474+
return Version{}, legacyRev, base.RedactErrorf("error parsing legacy revID %q to version for doc %s: %v", base.UD(rev), base.UD(docid), err)
34753475
}
3476+
legacyRev = true
34763477
} else {
34773478
cvValue, err = ParseVersion(rev)
34783479
if err != nil {
3479-
return Version{}, base.RedactErrorf("error parsing change version for doc %s: %v", base.UD(docid), err)
3480+
return Version{}, legacyRev, base.RedactErrorf("error parsing change version for doc %s: %v", base.UD(docid), err)
34803481
}
34813482
}
3482-
return cvValue, nil
3483+
return cvValue, legacyRev, nil
34833484
}
34843485

34853486
func (db *DatabaseCollectionWithUser) CheckChangeVersion(ctx context.Context, docid, rev string) (missing, possible []string) {
34863487
if strings.HasPrefix(docid, "_design/") && db.user != nil {
34873488
return // Users can't upload design docs, so ignore them
34883489
}
34893490

3491+
localIsLegacy := false
3492+
changeIsLegacy := false
3493+
34903494
syncData, hlv, err := db.GetDocSyncDataNoImport(ctx, docid, DocUnmarshalSync)
34913495
if err != nil {
34923496
if !base.IsDocNotFoundError(err) && !base.IsXattrNotFoundError(err) {
@@ -3503,10 +3507,15 @@ func (db *DatabaseCollectionWithUser) CheckChangeVersion(ctx context.Context, do
35033507
return
35043508
}
35053509
}
3510+
if hlv.HasRevEncodedCV() {
3511+
// if local hlv has revID encoded CV given to it mark it as legacy change for the purposes of sending back known revs
3512+
// in legacy format. This will ensure delta sync works correctly replicating legacy revs
3513+
localIsLegacy = true
3514+
}
35063515
// parse in coming version, if it's not known to local doc hlv then it is marked as missing, if it is and is a newer version
35073516
// then it is also marked as missing
35083517
var cvValue Version
3509-
cvValue, err = parseIncomingChange(docid, rev)
3518+
cvValue, changeIsLegacy, err = parseIncomingChange(docid, rev)
35103519
if err != nil {
35113520
base.WarnfCtx(ctx, "%s", err)
35123521
missing = append(missing, rev)
@@ -3520,8 +3529,11 @@ func (db *DatabaseCollectionWithUser) CheckChangeVersion(ctx context.Context, do
35203529

35213530
// return the local current rev as known rev, this will mean if you have rev 1,2,3 and remote has rev 1,2,3,4,5 then
35223531
// remote should only send rev 4,5 in rev tree property on the subsequent rev message for this document, we also need to
3523-
// send cv as first element for delta sync purposes
3524-
possible = append(possible, hlv.GetCurrentVersionString())
3532+
// send cv as first element for delta sync purposes. Only send CV if this is not legacy rev change
3533+
if !localIsLegacy && !changeIsLegacy {
3534+
// we should only send CV in response when we are communicating with HLV's both sides of the replication
3535+
possible = append(possible, hlv.GetCurrentVersionString())
3536+
}
35253537
possible = append(possible, syncData.GetRevTreeID())
35263538

35273539
missing = append(missing, rev)

db/revision_cache_lru.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,13 @@ func (value *revCacheValue) load(ctx context.Context, backingStore RevisionCache
727727
if hlv != nil {
728728
value.cv = *hlv.ExtractCurrentVersionFromHLV()
729729
value.hlvHistory = hlv.ToHistoryForHLV()
730+
} else if value.err == nil {
731+
// if hlv is nil its a legacy rev, we need to create a CV from the revID
732+
encodedCV, err := LegacyRevToRevTreeEncodedVersion(value.revID)
733+
if err != nil {
734+
return docRev, false, err
735+
}
736+
value.cv = encodedCV
730737
}
731738
}
732739
}

rest/replicatortest/replicator_test_legacy_rev_test.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,3 +1094,184 @@ func TestActiveReplicatorConflictPreUpgradedVersionOneSide(t *testing.T) {
10941094
})
10951095
}
10961096
}
1097+
1098+
func TestActiveReplicatorDeltaSyncWhenBothSidesLegacy(t *testing.T) {
1099+
base.RequireNumTestBuckets(t, 2)
1100+
base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)
1101+
1102+
if !base.IsEnterpriseEdition() {
1103+
t.Skip("Delta sync only supported in EE")
1104+
}
1105+
1106+
const username = "alice"
1107+
1108+
// Passive (SGW2 in diagram above)
1109+
rt2 := rest.NewRestTester(t,
1110+
&rest.RestTesterConfig{
1111+
SyncFn: channels.DocChannelsSyncFunction,
1112+
DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{
1113+
Name: "passivedb",
1114+
DeltaSync: &rest.DeltaSyncConfig{
1115+
Enabled: base.Ptr(true),
1116+
},
1117+
}},
1118+
})
1119+
defer rt2.Close()
1120+
1121+
rt2.CreateUser(username, []string{username})
1122+
1123+
// Active (SGW1 in diagram above)
1124+
rt1 := rest.NewRestTester(t,
1125+
&rest.RestTesterConfig{
1126+
SyncFn: channels.DocChannelsSyncFunction,
1127+
DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{
1128+
Name: "activedb",
1129+
DeltaSync: &rest.DeltaSyncConfig{
1130+
Enabled: base.Ptr(true),
1131+
},
1132+
}},
1133+
})
1134+
defer rt1.Close()
1135+
ctx1 := rt1.Context()
1136+
1137+
docIDToPush := rest.SafeDocumentName(t, t.Name()+"_push")
1138+
1139+
// create doc on rt1 with one revision
1140+
bodyRT1 := db.Body{"channels": []string{username}, "source": "rt1"}
1141+
rt1InitDoc := rt1.CreateDocNoHLV(docIDToPush, bodyRT1)
1142+
legacyInitRevRt1 := rt1InitDoc.GetRevTreeID()
1143+
// create another rev to ensure we have a rev to delta from
1144+
bodyRT1 = db.Body{db.BodyRev: legacyInitRevRt1, "channels": []string{username}, "source": "rt1"}
1145+
rt1InitDoc = rt1.CreateDocNoHLV(docIDToPush, bodyRT1)
1146+
legacyRevRt1 := rt1InitDoc.GetRevTreeID()
1147+
1148+
// create rev on rt2 that will resolve to same revID as rev one above simulating the following:
1149+
// 1. doc created on rt1, pushed to rt2
1150+
// 2. doc updated on rt1 to create rev2, but upgrade happens before being pushed to rt2
1151+
// 3. doc is pushed post upgrade to rt2 and the delta from rev1 to rev2 is sent
1152+
bodyRT2 := db.Body{"channels": []string{username}, "source": "rt1"}
1153+
rt2InitDoc := rt2.CreateDocNoHLV(docIDToPush, bodyRT2)
1154+
legacyRevRt2 := rt2InitDoc.GetRevTreeID()
1155+
1156+
require.Equal(t, legacyInitRevRt1, legacyRevRt2)
1157+
1158+
stats, err := base.SyncGatewayStats.NewDBStats(t.Name(), false, false, false, nil, nil)
1159+
require.NoError(t, err)
1160+
replicationStats, err := stats.DBReplicatorStats(t.Name())
1161+
require.NoError(t, err)
1162+
1163+
ar, err := db.NewActiveReplicator(ctx1, &db.ActiveReplicatorConfig{
1164+
ID: t.Name(),
1165+
Direction: db.ActiveReplicatorTypePush,
1166+
RemoteDBURL: userDBURL(rt2, username),
1167+
ActiveDB: &db.Database{
1168+
DatabaseContext: rt1.GetDatabase(),
1169+
},
1170+
ChangesBatchSize: 200,
1171+
Continuous: true,
1172+
ReplicationStatsMap: replicationStats,
1173+
CollectionsEnabled: !rt1.GetDatabase().OnlyDefaultCollection(),
1174+
DeltasEnabled: true,
1175+
})
1176+
require.NoError(t, err)
1177+
defer func() {
1178+
require.NoError(t, ar.Stop())
1179+
}()
1180+
1181+
// Start the replicator
1182+
require.NoError(t, ar.Start(ctx1))
1183+
1184+
rt2.WaitForLegacyRev(docIDToPush, legacyRevRt1, []byte(`{"source":"rt1","channels":["alice"]}`))
1185+
1186+
base.RequireWaitForStat(t, func() int64 {
1187+
return replicationStats.PushDeltaSentCount.Value()
1188+
}, 1)
1189+
}
1190+
1191+
func TestDeltaSyncWhenOneSideHasEncodedCV(t *testing.T) {
1192+
base.RequireNumTestBuckets(t, 2)
1193+
1194+
if !base.IsEnterpriseEdition() {
1195+
t.Skip("Delta sync only supported in EE")
1196+
}
1197+
1198+
const username = "alice"
1199+
1200+
// Passive (SGW2 in diagram above)
1201+
rt2 := rest.NewRestTester(t,
1202+
&rest.RestTesterConfig{
1203+
SyncFn: channels.DocChannelsSyncFunction,
1204+
DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{
1205+
Name: "passivedb",
1206+
DeltaSync: &rest.DeltaSyncConfig{
1207+
Enabled: base.Ptr(true),
1208+
},
1209+
}},
1210+
})
1211+
defer rt2.Close()
1212+
1213+
rt2.CreateUser(username, []string{username})
1214+
1215+
// Active (SGW1 in diagram above)
1216+
rt1 := rest.NewRestTester(t,
1217+
&rest.RestTesterConfig{
1218+
SyncFn: channels.DocChannelsSyncFunction,
1219+
DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{
1220+
Name: "activedb",
1221+
DeltaSync: &rest.DeltaSyncConfig{
1222+
Enabled: base.Ptr(true),
1223+
},
1224+
}},
1225+
})
1226+
defer rt1.Close()
1227+
ctx1 := rt1.Context()
1228+
1229+
docIDToPush := rest.SafeDocumentName(t, t.Name()+"_push")
1230+
1231+
// create doc on rt1 with one revision
1232+
bodyRT1 := db.Body{"channels": []string{username}, "source": "rt1"}
1233+
rt1InitDoc := rt1.CreateDocNoHLV(docIDToPush, bodyRT1)
1234+
legacyInitRevRt1 := rt1InitDoc.GetRevTreeID()
1235+
1236+
stats, err := base.SyncGatewayStats.NewDBStats(t.Name(), false, false, false, nil, nil)
1237+
require.NoError(t, err)
1238+
replicationStats, err := stats.DBReplicatorStats(t.Name())
1239+
require.NoError(t, err)
1240+
1241+
ar, err := db.NewActiveReplicator(ctx1, &db.ActiveReplicatorConfig{
1242+
ID: t.Name(),
1243+
Direction: db.ActiveReplicatorTypePush,
1244+
RemoteDBURL: userDBURL(rt2, username),
1245+
ActiveDB: &db.Database{
1246+
DatabaseContext: rt1.GetDatabase(),
1247+
},
1248+
ChangesBatchSize: 200,
1249+
Continuous: true,
1250+
ReplicationStatsMap: replicationStats,
1251+
CollectionsEnabled: !rt1.GetDatabase().OnlyDefaultCollection(),
1252+
DeltasEnabled: true,
1253+
})
1254+
require.NoError(t, err)
1255+
defer func() {
1256+
require.NoError(t, ar.Stop())
1257+
}()
1258+
1259+
// Start the replicator
1260+
require.NoError(t, ar.Start(ctx1))
1261+
1262+
rt2.WaitForLegacyRev(docIDToPush, legacyInitRevRt1, []byte(`{"source":"rt1","channels":["alice"]}`))
1263+
1264+
// flush revision cache to remove old reference to rev 1 in rev cache
1265+
rt1.GetDatabase().FlushRevisionCacheForTest()
1266+
1267+
// update doc on rt1 to create a second revision with HLV
1268+
// This should:
1269+
// 1. update doc on rt1 to give HLV based of rt1 sourceID
1270+
// 2. push doc to rt2 with delta from rev1 to rev2
1271+
upgradeVersion := rt1.UpdateDoc(docIDToPush, db.DocVersion{RevTreeID: legacyInitRevRt1}, `{"channels": ["alice"], "source": "rt1-updated"}`)
1272+
rt1.WaitForVersion(docIDToPush, upgradeVersion)
1273+
1274+
base.RequireWaitForStat(t, func() int64 {
1275+
return replicationStats.PushDeltaSentCount.Value()
1276+
}, 1)
1277+
}

0 commit comments

Comments
 (0)