Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions db/import_pindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,28 @@ func RegisterImportPindexImpl(ctx context.Context, configGroup string) {
New: getNewPIndexImplType(ctx),
Open: OpenImportPIndexImpl,
OpenUsing: getOpenImportPIndexImplUsing(ctx),
Rollback: getPIndexImplRollbackCallback(ctx),
Description: "general/syncGateway-import " +
" - import processing for shared bucket access",
})
}

func getPIndexImplRollbackCallback(ctx context.Context) func(indexType, indexParams, sourceParams, path string, mgr *cbgt.Manager,
restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {

rollback := func(indexType, indexParams, sourceParams, path string, mgr *cbgt.Manager,
restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) {
go mgr.JanitorKick("rollback from PIndexImpl.Rollback")
importDest, err := getListenerImportDest(ctx, indexParams, restart)
if err != nil {
base.InfofCtx(ctx, base.KeyDCP, "No importDest found for indexParams on rollback. sourceParams=%v indexParams=%v %v. Not able to start import feed", err, base.MD(sourceParams), base.MD(indexParams))
return nil, nil, err
}
return nil, importDest, nil
}
return rollback
}

// getListenerForIndex looks up the importListener for the dbName specified in the index params
func getListenerImportDest(ctx context.Context, indexParams string, restart func()) (cbgt.Dest, error) {

Expand Down
121 changes: 121 additions & 0 deletions rest/importtest/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2316,6 +2316,127 @@ func TestImportRollback(t *testing.T) {
}
}

// TestImportRollbackMultiplePartitions:
// - Test is much like TestImportRollback, but with multiple partitions and multiple vBuckets rolling back
// - Test case rollbackWithoutFailover will only rollback one partition
func TestImportRollbackMultiplePartitions(t *testing.T) {
if !base.IsEnterpriseEdition() {
t.Skip("This test only works against EE")
}

if base.UnitTestUrlIsWalrus() {
t.Skip("This test only works against Couchbase Server - needs cbgt and import checkpointing")
}

base.SetUpTestLogging(t, base.LevelDebug, base.KeyImport, base.KeyDCP, base.KeyCluster)
ctx := base.TestCtx(t)
bucket := base.GetTestBucket(t)
defer bucket.Close(ctx)

rt := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: bucket.NoCloseClone(),
PersistentConfig: false,
DatabaseConfig: &rest.DatabaseConfig{
DbConfig: rest.DbConfig{
ImportPartitions: base.Uint16Ptr(2),
},
},
})

// create doc id's for vb0 and vb800
vb0DocIDs := []string{"abbacomes", "abdicate", "accrescent", "aconitum", "acrux", "adduction", "affrication", "algraphy", "allantoinuria", "altiloquent"}
vb800DocIDs := []string{"abrook", "accept", "accompaniment", "acoemeti", "adiposeness", "alkyd", "alnage", "ambulance", "anasazi", "anhydroxime"}

for _, v := range vb0DocIDs {
added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.True(t, added)
require.NoError(t, err)
}
for _, v := range vb800DocIDs {
added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.True(t, added)
require.NoError(t, err)
}

// wait for docs to be imported
changes, err := rt.WaitForChanges(20, "/{{.keyspace}}/_changes?since=0", "", true)
require.NoError(t, err)
lastSeq := changes.Last_Seq.String()

// Close db while we alter checkpoints to force rollback
db := rt.GetDatabase()
checkpointPrefix := rt.GetDatabase().MetadataKeys.DCPVersionedCheckpointPrefix(db.Options.GroupID, db.Options.ImportVersion)
rt.Close()

metaStore := bucket.GetMetadataStore()
// fetch the checkpoint for the vBucket 0 and 800, modify the checkpoint values to a higher sequence to
// trigger rollback upon stream open request
checkpointKey := fmt.Sprintf("%s%d", checkpointPrefix, 0)
var checkpointData base.ShardedImportDCPMetadata
checkpointBytes, _, err := metaStore.GetRaw(checkpointKey)
require.NoError(t, err)
require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData))
checkpointData.SnapStart = 3000 + checkpointData.SnapStart
checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd
checkpointData.SeqStart = 3000 + checkpointData.SeqStart
checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd
existingVbUUID := checkpointData.FailOverLog[0][0]
checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}}

updatedBytes, err := base.JSONMarshal(checkpointData)
require.NoError(t, err)
err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes)
require.NoError(t, err)

// vBucket 800
checkpointKey = fmt.Sprintf("%s%d", checkpointPrefix, 800)
checkpointData = base.ShardedImportDCPMetadata{}
checkpointBytes, _, err = metaStore.GetRaw(checkpointKey)
require.NoError(t, err)
require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData))
checkpointData.SnapStart = 3000 + checkpointData.SnapStart
checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd
checkpointData.SeqStart = 3000 + checkpointData.SeqStart
checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd
existingVbUUID = checkpointData.FailOverLog[0][0]
checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}}

updatedBytes, err = base.JSONMarshal(checkpointData)
require.NoError(t, err)
err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes)
require.NoError(t, err)

// Reopen the db, expect DCP rollback
rt2 := rest.NewRestTester(t, &rest.RestTesterConfig{
CustomTestBucket: bucket.NoCloseClone(),
PersistentConfig: false,
DatabaseConfig: &rest.DatabaseConfig{
DbConfig: rest.DbConfig{
ImportPartitions: base.Uint16Ptr(2),
},
},
})
defer rt2.Close()

for _, v := range vb0DocIDs {
err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.NoError(t, err)
}
for _, v := range vb800DocIDs {
err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.NoError(t, err)
}

// Add doc to non rolled back vBucket (392) and assert its imported
added, err := rt2.GetSingleDataStore().AddRaw("someKey", 0, []byte(fmt.Sprintf(`{"star": "6"}`)))
require.NoError(t, err)
require.True(t, added)

// wait for doc update to be imported
_, err = rt2.WaitForChanges(21, "/{{.keyspace}}/_changes?since="+lastSeq, "", true)
require.NoError(t, err)
}

func TestImportUpdateExpiry(t *testing.T) {
testCases := []struct {
name string
Expand Down
Loading