Skip to content

Commit b85e507

Browse files
committed
CBG-3690 do not re-read document for resync
1 parent 693e4b5 commit b85e507

File tree

9 files changed

+117
-109
lines changed

9 files changed

+117
-109
lines changed

base/constants.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ const (
133133

134134
// VirtualXattrRevSeqNo is used to fetch rev seq no from documents virtual xattr
135135
VirtualXattrRevSeqNo = "$document.revid"
136+
137+
// VirtualExpiry is used to fetch the expiry from documents
138+
VirtualExpiry = "$document.exptime"
139+
136140
// VirtualDocumentXattr is used to fetch the documents virtual xattr
137141
VirtualDocumentXattr = "$document"
138142

db/background_mgr_resync_dcp.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,15 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
124124

125125
callback := func(event sgbucket.FeedEvent) bool {
126126
docID := string(event.Key)
127-
key := realDocID(docID)
128127
base.TracefCtx(ctx, base.KeyAll, "[%s] Received DCP event %d for doc %v", resyncLoggingID, event.Opcode, base.UD(docID))
129128

130129
// Ignore documents without xattrs if possible, to avoid processing unnecessary documents
131130
if r.useXattrs && event.DataType&base.MemcachedDataTypeXattr == 0 {
132131
return true
133132
}
134133
// Don't want to process raw binary docs
135-
// The binary check should suffice but for additional safety also check for empty bodies
134+
// The binary check should suffice but for additional safety also check for empty bodies. This will also avoid
135+
// processing tombstones.
136136
if event.DataType == base.MemcachedDataTypeRaw || len(event.Value) == 0 {
137137
return true
138138
}
@@ -147,9 +147,14 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
147147
databaseCollection := db.CollectionByID[event.CollectionID]
148148
databaseCollection.collectionStats.ResyncNumProcessed.Add(1)
149149
collectionCtx := databaseCollection.AddCollectionContext(ctx)
150-
_, unusedSequences, err := (&DatabaseCollectionWithUser{
150+
doc, err := bucketDocumentFromFeed(event)
151+
if err != nil {
152+
base.WarnfCtx(collectionCtx, "[%s] Error getting document from DCP event for doc %q: %v", resyncLoggingID, base.UD(docID), err)
153+
return false
154+
}
155+
unusedSequences, err := (&DatabaseCollectionWithUser{
151156
DatabaseCollection: databaseCollection,
152-
}).ResyncDocument(collectionCtx, docID, key, regenerateSequences, []uint64{})
157+
}).ResyncDocument(collectionCtx, docID, doc, regenerateSequences)
153158

154159
databaseCollection.releaseSequences(collectionCtx, unusedSequences)
155160

@@ -159,6 +164,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
159164
databaseCollection.collectionStats.ResyncNumChanged.Add(1)
160165
} else if err != base.ErrUpdateCancel {
161166
base.WarnfCtx(collectionCtx, "[%s] Error updating doc %q: %v", resyncLoggingID, base.UD(docID), err)
167+
return false
162168
}
163169
return true
164170
}

db/crud.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2724,7 +2724,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
27242724
if expiry != nil {
27252725
initialExpiry = *expiry
27262726
}
2727-
casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) {
2727+
casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) {
27282728
// Be careful: this block can be invoked multiple times if there are races!
27292729
if doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll); err != nil {
27302730
return

db/database.go

Lines changed: 47 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1824,96 +1824,67 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d
18241824

18251825
// ResyncDocument will re-run the sync function on the document and write an updated version to the bucket. If
18261826
// the sync function doesn't change any channels or access grants, no write will be performed.
1827-
func (db *DatabaseCollectionWithUser) ResyncDocument(ctx context.Context, docid, key string, regenerateSequences bool, unusedSequences []uint64) (updatedHighSeq uint64, updatedUnusedSequences []uint64, err error) {
1827+
func (db *DatabaseCollectionWithUser) ResyncDocument(ctx context.Context, docid string, previousDoc *sgbucket.BucketDocument, regenerateSequences bool) (updatedUnusedSequences []uint64, err error) {
18281828
var updatedDoc *Document
18291829
var shouldUpdate bool
18301830
var updatedExpiry *uint32
1831-
if db.UseXattrs() {
1832-
writeUpdateFunc := func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) {
1833-
// There's no scenario where a doc should from non-deleted to deleted during UpdateAllDocChannels processing,
1834-
// so deleteDoc is always returned as false.
1835-
if currentValue == nil || len(currentValue) == 0 {
1836-
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
1837-
}
1838-
doc, err := db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll)
1839-
if err != nil {
1840-
return sgbucket.UpdatedDoc{}, err
1841-
}
1842-
updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences)
1843-
if err != nil {
1844-
return sgbucket.UpdatedDoc{}, err
1845-
}
1846-
if !shouldUpdate {
1847-
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
1848-
}
1849-
base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid))
1850-
if updatedExpiry != nil {
1851-
updatedDoc.UpdateExpiry(*updatedExpiry)
1852-
}
1853-
doc.SetCrc32cUserXattrHash()
1854-
1855-
// Update MetadataOnlyUpdate based on previous Cas, MetadataOnlyUpdate
1856-
if db.useMou() {
1857-
doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate)
1858-
}
1859-
1860-
_, rawSyncXattr, _, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs()
1861-
updatedDoc := sgbucket.UpdatedDoc{
1862-
Doc: nil, // Resync does not require document body update
1863-
Xattrs: map[string][]byte{
1864-
base.SyncXattrName: rawSyncXattr,
1865-
},
1866-
Expiry: updatedExpiry,
1867-
}
1868-
if db.useMou() {
1869-
updatedDoc.Xattrs[base.MouXattrName] = rawMouXattr
1870-
if doc.MetadataOnlyUpdate.HexCAS == expandMacroCASValueString {
1871-
updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas))
1872-
}
1873-
}
1874-
if rawGlobalXattr != nil {
1875-
updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalXattr
1876-
}
1877-
return updatedDoc, err
1831+
writeUpdateFunc := func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (sgbucket.UpdatedDoc, error) {
1832+
// resyncDocument is not called on tombstoned documents, so this value will only be empty if the document was
1833+
// deleted between DCP event and calling this function. In any case, we do not need to update it.
1834+
if len(currentValue) == 0 {
1835+
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
18781836
}
1879-
opts := &sgbucket.MutateInOptions{
1880-
MacroExpansion: macroExpandSpec(base.SyncXattrName),
1837+
doc, err := db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll)
1838+
if err != nil {
1839+
return sgbucket.UpdatedDoc{}, err
18811840
}
1882-
_, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc)
1883-
} else {
1884-
_, err = db.dataStore.Update(key, 0, func(currentValue []byte) ([]byte, *uint32, bool, error) {
1885-
// Be careful: this block can be invoked multiple times if there are races!
1886-
if currentValue == nil {
1887-
return nil, nil, false, base.ErrUpdateCancel // someone deleted it?!
1888-
}
1889-
doc, err := unmarshalDocument(docid, currentValue)
1890-
if err != nil {
1891-
return nil, nil, false, err
1892-
}
1893-
updatedDoc, shouldUpdate, updatedExpiry, updatedHighSeq, unusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, unusedSequences)
1894-
if err != nil {
1895-
return nil, nil, false, err
1896-
}
1897-
if shouldUpdate {
1898-
base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid))
1899-
if updatedExpiry != nil {
1900-
updatedDoc.UpdateExpiry(*updatedExpiry)
1901-
}
1841+
fmt.Printf("%+v\n", doc)
1842+
updatedDoc, shouldUpdate, updatedExpiry, _, updatedUnusedSequences, err = db.getResyncedDocument(ctx, doc, regenerateSequences, nil)
1843+
if err != nil {
1844+
return sgbucket.UpdatedDoc{}, err
1845+
}
1846+
if !shouldUpdate {
1847+
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
1848+
}
1849+
base.TracefCtx(ctx, base.KeyAccess, "Saving updated channels and access grants of %q", base.UD(docid))
1850+
if updatedExpiry != nil {
1851+
updatedDoc.UpdateExpiry(*updatedExpiry)
1852+
}
1853+
doc.SetCrc32cUserXattrHash()
19021854

1903-
updatedBytes, marshalErr := base.JSONMarshal(updatedDoc)
1904-
return updatedBytes, updatedExpiry, false, marshalErr
1905-
} else {
1906-
return nil, nil, false, base.ErrUpdateCancel
1855+
// Update MetadataOnlyUpdate based on previous Cas, MetadataOnlyUpdate
1856+
doc.MetadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.MetadataOnlyUpdate)
1857+
fmt.Printf("updatedDoc= %+v\n", updatedDoc)
1858+
_, rawSyncXattr, _, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs()
1859+
updatedDoc := sgbucket.UpdatedDoc{
1860+
Doc: nil, // Resync does not require document body update
1861+
Xattrs: map[string][]byte{
1862+
base.SyncXattrName: rawSyncXattr,
1863+
base.MouXattrName: rawMouXattr,
1864+
},
1865+
Expiry: updatedExpiry,
1866+
}
1867+
if doc.MetadataOnlyUpdate != nil {
1868+
if doc.MetadataOnlyUpdate.HexCAS == expandMacroCASValueString {
1869+
updatedDoc.Spec = append(updatedDoc.Spec, sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas))
19071870
}
1908-
})
1871+
}
1872+
if rawGlobalXattr != nil {
1873+
updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalXattr
1874+
}
1875+
return updatedDoc, err
1876+
}
1877+
opts := &sgbucket.MutateInOptions{
1878+
MacroExpansion: macroExpandSpec(base.SyncXattrName),
19091879
}
1880+
_, err = db.dataStore.WriteUpdateWithXattrs(ctx, docid, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), 0, previousDoc, opts, writeUpdateFunc)
19101881
if err == nil {
19111882
base.Audit(ctx, base.AuditIDDocumentResync, base.AuditFields{
19121883
base.AuditFieldDocID: docid,
19131884
base.AuditFieldDocVersion: updatedDoc.CVOrRevTreeID(),
19141885
})
19151886
}
1916-
return updatedHighSeq, unusedSequences, err
1887+
return updatedUnusedSequences, err
19171888
}
19181889

19191890
// invalidateAllPrincipals invalidates computed channels and roles for all users/roles, for the specified collections:

db/database_collection.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,9 +323,20 @@ func (c *DatabaseCollection) syncGlobalSyncAndUserXattrKeys() []string {
323323
return xattrKeys
324324
}
325325

326+
// syncGlobalSyncMouAndUserXattrKeys returns the xattr keys for the user, mou and sync xattrs.
327+
func (c *DatabaseCollection) syncGlobalSyncMouAndUserXattrKeys() []string {
328+
xattrKeys := []string{base.SyncXattrName, base.VvXattrName,
329+
base.MouXattrName, base.GlobalXattrName}
330+
userXattrKey := c.userXattrKey()
331+
if userXattrKey != "" {
332+
xattrKeys = append(xattrKeys, userXattrKey)
333+
}
334+
return xattrKeys
335+
}
336+
326337
// syncGlobalSyncMouRevSeqNoAndUserXattrKeys returns the xattr keys for the user, mou, revSeqNo and sync xattrs.
327338
func (c *DatabaseCollection) syncGlobalSyncMouRevSeqNoAndUserXattrKeys() []string {
328-
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
339+
xattrKeys := []string{base.SyncXattrName, base.VvXattrName, base.VirtualXattrRevSeqNo}
329340
if c.useMou() {
330341
xattrKeys = append(xattrKeys, base.MouXattrName, base.GlobalXattrName)
331342
}

db/database_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3918,12 +3918,13 @@ func Test_resyncDocument(t *testing.T) {
39183918
_, err = collection.UpdateSyncFun(ctx, syncFn)
39193919
require.NoError(t, err)
39203920

3921-
preResyncDoc, err := collection.GetDocument(ctx, docID, DocUnmarshalAll)
3921+
preResyncDoc, _, err := collection.getDocWithXattrs(ctx, docID, collection.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), DocUnmarshalAll)
39223922
require.NoError(t, err)
39233923
if !tc.useHLV {
39243924
require.Nil(t, preResyncDoc.HLV)
39253925
}
3926-
_, _, err = collection.ResyncDocument(ctx, docID, realDocID(docID), false, []uint64{10})
3926+
3927+
_, err = collection.ResyncDocument(ctx, docID, getBucketDocument(t, collection.DatabaseCollection, docID), false)
39273928
require.NoError(t, err)
39283929
err = collection.WaitForPendingChanges(ctx)
39293930
require.NoError(t, err)
@@ -3943,7 +3944,6 @@ func Test_resyncDocument(t *testing.T) {
39433944
}
39443945
assert.True(t, found)
39453946

3946-
require.NoError(t, err)
39473947
if tc.useHLV {
39483948
require.NotNil(t, postResyncDoc.HLV)
39493949
require.Equal(t, Version{
@@ -3953,6 +3953,7 @@ func Test_resyncDocument(t *testing.T) {
39533953
SourceID: postResyncDoc.HLV.SourceID,
39543954
Value: postResyncDoc.HLV.Version,
39553955
})
3956+
assert.Equal(t, preResyncDoc.Cas, postResyncDoc.HLV.CurrentVersionCAS)
39563957
} else {
39573958
require.Nil(t, postResyncDoc.HLV)
39583959
}

db/document.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -612,21 +612,6 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey
612612
return rawDoc, syncData, nil
613613
}
614614

615-
func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, data []byte, dataType uint8, userXattrKey string) (doc *Document, err error) {
616-
if dataType&base.MemcachedDataTypeXattr == 0 {
617-
return unmarshalDocument(docid, data)
618-
}
619-
xattrKeys := []string{base.SyncXattrName}
620-
if userXattrKey != "" {
621-
xattrKeys = append(xattrKeys, userXattrKey)
622-
}
623-
body, xattrs, err := sgbucket.DecodeValueWithXattrs(xattrKeys, data)
624-
if err != nil {
625-
return nil, err
626-
}
627-
return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], xattrs[base.VirtualXattrRevSeqNo], nil, cas, DocUnmarshalAll)
628-
}
629-
630615
func (doc *SyncData) HasValidSyncData() bool {
631616

632617
valid := doc != nil && doc.GetRevTreeID() != "" && (doc.Sequence > 0)
@@ -1578,3 +1563,18 @@ func (d DocVersion) CVOrRevTreeID() string {
15781563
}
15791564
return d.RevTreeID
15801565
}
1566+
1567+
// bucketDocumentFromFeed converts a sgbucket.FeedEvent to a sgbucket.BucketDocument
1568+
func bucketDocumentFromFeed(event sgbucket.FeedEvent) (*sgbucket.BucketDocument, error) {
1569+
body, xattrs, err := sgbucket.DecodeValueWithAllXattrs(event.Value)
1570+
if err != nil {
1571+
return nil, err
1572+
}
1573+
return &sgbucket.BucketDocument{
1574+
Body: body,
1575+
Xattrs: xattrs,
1576+
Cas: event.Cas,
1577+
Expiry: event.Expiry,
1578+
IsTombstone: event.Opcode == sgbucket.FeedOpDeletion,
1579+
}, nil
1580+
}

db/import_test.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -477,14 +477,9 @@ func TestImportWithStaleBucketDocCorrectExpiry(t *testing.T) {
477477
assert.NoError(t, err, "Error writing doc w/ expiry")
478478

479479
// Get the existing bucket doc
480-
_, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, key, DocUnmarshalAll)
481-
assert.NoError(t, err, fmt.Sprintf("Error retrieving doc w/ xattr: %v", err))
482-
483-
body = Body{}
484-
err = body.Unmarshal(existingBucketDoc.Body)
485-
assert.NoError(t, err, "Error unmarshalling body")
480+
existingBucketDoc := getBucketDocument(t, collection.DatabaseCollection, key)
481+
require.NoError(t, err)
486482

487-
// Set the expiry value
488483
syncMetaExpiryUnix := syncMetaExpiry.Unix()
489484
expiry := uint32(syncMetaExpiryUnix)
490485

@@ -1494,3 +1489,24 @@ func TestImportWithSyncCVAndNoVV(t *testing.T) {
14941489
base.RequireWaitForStat(t, db.DbStats.Database().Crc32MatchCount.Value, 1)
14951490

14961491
}
1492+
1493+
func getBucketDocument(t *testing.T, collection *DatabaseCollection, docID string) *sgbucket.BucketDocument {
1494+
ctx := base.TestCtx(t)
1495+
xattrNames := append(collection.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), base.VirtualExpiry)
1496+
body, xattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, docID, xattrNames)
1497+
require.NoError(t, err)
1498+
var expiry uint32
1499+
if expiryBytes, ok := xattrs[base.VirtualExpiry]; ok {
1500+
err := base.JSONUnmarshal(expiryBytes, &expiry)
1501+
require.NoError(t, err)
1502+
delete(xattrs, base.VirtualExpiry)
1503+
}
1504+
1505+
return &sgbucket.BucketDocument{
1506+
Body: body,
1507+
Xattrs: xattrs,
1508+
Cas: cas,
1509+
Expiry: expiry,
1510+
IsTombstone: len(body) == 0,
1511+
}
1512+
}

rest/legacy_rev_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ func TestResyncLegacyRev(t *testing.T) {
5555
// use ResyncDocument and TakeDbOffline/Online instead of /ks/_config/sync && /db/_resync to work under rosmar which
5656
// doesn't yet support DCP resync or updating config on an existing bucket.
5757
regenerateSequences := false
58-
var unusedSequences []uint64
59-
_, _, err = collection.ResyncDocument(ctx, docID, docID, regenerateSequences, unusedSequences)
58+
_, err = collection.ResyncDocument(ctx, docID, nil, regenerateSequences)
6059
require.NoError(t, err)
6160

6261
rt.TakeDbOffline()

0 commit comments

Comments
 (0)