Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
dario.cat/mergo v1.0.0
github.com/KimMachineGun/automemlimit v0.7.0
github.com/coreos/go-oidc/v3 v3.12.0
github.com/couchbase/cbgt v1.3.9
github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d
github.com/couchbase/clog v0.1.0
github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd
github.com/couchbase/gocb/v2 v2.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ github.com/couchbase/blance v0.1.5 h1:kNSAwhb8FXSJpicJ8R8Kk7+0V1+MyTcY1MOHIDbU79
github.com/couchbase/blance v0.1.5/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU=
github.com/couchbase/cbauth v0.1.11 h1:LLyGiVnsKxyHp9wbOQk87oF9eDUSh1in2vh/l6vaezg=
github.com/couchbase/cbauth v0.1.11/go.mod h1:W7zkNXa0B2cTDg90YmmuTSbu+PlYOvMqzQvmNlNH/Mg=
github.com/couchbase/cbgt v1.3.9 h1:MAT3FwD1ctekxuFe0yau0H1BCTvgLXvh1ipbZ3nZhBE=
github.com/couchbase/cbgt v1.3.9/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE=
github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d h1:X80jy41uF1ivq513eSm+k+Vih+eSMHZKjQJ5JawMuRs=
github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE=
github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY=
github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4=
github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd h1:ERQXaXuX1eix3NUqrxQ5VY0hqHH90vcfrWdbEWKzlEY=
Expand Down
121 changes: 121 additions & 0 deletions rest/importtest/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2316,6 +2316,127 @@ func TestImportRollback(t *testing.T) {
}
}

// TestImportRollbackMultiplePartitions:
// - Test is much like TestImportRollback, but with multiple partitions and multiple vBuckets rolling back
// - Test case rollbackWithoutFailover will only rollback one partition
func TestImportRollbackMultiplePartitions(t *testing.T) {
if !base.IsEnterpriseEdition() {
t.Skip("This test only works against EE")
}

if base.UnitTestUrlIsWalrus() {
t.Skip("This test only works against Couchbase Server - needs cbgt and import checkpointing")
}

base.SetUpTestLogging(t, base.LevelDebug, base.KeyImport, base.KeyDCP, base.KeyCluster)
ctx := base.TestCtx(t)
bucket := base.GetTestBucket(t)
defer bucket.Close(ctx)

rt := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: bucket.NoCloseClone(),
PersistentConfig: false,
DatabaseConfig: &rest.DatabaseConfig{
DbConfig: rest.DbConfig{
ImportPartitions: base.Uint16Ptr(2),
},
},
})

// create doc id's for vb0 and vb800
vb0DocIDs := []string{"abbacomes", "abdicate", "accrescent", "aconitum", "acrux", "adduction", "affrication", "algraphy", "allantoinuria", "altiloquent"}
vb800DocIDs := []string{"abrook", "accept", "accompaniment", "acoemeti", "adiposeness", "alkyd", "alnage", "ambulance", "anasazi", "anhydroxime"}

for _, v := range vb0DocIDs {
added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.True(t, added)
require.NoError(t, err)
}
for _, v := range vb800DocIDs {
added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.True(t, added)
require.NoError(t, err)
}

// wait for docs to be imported
changes, err := rt.WaitForChanges(20, "/{{.keyspace}}/_changes?since=0", "", true)
require.NoError(t, err)
lastSeq := changes.Last_Seq.String()

// Close db while we alter checkpoints to force rollback
db := rt.GetDatabase()
checkpointPrefix := rt.GetDatabase().MetadataKeys.DCPVersionedCheckpointPrefix(db.Options.GroupID, db.Options.ImportVersion)
rt.Close()

metaStore := bucket.GetMetadataStore()
// fetch the checkpoint for the vBucket 0 and 800, modify the checkpoint values to a higher sequence to
// trigger rollback upon stream open request
checkpointKey := fmt.Sprintf("%s%d", checkpointPrefix, 0)
var checkpointData base.ShardedImportDCPMetadata
checkpointBytes, _, err := metaStore.GetRaw(checkpointKey)
require.NoError(t, err)
require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData))
checkpointData.SnapStart = 3000 + checkpointData.SnapStart
checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd
checkpointData.SeqStart = 3000 + checkpointData.SeqStart
checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd
existingVbUUID := checkpointData.FailOverLog[0][0]
checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}}

updatedBytes, err := base.JSONMarshal(checkpointData)
require.NoError(t, err)
err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes)
require.NoError(t, err)

// vBucket 800
checkpointKey = fmt.Sprintf("%s%d", checkpointPrefix, 800)
checkpointData = base.ShardedImportDCPMetadata{}
checkpointBytes, _, err = metaStore.GetRaw(checkpointKey)
require.NoError(t, err)
require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData))
checkpointData.SnapStart = 3000 + checkpointData.SnapStart
checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd
checkpointData.SeqStart = 3000 + checkpointData.SeqStart
checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd
existingVbUUID = checkpointData.FailOverLog[0][0]
checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}}

updatedBytes, err = base.JSONMarshal(checkpointData)
require.NoError(t, err)
err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes)
require.NoError(t, err)

// Reopen the db, expect DCP rollback
rt2 := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: bucket.NoCloseClone(),
PersistentConfig: false,
DatabaseConfig: &rest.DatabaseConfig{
DbConfig: rest.DbConfig{
ImportPartitions: base.Uint16Ptr(2),
},
},
})
defer rt2.Close()

for _, v := range vb0DocIDs {
err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.NoError(t, err)
}
for _, v := range vb800DocIDs {
err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.NoError(t, err)
}

// Add doc to non rolled back vBucket (392) and assert its imported
added, err := rt2.GetSingleDataStore().AddRaw("someKey", 0, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.NoError(t, err)
require.True(t, added)

// wait for doc update to be imported
_, err = rt2.WaitForChanges(21, "/{{.keyspace}}/_changes?since="+lastSeq, "", true)
require.NoError(t, err)
}

func TestImportUpdateExpiry(t *testing.T) {
testCases := []struct {
name string
Expand Down