diff --git a/db/import_pindex.go b/db/import_pindex.go index 0b154d90b5..911cb1564b 100644 --- a/db/import_pindex.go +++ b/db/import_pindex.go @@ -39,11 +39,28 @@ func RegisterImportPindexImpl(ctx context.Context, configGroup string) { New: getNewPIndexImplType(ctx), Open: OpenImportPIndexImpl, OpenUsing: getOpenImportPIndexImplUsing(ctx), + Rollback: getPIndexImplRollbackCallback(ctx), Description: "general/syncGateway-import " + " - import processing for shared bucket access", }) } +func getPIndexImplRollbackCallback(ctx context.Context) func(indexType, indexParams, sourceParams, path string, mgr *cbgt.Manager, + restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) { + + rollback := func(indexType, indexParams, sourceParams, path string, mgr *cbgt.Manager, + restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) { + go mgr.JanitorKick("rollback from PIndexImpl.Rollback") + importDest, err := getListenerImportDest(ctx, indexParams, restart) + if err != nil { + base.InfofCtx(ctx, base.KeyDCP, "No importDest found for indexParams on rollback. sourceParams=%v indexParams=%v %v. Not able to start import feed", err, base.MD(sourceParams), base.MD(indexParams)) + return nil, nil, err + } + return nil, importDest, nil + } + return rollback +} + // getListenerForIndex looks up the importListener for the dbName specified in the index params func getListenerImportDest(ctx context.Context, indexParams string, restart func()) (cbgt.Dest, error) { diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go index b8a9329c62..e0d287619d 100644 --- a/rest/importtest/import_test.go +++ b/rest/importtest/import_test.go @@ -2316,6 +2316,127 @@ func TestImportRollback(t *testing.T) { } } +// TestImportRollbackMultiplePartitions: +// - Test is much like TestImportRollback, but with multiple partitions and multiple vBuckets rolling back +// - Test case rollbackWithoutFailover will only rollback one partition +func TestImportRollbackMultiplePartitions(t *testing.T) { + if !base.IsEnterpriseEdition() { + t.Skip("This test only works against EE") + } + + if base.UnitTestUrlIsWalrus() { + t.Skip("This test only works against Couchbase Server - needs cbgt and import checkpointing") + } + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyImport, base.KeyDCP, base.KeyCluster) + ctx := base.TestCtx(t) + bucket := base.GetTestBucket(t) + defer bucket.Close(ctx) + + rt := rest.NewRestTester(t, &rest.RestTesterConfig{ + CustomTestBucket: bucket.NoCloseClone(), + PersistentConfig: false, + DatabaseConfig: &rest.DatabaseConfig{ + DbConfig: rest.DbConfig{ + ImportPartitions: base.Uint16Ptr(2), + }, + }, + }) + + // create doc id's for vb0 and vb800 + vb0DocIDs := []string{"abbacomes", "abdicate", "accrescent", "aconitum", "acrux", "adduction", "affrication", "algraphy", "allantoinuria", "altiloquent"} + vb800DocIDs := []string{"abrook", "accept", "accompaniment", "acoemeti", "adiposeness", "alkyd", "alnage", "ambulance", "anasazi", "anhydroxime"} + + for _, v := range vb0DocIDs { + added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.True(t, added) + require.NoError(t, err) + } + for _, v := range vb800DocIDs { + added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.True(t, added) + require.NoError(t, err) + } + + // wait for docs to be imported + changes, err := rt.WaitForChanges(20, "/{{.keyspace}}/_changes?since=0", "", true) + require.NoError(t, err) + lastSeq := changes.Last_Seq.String() + + // Close db while we alter checkpoints to force rollback + db := rt.GetDatabase() + checkpointPrefix := rt.GetDatabase().MetadataKeys.DCPVersionedCheckpointPrefix(db.Options.GroupID, db.Options.ImportVersion) + rt.Close() + + metaStore := bucket.GetMetadataStore() + // fetch the checkpoint for the vBucket 0 and 800, modify the checkpoint values to a higher sequence to + // trigger rollback upon stream open request + checkpointKey := fmt.Sprintf("%s%d", checkpointPrefix, 0) + var checkpointData base.ShardedImportDCPMetadata + checkpointBytes, _, err := metaStore.GetRaw(checkpointKey) + require.NoError(t, err) + require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData)) + checkpointData.SnapStart = 3000 + checkpointData.SnapStart + checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd + checkpointData.SeqStart = 3000 + checkpointData.SeqStart + checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd + existingVbUUID := checkpointData.FailOverLog[0][0] + checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}} + + updatedBytes, err := base.JSONMarshal(checkpointData) + require.NoError(t, err) + err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes) + require.NoError(t, err) + + // vBucket 800 + checkpointKey = fmt.Sprintf("%s%d", checkpointPrefix, 800) + checkpointData = base.ShardedImportDCPMetadata{} + checkpointBytes, _, err = metaStore.GetRaw(checkpointKey) + require.NoError(t, err) + require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData)) + checkpointData.SnapStart = 3000 + checkpointData.SnapStart + checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd + checkpointData.SeqStart = 3000 + checkpointData.SeqStart + checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd + existingVbUUID = checkpointData.FailOverLog[0][0] + checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}} + + updatedBytes, err = base.JSONMarshal(checkpointData) + require.NoError(t, err) + err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes) + require.NoError(t, err) + + // Reopen the db, expect DCP rollback + rt2 := rest.NewRestTester(t, &rest.RestTesterConfig{ + CustomTestBucket: bucket.NoCloseClone(), + PersistentConfig: false, + DatabaseConfig: &rest.DatabaseConfig{ + DbConfig: rest.DbConfig{ + ImportPartitions: base.Uint16Ptr(2), + }, + }, + }) + defer rt2.Close() + + for _, v := range vb0DocIDs { + err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.NoError(t, err) + } + for _, v := range vb800DocIDs { + err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.NoError(t, err) + } + + // Add doc to non rolled back vBucket (392) and assert its imported + added, err := rt2.GetSingleDataStore().AddRaw("someKey", 0, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.NoError(t, err) + require.True(t, added) + + // wait for doc update to be imported + _, err = rt2.WaitForChanges(21, "/{{.keyspace}}/_changes?since="+lastSeq, "", true) + require.NoError(t, err) +} + func TestImportUpdateExpiry(t *testing.T) { testCases := []struct { name string