From 84b775cae4ad4c3f8c680b5fcafd778a0cd472d6 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Wed, 15 Jan 2025 13:41:05 +0000 Subject: [PATCH 1/5] CBG-4309: add rollback hook for cbgt to call during rollback of pIndex --- base/dcp_common.go | 2 +- base/dcp_sharded.go | 10 +++ rest/importtest/import_test.go | 148 +++++++++++++++++++++++++++++++++ 3 files changed, 159 insertions(+), 1 deletion(-) diff --git a/base/dcp_common.go b/base/dcp_common.go index 3c54c041e0..873bf7718a 100644 --- a/base/dcp_common.go +++ b/base/dcp_common.go @@ -163,7 +163,7 @@ func (c *DCPCommon) getMetaData(vbucketId uint16) ( // rollbackEx is called when a DCP stream issues a rollback. The metadata persisted for a given uuid and sequence number and then cbgt.Mgr JANITOR_ROLLBACK_PINDEX is issued via janitorRollback function. func (c *DCPCommon) rollbackEx(vbucketId uint16, vbucketUUID uint64, rollbackSeq uint64, rollbackMetaData []byte, janitorRollback func()) error { - WarnfCtx(c.loggingCtx, "DCP RollbackEx request - rolling back DCP feed for: vbucketId: %d, rollbackSeq: %x.", vbucketId, rollbackSeq) + WarnfCtx(c.loggingCtx, "DCP RollbackEx request - rolling back DCP feed for: vbucketId: %d, rollbackSeq: %d.", vbucketId, rollbackSeq) c.dbStatsExpvars.Add("dcp_rollback_count", 1) c.updateSeq(vbucketId, rollbackSeq, false) err := c.setMetaData(vbucketId, rollbackMetaData, true) diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go index a489718104..732296b033 100644 --- a/base/dcp_sharded.go +++ b/base/dcp_sharded.go @@ -337,6 +337,16 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG sourceUUID: bucketUUID, } + // Define rollback hook to be called by cbgt during rollback. This is used to kick the janitor to reassign feeds + // after a full rollback of a pindex + cbgt.RollbackHook = func(phase cbgt.RollbackPhase, pindex *cbgt.PIndex) (err error) { + if phase == cbgt.RollbackCompleted { + InfofCtx(ctx, KeyDCP, "Kicking janitor after rollback of pIndex: %s", pindex.Name) + err = mgr.JanitorOnce("Adding feeds after full rollback of pindex: " + pindex.Name) + } + return err + } + if spec.Auth != nil || (spec.Certpath != "" && spec.Keypath != "") { username, password, _ := spec.Auth.GetCredentials() addCbgtCredentials(dbName, bucket.GetName(), username, password, spec.Certpath, spec.Keypath) diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go index b8a9329c62..77c0f99f66 100644 --- a/rest/importtest/import_test.go +++ b/rest/importtest/import_test.go @@ -2228,6 +2228,33 @@ func TestImportFilterTimeout(t *testing.T) { assert.NoError(t, timeoutErr) } +func TestLol(t *testing.T) { + ctx := base.TestCtx(t) + bucket := base.GetTestBucket(t) + defer bucket.Close(ctx) + rt := rest.NewRestTester(t, &rest.RestTesterConfig{ + CustomTestBucket: bucket.NoCloseClone(), + }) + defer rt.Close() + + vbNo, err := base.GetVbucketForKey(bucket, "someKey") + require.NoError(t, err) + fmt.Println("lol", vbNo) + + prevKey := "someKey" + found := false + for !found { + t.Logf("he") + prevKey = prevKey + "1" + vbNo, err := base.GetVbucketForKey(bucket, prevKey) + require.NoError(t, err) + if vbNo == uint32(0) { + found = true + fmt.Println("prev", prevKey) + } + } +} + func TestImportRollback(t *testing.T) { if !base.IsEnterpriseEdition() { @@ -2316,6 +2343,127 @@ func TestImportRollback(t *testing.T) { } } +// TestImportRollbackMultiplePartitions: +// - Test is much like TestImportRollback, but with multiple partitions and multiple vBuckets rolling back +// - Test case rollbackWithoutFailover will only rollback one partition +func TestImportRollbackMultiplePartitions(t *testing.T) { + if !base.IsEnterpriseEdition() { + t.Skip("This test only works against EE") + } + + if base.UnitTestUrlIsWalrus() { + t.Skip("This test only works against Couchbase Server - needs cbgt and import checkpointing") + } + + base.SetUpTestLogging(t, base.LevelDebug, base.KeyImport, base.KeyDCP, base.KeyCluster) + ctx := base.TestCtx(t) + bucket := base.GetTestBucket(t) + defer bucket.Close(ctx) + + rt := rest.NewRestTester(t, &rest.RestTesterConfig{ + CustomTestBucket: bucket.NoCloseClone(), + PersistentConfig: false, + DatabaseConfig: &rest.DatabaseConfig{ + DbConfig: rest.DbConfig{ + ImportPartitions: base.Uint16Ptr(2), + }, + }, + }) + + // create doc id's for vb0 and vb800 + vb0DocIDs := []string{"abbacomes", "abdicate", "accrescent", "aconitum", "acrux", "adduction", "affrication", "algraphy", "allantoinuria", "altiloquent"} + vb800DocIDs := []string{"abrook", "accept", "accompaniment", "acoemeti", "adiposeness", "alkyd", "alnage", "ambulance", "anasazi", "anhydroxime"} + + for _, v := range vb0DocIDs { + added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.True(t, added) + require.NoError(t, err) + } + for _, v := range vb800DocIDs { + added, err := rt.GetSingleDataStore().AddRaw(v, 0, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.True(t, added) + require.NoError(t, err) + } + + // wait for docs to be imported + changes, err := rt.WaitForChanges(20, "/{{.keyspace}}/_changes?since=0", "", true) + require.NoError(t, err) + lastSeq := changes.Last_Seq.String() + + // Close db while we alter checkpoints to force rollback + db := rt.GetDatabase() + checkpointPrefix := rt.GetDatabase().MetadataKeys.DCPVersionedCheckpointPrefix(db.Options.GroupID, db.Options.ImportVersion) + rt.Close() + + metaStore := bucket.GetMetadataStore() + // fetch the checkpoint for the vBucket 0 and 800, modify the checkpoint values to a higher sequence to + // trigger rollback upon stream open request + checkpointKey := fmt.Sprintf("%s%d", checkpointPrefix, 0) + var checkpointData base.ShardedImportDCPMetadata + checkpointBytes, _, err := metaStore.GetRaw(checkpointKey) + require.NoError(t, err) + require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData)) + checkpointData.SnapStart = 3000 + checkpointData.SnapStart + checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd + checkpointData.SeqStart = 3000 + checkpointData.SeqStart + checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd + existingVbUUID := checkpointData.FailOverLog[0][0] + checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}} + + updatedBytes, err := base.JSONMarshal(checkpointData) + require.NoError(t, err) + err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes) + require.NoError(t, err) + + // vBucket 800 + checkpointKey = fmt.Sprintf("%s%d", checkpointPrefix, 800) + checkpointData = base.ShardedImportDCPMetadata{} + checkpointBytes, _, err = metaStore.GetRaw(checkpointKey) + require.NoError(t, err) + require.NoError(t, base.JSONUnmarshal(checkpointBytes, &checkpointData)) + checkpointData.SnapStart = 3000 + checkpointData.SnapStart + checkpointData.SnapEnd = 3000 + checkpointData.SnapEnd + checkpointData.SeqStart = 3000 + checkpointData.SeqStart + checkpointData.SeqEnd = 3000 + checkpointData.SeqEnd + existingVbUUID = checkpointData.FailOverLog[0][0] + checkpointData.FailOverLog = [][]uint64{{existingVbUUID + 1, 0}} + + updatedBytes, err = base.JSONMarshal(checkpointData) + require.NoError(t, err) + err = metaStore.SetRaw(checkpointKey, 0, nil, updatedBytes) + require.NoError(t, err) + + // Reopen the db, expect DCP rollback + rt2 := rest.NewRestTester(t, &rest.RestTesterConfig{ + CustomTestBucket: bucket.NoCloseClone(), + PersistentConfig: false, + DatabaseConfig: &rest.DatabaseConfig{ + DbConfig: rest.DbConfig{ + ImportPartitions: base.Uint16Ptr(2), + }, + }, + }) + defer rt2.Close() + + for _, v := range vb0DocIDs { + err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.NoError(t, err) + } + for _, v := range vb800DocIDs { + err := rt2.GetSingleDataStore().SetRaw(v, 0, nil, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.NoError(t, err) + } + + // Add doc to non rolled back vBucket (392) and assert its imported + added, err := rt2.GetSingleDataStore().AddRaw("someKey", 0, []byte(fmt.Sprintf(`{"star": "6"}`))) + require.NoError(t, err) + require.True(t, added) + + // wait for doc update to be imported + _, err = rt2.WaitForChanges(21, "/{{.keyspace}}/_changes?since="+lastSeq, "", true) + require.NoError(t, err) +} + func TestImportUpdateExpiry(t *testing.T) { testCases := []struct { name string From 9b27131aecf5985f55e4eeaf38efeea93ab08947 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Wed, 15 Jan 2025 13:42:52 +0000 Subject: [PATCH 2/5] remove temp test --- rest/importtest/import_test.go | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go index 77c0f99f66..e0d287619d 100644 --- a/rest/importtest/import_test.go +++ b/rest/importtest/import_test.go @@ -2228,33 +2228,6 @@ func TestImportFilterTimeout(t *testing.T) { assert.NoError(t, timeoutErr) } -func TestLol(t *testing.T) { - ctx := base.TestCtx(t) - bucket := base.GetTestBucket(t) - defer bucket.Close(ctx) - rt := rest.NewRestTester(t, &rest.RestTesterConfig{ - CustomTestBucket: bucket.NoCloseClone(), - }) - defer rt.Close() - - vbNo, err := base.GetVbucketForKey(bucket, "someKey") - require.NoError(t, err) - fmt.Println("lol", vbNo) - - prevKey := "someKey" - found := false - for !found { - t.Logf("he") - prevKey = prevKey + "1" - vbNo, err := base.GetVbucketForKey(bucket, prevKey) - require.NoError(t, err) - if vbNo == uint32(0) { - found = true - fmt.Println("prev", prevKey) - } - } -} - func TestImportRollback(t *testing.T) { if !base.IsEnterpriseEdition() { From 8eb9413bb607a6a1bb13b6be52d3563fa7d9ed89 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Wed, 15 Jan 2025 13:52:54 +0000 Subject: [PATCH 3/5] update cbgt to include rollback hook --- go.mod | 6 +++--- go.sum | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 723f4c8a79..5e4e0392d7 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( dario.cat/mergo v1.0.0 github.com/KimMachineGun/automemlimit v0.7.0 github.com/coreos/go-oidc/v3 v3.12.0 - github.com/couchbase/cbgt v1.3.9 + github.com/couchbase/cbgt v1.4.0 github.com/couchbase/clog v0.1.0 github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd github.com/couchbase/gocb/v2 v2.9.1 @@ -45,8 +45,8 @@ require ( github.com/aws/aws-sdk-go v1.44.299 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/couchbase/blance v0.1.5 // indirect - github.com/couchbase/cbauth v0.1.11 // indirect + github.com/couchbase/blance v0.1.6 // indirect + github.com/couchbase/cbauth v0.1.12 // indirect github.com/couchbase/go-couchbase v0.1.1 // indirect github.com/couchbase/gocbcoreps v0.1.3 // indirect github.com/couchbase/goprotostellar v1.0.2 // indirect diff --git a/go.sum b/go.sum index 46a3ab3770..5b673d334d 100644 --- a/go.sum +++ b/go.sum @@ -30,12 +30,12 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/coreos/go-oidc/v3 v3.12.0 h1:sJk+8G2qq94rDI6ehZ71Bol3oUHy63qNYmkiSjrc/Jo= github.com/coreos/go-oidc/v3 v3.12.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0= -github.com/couchbase/blance v0.1.5 h1:kNSAwhb8FXSJpicJ8R8Kk7+0V1+MyTcY1MOHIDbU79w= -github.com/couchbase/blance v0.1.5/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU= -github.com/couchbase/cbauth v0.1.11 h1:LLyGiVnsKxyHp9wbOQk87oF9eDUSh1in2vh/l6vaezg= -github.com/couchbase/cbauth v0.1.11/go.mod h1:W7zkNXa0B2cTDg90YmmuTSbu+PlYOvMqzQvmNlNH/Mg= -github.com/couchbase/cbgt v1.3.9 h1:MAT3FwD1ctekxuFe0yau0H1BCTvgLXvh1ipbZ3nZhBE= -github.com/couchbase/cbgt v1.3.9/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE= +github.com/couchbase/blance v0.1.6 h1:zyNew/SN2AheIoJxQ2LqqA1u3bMp03eGCer6hSDMUDs= +github.com/couchbase/blance v0.1.6/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU= +github.com/couchbase/cbauth v0.1.12 h1:JOAWjjp2BdubvrrggvN4yQo3oEc2ndXcRN1ONCklUOM= +github.com/couchbase/cbauth v0.1.12/go.mod h1:W7zkNXa0B2cTDg90YmmuTSbu+PlYOvMqzQvmNlNH/Mg= +github.com/couchbase/cbgt v1.4.0 h1:ZkRAWCrY/Ax63//8Sd7aozR2g05btECUbqwf8QSXGnU= +github.com/couchbase/cbgt v1.4.0/go.mod h1:QR8XIUzSm2cFviBkdBCdpa87M2oe5yMVIzvsJGm/BUI= github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY= github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4= github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd h1:ERQXaXuX1eix3NUqrxQ5VY0hqHH90vcfrWdbEWKzlEY= From f673f65f952cc2fea2c250870788fa33bfa7c462 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 27 Jan 2025 10:59:54 -0500 Subject: [PATCH 4/5] CBG-4309 add Rollback to to PIndexImplType --- base/dcp_common.go | 2 +- base/dcp_sharded.go | 10 ---------- db/import_pindex.go | 17 +++++++++++++++++ go.mod | 6 +++--- go.sum | 12 ++++++------ 5 files changed, 27 insertions(+), 20 deletions(-) diff --git a/base/dcp_common.go b/base/dcp_common.go index 873bf7718a..3c54c041e0 100644 --- a/base/dcp_common.go +++ b/base/dcp_common.go @@ -163,7 +163,7 @@ func (c *DCPCommon) getMetaData(vbucketId uint16) ( // rollbackEx is called when a DCP stream issues a rollback. The metadata persisted for a given uuid and sequence number and then cbgt.Mgr JANITOR_ROLLBACK_PINDEX is issued via janitorRollback function. func (c *DCPCommon) rollbackEx(vbucketId uint16, vbucketUUID uint64, rollbackSeq uint64, rollbackMetaData []byte, janitorRollback func()) error { - WarnfCtx(c.loggingCtx, "DCP RollbackEx request - rolling back DCP feed for: vbucketId: %d, rollbackSeq: %d.", vbucketId, rollbackSeq) + WarnfCtx(c.loggingCtx, "DCP RollbackEx request - rolling back DCP feed for: vbucketId: %d, rollbackSeq: %x.", vbucketId, rollbackSeq) c.dbStatsExpvars.Add("dcp_rollback_count", 1) c.updateSeq(vbucketId, rollbackSeq, false) err := c.setMetaData(vbucketId, rollbackMetaData, true) diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go index 732296b033..a489718104 100644 --- a/base/dcp_sharded.go +++ b/base/dcp_sharded.go @@ -337,16 +337,6 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG sourceUUID: bucketUUID, } - // Define rollback hook to be called by cbgt during rollback. This is used to kick the janitor to reassign feeds - // after a full rollback of a pindex - cbgt.RollbackHook = func(phase cbgt.RollbackPhase, pindex *cbgt.PIndex) (err error) { - if phase == cbgt.RollbackCompleted { - InfofCtx(ctx, KeyDCP, "Kicking janitor after rollback of pIndex: %s", pindex.Name) - err = mgr.JanitorOnce("Adding feeds after full rollback of pindex: " + pindex.Name) - } - return err - } - if spec.Auth != nil || (spec.Certpath != "" && spec.Keypath != "") { username, password, _ := spec.Auth.GetCredentials() addCbgtCredentials(dbName, bucket.GetName(), username, password, spec.Certpath, spec.Keypath) diff --git a/db/import_pindex.go b/db/import_pindex.go index 0b154d90b5..911cb1564b 100644 --- a/db/import_pindex.go +++ b/db/import_pindex.go @@ -39,11 +39,28 @@ func RegisterImportPindexImpl(ctx context.Context, configGroup string) { New: getNewPIndexImplType(ctx), Open: OpenImportPIndexImpl, OpenUsing: getOpenImportPIndexImplUsing(ctx), + Rollback: getPIndexImplRollbackCallback(ctx), Description: "general/syncGateway-import " + " - import processing for shared bucket access", }) } +func getPIndexImplRollbackCallback(ctx context.Context) func(indexType, indexParams, sourceParams, path string, mgr *cbgt.Manager, + restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) { + + rollback := func(indexType, indexParams, sourceParams, path string, mgr *cbgt.Manager, + restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) { + go mgr.JanitorKick("rollback from PIndexImpl.Rollback") + importDest, err := getListenerImportDest(ctx, indexParams, restart) + if err != nil { + base.InfofCtx(ctx, base.KeyDCP, "No importDest found for indexParams on rollback. sourceParams=%v indexParams=%v %v. Not able to start import feed", err, base.MD(sourceParams), base.MD(indexParams)) + return nil, nil, err + } + return nil, importDest, nil + } + return rollback +} + // getListenerForIndex looks up the importListener for the dbName specified in the index params func getListenerImportDest(ctx context.Context, indexParams string, restart func()) (cbgt.Dest, error) { diff --git a/go.mod b/go.mod index 5e4e0392d7..723f4c8a79 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( dario.cat/mergo v1.0.0 github.com/KimMachineGun/automemlimit v0.7.0 github.com/coreos/go-oidc/v3 v3.12.0 - github.com/couchbase/cbgt v1.4.0 + github.com/couchbase/cbgt v1.3.9 github.com/couchbase/clog v0.1.0 github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd github.com/couchbase/gocb/v2 v2.9.1 @@ -45,8 +45,8 @@ require ( github.com/aws/aws-sdk-go v1.44.299 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/couchbase/blance v0.1.6 // indirect - github.com/couchbase/cbauth v0.1.12 // indirect + github.com/couchbase/blance v0.1.5 // indirect + github.com/couchbase/cbauth v0.1.11 // indirect github.com/couchbase/go-couchbase v0.1.1 // indirect github.com/couchbase/gocbcoreps v0.1.3 // indirect github.com/couchbase/goprotostellar v1.0.2 // indirect diff --git a/go.sum b/go.sum index 5b673d334d..46a3ab3770 100644 --- a/go.sum +++ b/go.sum @@ -30,12 +30,12 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/coreos/go-oidc/v3 v3.12.0 h1:sJk+8G2qq94rDI6ehZ71Bol3oUHy63qNYmkiSjrc/Jo= github.com/coreos/go-oidc/v3 v3.12.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0= -github.com/couchbase/blance v0.1.6 h1:zyNew/SN2AheIoJxQ2LqqA1u3bMp03eGCer6hSDMUDs= -github.com/couchbase/blance v0.1.6/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU= -github.com/couchbase/cbauth v0.1.12 h1:JOAWjjp2BdubvrrggvN4yQo3oEc2ndXcRN1ONCklUOM= -github.com/couchbase/cbauth v0.1.12/go.mod h1:W7zkNXa0B2cTDg90YmmuTSbu+PlYOvMqzQvmNlNH/Mg= -github.com/couchbase/cbgt v1.4.0 h1:ZkRAWCrY/Ax63//8Sd7aozR2g05btECUbqwf8QSXGnU= -github.com/couchbase/cbgt v1.4.0/go.mod h1:QR8XIUzSm2cFviBkdBCdpa87M2oe5yMVIzvsJGm/BUI= +github.com/couchbase/blance v0.1.5 h1:kNSAwhb8FXSJpicJ8R8Kk7+0V1+MyTcY1MOHIDbU79w= +github.com/couchbase/blance v0.1.5/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU= +github.com/couchbase/cbauth v0.1.11 h1:LLyGiVnsKxyHp9wbOQk87oF9eDUSh1in2vh/l6vaezg= +github.com/couchbase/cbauth v0.1.11/go.mod h1:W7zkNXa0B2cTDg90YmmuTSbu+PlYOvMqzQvmNlNH/Mg= +github.com/couchbase/cbgt v1.3.9 h1:MAT3FwD1ctekxuFe0yau0H1BCTvgLXvh1ipbZ3nZhBE= +github.com/couchbase/cbgt v1.3.9/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE= github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY= github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4= github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd h1:ERQXaXuX1eix3NUqrxQ5VY0hqHH90vcfrWdbEWKzlEY= From 6e3889ca024d79777814d62f70e02e5c8ee61d5d Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Tue, 28 Jan 2025 15:51:01 -0500 Subject: [PATCH 5/5] Update cbgt fix to trigger janitor kick --- db/import_pindex.go | 17 ----------------- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 3 insertions(+), 20 deletions(-) diff --git a/db/import_pindex.go b/db/import_pindex.go index 911cb1564b..0b154d90b5 100644 --- a/db/import_pindex.go +++ b/db/import_pindex.go @@ -39,28 +39,11 @@ func RegisterImportPindexImpl(ctx context.Context, configGroup string) { New: getNewPIndexImplType(ctx), Open: OpenImportPIndexImpl, OpenUsing: getOpenImportPIndexImplUsing(ctx), - Rollback: getPIndexImplRollbackCallback(ctx), Description: "general/syncGateway-import " + " - import processing for shared bucket access", }) } -func getPIndexImplRollbackCallback(ctx context.Context) func(indexType, indexParams, sourceParams, path string, mgr *cbgt.Manager, - restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) { - - rollback := func(indexType, indexParams, sourceParams, path string, mgr *cbgt.Manager, - restart func()) (cbgt.PIndexImpl, cbgt.Dest, error) { - go mgr.JanitorKick("rollback from PIndexImpl.Rollback") - importDest, err := getListenerImportDest(ctx, indexParams, restart) - if err != nil { - base.InfofCtx(ctx, base.KeyDCP, "No importDest found for indexParams on rollback. sourceParams=%v indexParams=%v %v. Not able to start import feed", err, base.MD(sourceParams), base.MD(indexParams)) - return nil, nil, err - } - return nil, importDest, nil - } - return rollback -} - // getListenerForIndex looks up the importListener for the dbName specified in the index params func getListenerImportDest(ctx context.Context, indexParams string, restart func()) (cbgt.Dest, error) { diff --git a/go.mod b/go.mod index 723f4c8a79..0487064585 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( dario.cat/mergo v1.0.0 github.com/KimMachineGun/automemlimit v0.7.0 github.com/coreos/go-oidc/v3 v3.12.0 - github.com/couchbase/cbgt v1.3.9 + github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d github.com/couchbase/clog v0.1.0 github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd github.com/couchbase/gocb/v2 v2.9.1 diff --git a/go.sum b/go.sum index 46a3ab3770..1c26d492aa 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,8 @@ github.com/couchbase/blance v0.1.5 h1:kNSAwhb8FXSJpicJ8R8Kk7+0V1+MyTcY1MOHIDbU79 github.com/couchbase/blance v0.1.5/go.mod h1:2Sa/nsJSieN/r3T9LsrUYWeQ015qDsuHybhz4F4JcHU= github.com/couchbase/cbauth v0.1.11 h1:LLyGiVnsKxyHp9wbOQk87oF9eDUSh1in2vh/l6vaezg= github.com/couchbase/cbauth v0.1.11/go.mod h1:W7zkNXa0B2cTDg90YmmuTSbu+PlYOvMqzQvmNlNH/Mg= -github.com/couchbase/cbgt v1.3.9 h1:MAT3FwD1ctekxuFe0yau0H1BCTvgLXvh1ipbZ3nZhBE= -github.com/couchbase/cbgt v1.3.9/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE= +github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d h1:X80jy41uF1ivq513eSm+k+Vih+eSMHZKjQJ5JawMuRs= +github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE= github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY= github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4= github.com/couchbase/go-blip v0.0.0-20241014144256-13a798c348fd h1:ERQXaXuX1eix3NUqrxQ5VY0hqHH90vcfrWdbEWKzlEY=