Skip to content

Commit 8af2809

Browse files
authored
CBG-3811 do not assert on processed DCP docs (#7622)
1 parent 5fdc1bb commit 8af2809

File tree

6 files changed

+79
-20
lines changed

6 files changed

+79
-20
lines changed

db/attachment_compaction.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,10 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore,
322322
return true
323323
}
324324

325+
// deletion events are not relevant
326+
if event.Opcode != sgbucket.FeedOpMutation {
327+
return true
328+
}
325329
// If the data contains an xattr then the attachment likely has a compaction ID, need to check this value
326330
if event.DataType&base.MemcachedDataTypeXattr != 0 {
327331

@@ -357,7 +361,10 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore,
357361
base.TracefCtx(ctx, base.KeyAll, "[%s] Purging attachment %s", compactionLoggingID, base.UD(docID))
358362
_, err := dataStore.Remove(docID, event.Cas)
359363
if err != nil {
360-
base.WarnfCtx(ctx, "[%s] Unable to purge attachment %s: %v", compactionLoggingID, base.UD(docID), err)
364+
if !base.IsDocNotFoundError(err) {
365+
// attachment was removed separately, we don't need to worry about it
366+
base.WarnfCtx(ctx, "[%s] Unable to purge attachment %s: %v", compactionLoggingID, base.UD(docID), err)
367+
}
361368
return true
362369
}
363370
base.DebugfCtx(ctx, base.KeyAll, "[%s] Purged attachment %s", compactionLoggingID, base.UD(docID))

rest/adminapitest/admin_api_test.go

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -729,8 +729,18 @@ func TestResyncUsingDCPStream(t *testing.T) {
729729
resyncManagerStatus := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted)
730730

731731
assert.Equal(t, testCase.docsCreated, int(rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()))
732-
733-
assert.Equal(t, testCase.docsCreated, int(resyncManagerStatus.DocsProcessed))
732+
if !base.UnitTestUrlIsWalrus() && !base.TestsDisableGSI() {
733+
// It is possible for Couchbase Server GSI runs which use DCP purge to two DCP events from a previous
734+
// test.
735+
// 1. doc1 mutation
736+
// 2. doc1 deletion
737+
//
738+
// In a test, these will not be resynced but docsProcessed is incremented. Relax
739+
// the assertion to greater than the number of documents.
740+
assert.GreaterOrEqual(t, int(resyncManagerStatus.DocsProcessed), testCase.docsCreated)
741+
} else {
742+
assert.Equal(t, testCase.docsCreated, int(resyncManagerStatus.DocsProcessed))
743+
}
734744
assert.Equal(t, 0, int(resyncManagerStatus.DocsChanged))
735745
})
736746
}
@@ -784,8 +794,19 @@ func TestResyncUsingDCPStreamReset(t *testing.T) {
784794
assert.NotEqual(t, resyncID, resyncManagerStatus.ResyncID)
785795

786796
resyncManagerStatus = rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted)
787-
assert.Equal(t, int64(numDocs), resyncManagerStatus.DocsProcessed)
797+
if !base.UnitTestUrlIsWalrus() && !base.TestsDisableGSI() {
798+
// It is possible for Couchbase Server GSI runs which use DCP purge to two DCP events from a previous
799+
// test.
800+
// 1. doc1 mutation
801+
// 2. doc1 deletion
802+
//
803+
// In a test, these will not be resynced but docsProcessed is incremented. Relax
804+
// the assertion to greater than the number of documents.
805+
assert.GreaterOrEqual(t, int(resyncManagerStatus.DocsProcessed), numDocs)
806+
} else {
788807

808+
assert.Equal(t, int64(numDocs), resyncManagerStatus.DocsProcessed)
809+
}
789810
}
790811

791812
func TestResyncUsingDCPStreamForNamedCollection(t *testing.T) {
@@ -804,12 +825,14 @@ func TestResyncUsingDCPStreamForNamedCollection(t *testing.T) {
804825

805826
rt := rest.NewRestTesterMultipleCollections(t,
806827
&rest.RestTesterConfig{
807-
SyncFn: syncFn,
828+
SyncFn: syncFn,
829+
PersistentConfig: true,
808830
},
809831
numCollections,
810832
)
811833
defer rt.Close()
812834

835+
rest.RequireStatus(t, rt.CreateDatabase("db", rt.NewDbConfig()), http.StatusCreated)
813836
// put a docs in both collections
814837
for i := 1; i <= 10; i++ {
815838
resp := rt.SendAdminRequest(http.MethodPut, fmt.Sprintf("/{{.keyspace1}}/1000%d", i), `{"type":"test_doc"}`)
@@ -821,6 +844,8 @@ func TestResyncUsingDCPStreamForNamedCollection(t *testing.T) {
821844

822845
rt.TakeDbOffline()
823846

847+
rest.RequireStatus(t, rt.SendAdminRequest(http.MethodPut, "/{{.keyspace1}}/_config/sync", `function(doc){channel("A")}`), http.StatusOK)
848+
rest.RequireStatus(t, rt.SendAdminRequest(http.MethodPut, "/{{.keyspace2}}/_config/sync", `function(doc){channel("A")}`), http.StatusOK)
824849
dataStore1, err := rt.TestBucket.GetNamedDataStore(0)
825850
require.NoError(t, err)
826851
// Run resync for single collection // Request body {"scopes": "scopeName": ["collection1Name", "collection2Name"]}}
@@ -833,16 +858,17 @@ func TestResyncUsingDCPStreamForNamedCollection(t *testing.T) {
833858
rest.RequireStatus(t, resp, http.StatusOK)
834859
resyncManagerStatus := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted)
835860

836-
assert.Equal(t, 0, int(resyncManagerStatus.DocsChanged))
837-
assert.LessOrEqual(t, 10, int(resyncManagerStatus.DocsProcessed))
861+
assert.Equal(t, 10, int(resyncManagerStatus.DocsChanged))
862+
863+
rest.RequireStatus(t, rt.SendAdminRequest(http.MethodPut, "/{{.keyspace1}}/_config/sync", `function(doc){channel("B")}`), http.StatusOK)
864+
rest.RequireStatus(t, rt.SendAdminRequest(http.MethodPut, "/{{.keyspace2}}/_config/sync", `function(doc){channel("B")}`), http.StatusOK)
838865

839866
// Run resync for all collections
840867
resp = rt.SendAdminRequest("POST", "/db/_resync?action=start", "")
841868
rest.RequireStatus(t, resp, http.StatusOK)
842869
resyncManagerStatus = rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted)
843870

844-
assert.Equal(t, 0, int(resyncManagerStatus.DocsChanged))
845-
assert.LessOrEqual(t, 20, int(resyncManagerStatus.DocsProcessed))
871+
assert.Equal(t, 20, int(resyncManagerStatus.DocsChanged))
846872
}
847873

848874
func TestResyncErrorScenariosUsingDCPStream(t *testing.T) {
@@ -1230,6 +1256,7 @@ func TestMultipleBucketWithBadDbConfigScenario2(t *testing.T) {
12301256
// - persist that db config to another bucket
12311257
// - assert that is picked up as an invalid db config
12321258
func TestMultipleBucketWithBadDbConfigScenario3(t *testing.T) {
1259+
base.RequireNumTestBuckets(t, 2)
12331260

12341261
ctx := base.TestCtx(t)
12351262
tb1 := base.GetTestBucket(t)
@@ -3742,12 +3769,14 @@ func TestDeleteDatabaseCBGTTeardown(t *testing.T) {
37423769

37433770
func TestDatabaseCreationErrorCode(t *testing.T) {
37443771
for _, persistentConfig := range []bool{true, false} {
3745-
rt := rest.NewRestTester(t, &rest.RestTesterConfig{PersistentConfig: persistentConfig})
3746-
defer rt.Close()
3772+
t.Run(fmt.Sprintf("persistent_config=%t", persistentConfig), func(t *testing.T) {
3773+
rt := rest.NewRestTester(t, &rest.RestTesterConfig{PersistentConfig: persistentConfig})
3774+
defer rt.Close()
37473775

3748-
rt.CreateDatabase("db", rt.NewDbConfig())
3749-
resp := rt.SendAdminRequest(http.MethodPut, "/db/", `{"bucket": "irrelevant"}`)
3750-
rest.RequireStatus(t, resp, http.StatusPreconditionFailed)
3776+
rt.CreateDatabase("db", rt.NewDbConfig())
3777+
resp := rt.SendAdminRequest(http.MethodPut, "/db/", `{"bucket": "irrelevant"}`)
3778+
rest.RequireStatus(t, resp, http.StatusPreconditionFailed)
3779+
})
37513780
}
37523781
}
37533782

rest/adminapitest/resync_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,6 @@ function sync(doc, oldDoc){
359359
resp = rt.SendAdminRequest(http.MethodPost, "/{{.db}}/_resync?action=start", "")
360360
rest.RequireStatus(t, resp, http.StatusOK)
361361
status := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted)
362-
assert.Equal(t, int64(1), status.DocsProcessed)
363362
assert.Equal(t, int64(1), status.DocsChanged)
364363

365364
// ensure doc body remains unchanged after resync

rest/attachmentcompactiontest/attachment_compaction_api_test.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,19 @@ func TestAttachmentCompactionDryRun(t *testing.T) {
241241
rest.RequireStatus(t, resp, http.StatusOK)
242242
status := rt.WaitForAttachmentCompactionStatus(t, db.BackgroundProcessStateCompleted)
243243
assert.True(t, status.DryRun)
244-
assert.Equal(t, int64(5), status.PurgedAttachments)
244+
if !base.UnitTestUrlIsWalrus() && !base.TestsDisableGSI() {
245+
// It is possible for Couchbase Server GSI runs which use DCP purge to two DCP events from a previous
246+
// test.
247+
// 1. attachment mutation
248+
// 2. attachment deletion
249+
//
250+
// In a non dry run test, these will not be counted since removing the attachment will not fail. Relax
251+
// the assertion to greater than the number of documents.
252+
assert.GreaterOrEqual(t, status.PurgedAttachments, int64(5))
253+
254+
} else {
255+
assert.Equal(t, int64(5), status.PurgedAttachments)
256+
}
245257

246258
for _, docID := range attachmentKeys {
247259
_, _, err := dataStore.GetRaw(docID)

rest/sync_fn_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,18 @@ func TestResyncRegenerateSequences(t *testing.T) {
601601
}
602602

603603
assert.Equal(t, int64(12), resyncStatus.DocsChanged)
604-
assert.Equal(t, int64(12), resyncStatus.DocsProcessed)
604+
if !base.UnitTestUrlIsWalrus() && !base.TestsDisableGSI() {
605+
// It is possible for Couchbase Server GSI runs which use DCP purge to two DCP events from a previous
606+
// test.
607+
// 1. doc1 mutation
608+
// 2. doc1 deletion
609+
//
610+
// In a test, these will not be resynced but docsProcessed is incremented. Relax
611+
// the assertion to greater than the number of documents.
612+
assert.GreaterOrEqual(t, resyncStatus.DocsProcessed, int64(12))
613+
} else {
614+
assert.Equal(t, int64(12), resyncStatus.DocsProcessed)
615+
}
605616

606617
rt.TakeDbOnline()
607618

rest/utilities_testing_resttester.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,12 +300,13 @@ func (rt *RestTester) WaitForResyncDCPStatus(status db.BackgroundProcessState) d
300300
var resyncStatus db.ResyncManagerResponseDCP
301301
require.EventuallyWithT(rt.TB(), func(c *assert.CollectT) {
302302
response := rt.SendAdminRequest("GET", "/{{.db}}/_resync", "")
303-
err := json.Unmarshal(response.BodyBytes(), &resyncStatus)
304-
assert.NoError(c, err)
303+
RequireStatus(rt.TB(), response, http.StatusOK)
304+
require.NoError(rt.TB(), json.Unmarshal(response.BodyBytes(), &resyncStatus))
305305

306306
assert.Equal(c, status, resyncStatus.State)
307307
if slices.Contains([]db.BackgroundProcessState{db.BackgroundProcessStateCompleted, db.BackgroundProcessStateStopped}, status) {
308-
_, err = rt.Bucket().DefaultDataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(rt.TB()), nil)
308+
var output any
309+
_, err := rt.Bucket().DefaultDataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(rt.TB()), output)
309310
assert.True(c, base.IsDocNotFoundError(err), "expected heartbeat doc to be deleted, got: %v", err)
310311
}
311312
}, time.Second*10, time.Millisecond*10)

0 commit comments

Comments
 (0)