From af6b82d7ec88e81a537c4f0a86a946c177c755d4 Mon Sep 17 00:00:00 2001
From: Tor Colvin
Date: Mon, 10 Nov 2025 08:50:31 -0500
Subject: [PATCH] Create an abstract DCPClient to be able to work with rosmar

---
 base/abstract_dcp_client.go                   | 61 +++++++++++++++
 base/dcp_client.go                            | 73 ++++++++++---------
 base/dcp_client_stream_observer.go            | 28 +++----
 base/dcp_client_test.go                       | 54 +++++---------
 base/gocb_dcp_feed.go                         | 26 ++++---
 base/rosmar_dcp_client.go                     | 60 +++++++++++++++
 db/attachment_compaction.go                   | 58 ++++++---------
 db/attachment_compaction_test.go              | 12 +--
 db/background_mgr_attachment_migration.go     | 44 ++++++-----
 ...ackground_mgr_attachment_migration_test.go | 13 ----
 db/background_mgr_resync_dcp.go               | 23 +++---
 db/background_mgr_resync_dcp_test.go          | 28 -------
 db/util_testing.go                            | 45 +++++------
 tools/cache_perf_tool/dcpDataGeneration.go    |  6 +-
 14 files changed, 289 insertions(+), 242 deletions(-)
 create mode 100644 base/abstract_dcp_client.go
 create mode 100644 base/rosmar_dcp_client.go

diff --git a/base/abstract_dcp_client.go b/base/abstract_dcp_client.go
new file mode 100644
index 0000000000..cafbf7039b
--- /dev/null
+++ b/base/abstract_dcp_client.go
@@ -0,0 +1,61 @@
+package base
+
+import (
+	"context"
+	"expvar"
+	"fmt"
+
+	sgbucket "github.com/couchbase/sg-bucket"
+	"github.com/couchbaselabs/rosmar"
+)
+
+// DCPClient abstracts a DCP feed so callers can run the same code against Couchbase Server and rosmar buckets.
+type DCPClient interface {
+	Start(ctx context.Context) (chan error, error)
+	Close() error
+	GetMetadata() []DCPMetadata
+	GetMetadataKeyPrefix() string
+}
+
+// DCPCollections maps scope names to the collection names to stream.
+type DCPCollections map[string][]string
+
+// DCPClientOptions configures a DCPClient created via NewDCPClient.
+type DCPClientOptions struct {
+	ID                string                         // name of the DCP feed, used for local logging and as the stream name stored by Couchbase Server
+	OneShot           bool                           // if true, the feed stops after reaching the latest mutation present when the client is started
+	FailOnRollback    bool                           // if true, Start fails if the current DCP checkpoints encounter a rollback condition
+	MetadataStoreType DCPMetadataStoreType           // persistent (Couchbase Server) or in-memory checkpoint metadata storage
+	GroupID           string                         // group ID of the owning rest.ServerContext, used to isolate DCP checkpoints
+	CheckpointPrefix  string                         // key prefix for DCP checkpoint documents
+	Callback          sgbucket.FeedEventCallbackFunc // callback invoked for each DCP event
+	DBStats           *expvar.Map                    // database stats map used to record DCP statistics
+	Scopes            map[string][]string            // scope and collection names to stream
+	InitialMetadata   []DCPMetadata                  // initial metadata to seed the DCP client with
+}
+
+// NewDCPClient returns the DCPClient implementation matching the underlying bucket type.
+func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DCPClient, error) {
+	underlyingBucket := GetBaseBucket(bucket)
+	if _, ok := underlyingBucket.(*rosmar.Bucket); ok {
+		return NewRosmarDCPClient(bucket, opts)
+	} else if gocbBucket, ok := underlyingBucket.(*GocbV2Bucket); ok {
+		feedArgs := sgbucket.FeedArguments{
+			ID:               opts.ID,
+			CheckpointPrefix: opts.CheckpointPrefix,
+			Scopes:           opts.Scopes,
+		}
+		return newGocbDCPClient(ctx, gocbBucket, gocbBucket.GetName(), feedArgs, opts.Callback, opts.DBStats, opts.MetadataStoreType, opts.GroupID)
+	}
+	return nil, fmt.Errorf("bucket type %T does not have a DCPClient implementation", underlyingBucket)
+}
+
+// Add records each data store's collection under its scope.
+func (c DCPCollections) Add(ds ...sgbucket.DataStoreName) {
+	for _, d := range ds {
+		if _, ok := c[d.ScopeName()]; !ok {
+			c[d.ScopeName()] = []string{}
+		}
+		c[d.ScopeName()] = append(c[d.ScopeName()], d.CollectionName())
+	}
+}
diff --git a/base/dcp_client.go b/base/dcp_client.go
index 029f40e391..93225ed34c 100644
--- a/base/dcp_client.go
+++ b/base/dcp_client.go
@@ -40,17 +40,17 @@ type endStreamCallbackFunc func(e endStreamEvent)
 var ErrVbUUIDMismatch = errors.New("VbUUID mismatch when failOnRollback set")
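A minimal usage sketch for the abstraction introduced above (not part of the patch): NewDCPClient, DCPClientOptions, Start, and Close come from base/abstract_dcp_client.go, while the package name, import paths, feed ID, checkpoint prefix, and default-collection scope map are illustrative assumptions.

package example

import (
	"context"

	sgbucket "github.com/couchbase/sg-bucket"

	"github.com/couchbase/sync_gateway/base"
)

// runOneShotFeed drives a one-shot feed through the abstract client. Because
// NewDCPClient dispatches on the underlying bucket type, the same code is
// intended to work for both rosmar and Couchbase Server buckets.
func runOneShotFeed(ctx context.Context, bucket base.Bucket) error {
	opts := base.DCPClientOptions{
		ID:                "exampleFeed", // hypothetical feed name
		OneShot:           true,
		MetadataStoreType: base.DCPMetadataStoreInMemory,
		CheckpointPrefix:  "_sync:dcp_ck:example:", // hypothetical prefix
		Scopes:            map[string][]string{"_default": {"_default"}},
		Callback: func(event sgbucket.FeedEvent) bool {
			// handle each mutation/deletion here
			return true
		},
	}
	client, err := base.NewDCPClient(ctx, bucket, opts)
	if err != nil {
		return err
	}
	doneChan, err := client.Start(ctx)
	if err != nil {
		_ = client.Close() // the gocb client requires Close after a failed Start
		return err
	}
	// the done channel is closed (yielding nil) on completion, or carries a fatal feed error
	if feedErr := <-doneChan; feedErr != nil {
		return feedErr
	}
	return client.Close()
}

For a one-shot feed the caller blocks on the returned channel; receiving from the closed channel yields a nil error, matching the done-channel semantics of the gocb client below.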
-type DCPClient struct { +type GoCBDCPClient struct { ctx context.Context - ID string // unique ID for DCPClient - used for DCP stream name, must be unique - agent *gocbcore.DCPAgent // SDK DCP agent, manages connections and calls back to DCPClient stream observer implementation + ID string // unique ID for GoCBDCPClient - used for DCP stream name, must be unique + agent *gocbcore.DCPAgent // SDK DCP agent, manages connections and calls back to GoCBDCPClient stream observer implementation callback sgbucket.FeedEventCallbackFunc // Callback invoked on DCP mutations/deletions workers []*DCPWorker // Workers for concurrent processing of incoming mutations and callback. vbuckets are partitioned across workers - workersWg sync.WaitGroup // Active workers WG - used for signaling when the DCPClient workers have all stopped so the doneChannel can be closed + workersWg sync.WaitGroup // Active workers WG - used for signaling when the GoCBDCPClient workers have all stopped so the doneChannel can be closed spec BucketSpec // Bucket spec for the target data store supportsCollections bool // Whether the target data store supports collections numVbuckets uint16 // number of vbuckets on target data store - terminator chan bool // Used to close worker goroutines spawned by the DCPClient + terminator chan bool // Used to close worker goroutines spawned by the GoCBDCPClient doneChannel chan error // Returns nil on successful completion of one-shot feed or external close of feed, error otherwise metadata DCPMetadataStore // Implementation of DCPMetadataStore for metadata persistence activeVbuckets map[uint16]struct{} // vbuckets that have an open stream @@ -67,7 +67,7 @@ type DCPClient struct { collectionIDs []uint32 // collectionIDs used by gocbcore, if empty, uses default collections } -type DCPClientOptions struct { +type GoCBDCPClientOptions struct { NumWorkers int OneShot bool FailOnRollback bool // When true, the DCP client will terminate on DCP rollback @@ -81,7 +81,7 @@ type DCPClientOptions struct { CheckpointPrefix string } -func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket) (*DCPClient, error) { +func NewGocbDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options GoCBDCPClientOptions, bucket *GocbV2Bucket) (*GoCBDCPClient, error) { numVbuckets, err := bucket.GetMaxVbno() if err != nil { @@ -91,7 +91,7 @@ func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCal return newDCPClientWithForBuckets(ctx, ID, callback, options, bucket, numVbuckets) } -func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) { +func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options GoCBDCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) { numWorkers := DefaultNumWorkers if options.NumWorkers > 0 { @@ -106,7 +106,7 @@ func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucke return nil, fmt.Errorf("callers must specify a checkpoint prefix when persisting metadata") } } - client := &DCPClient{ + client := &GoCBDCPClient{ ctx: ctx, workers: make([]*DCPWorker, numWorkers), numVbuckets: numVbuckets, @@ -155,7 +155,7 @@ func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucke } // getCollectionHighSeqNo returns the 
highSeqNo for a given KV collection ID. -func (dc *DCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, error) { +func (dc *GoCBDCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, error) { vbucketSeqnoOptions := gocbcore.GetVbucketSeqnoOptions{} if dc.supportsCollections { vbucketSeqnoOptions.FilterOptions = &gocbcore.GetVbucketSeqnoFilterOptions{CollectionID: collectionID} @@ -213,7 +213,7 @@ func (dc *DCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, err } // getHighSeqNos returns the maximum sequence number for every collection configured by the DCP agent. -func (dc *DCPClient) getHighSeqNos() ([]uint64, error) { +func (dc *GoCBDCPClient) getHighSeqNos() ([]uint64, error) { highSeqNos := make([]uint64, dc.numVbuckets) // Initialize highSeqNo to the current metadata's StartSeqNo - we don't want to use a value lower than what // we've already processed @@ -235,7 +235,7 @@ func (dc *DCPClient) getHighSeqNos() ([]uint64, error) { } // configureOneShot sets highSeqnos for a one shot feed. -func (dc *DCPClient) configureOneShot() error { +func (dc *GoCBDCPClient) configureOneShot() error { highSeqNos, err := dc.getHighSeqNos() if err != nil { return err @@ -250,8 +250,9 @@ func (dc *DCPClient) configureOneShot() error { return nil } -// Start returns an error and a channel to indicate when the DCPClient is done. If Start returns an error, DCPClient.Close() needs to be called. -func (dc *DCPClient) Start() (doneChan chan error, err error) { +// Start returns an error and a channel to indicate when the GoCBDCPClient is done. If Start returns an error, GoCBDCPClient.Close() needs to be called. +func (dc *GoCBDCPClient) Start(ctx context.Context) (doneChan chan error, err error) { + // FIXME: set context here err = dc.initAgent(dc.spec) if err != nil { return dc.doneChannel, err @@ -274,13 +275,13 @@ func (dc *DCPClient) Start() (doneChan chan error, err error) { } // Close is used externally to stop the DCP client. If the client was already closed due to error, returns that error -func (dc *DCPClient) Close() error { +func (dc *GoCBDCPClient) Close() error { dc.close() return dc.getCloseError() } // GetMetadata returns metadata for all vbuckets -func (dc *DCPClient) GetMetadata() []DCPMetadata { +func (dc *GoCBDCPClient) GetMetadata() []DCPMetadata { metadata := make([]DCPMetadata, dc.numVbuckets) for i := uint16(0); i < dc.numVbuckets; i++ { metadata[i] = dc.metadata.GetMeta(i) @@ -290,7 +291,7 @@ func (dc *DCPClient) GetMetadata() []DCPMetadata { // close is used internally to stop the DCP client. Sends any fatal errors to the client's done channel, and // closes that channel. 
-func (dc *DCPClient) close() { +func (dc *GoCBDCPClient) close() { // set dc.closing to true, avoid re-triggering close if it's already in progress if !dc.closing.CompareAndSwap(false, true) { @@ -316,7 +317,7 @@ func (dc *DCPClient) close() { } // getAgentConfig returns a gocbcore.DCPAgentConfig for the given BucketSpec -func (dc *DCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig, error) { +func (dc *GoCBDCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig, error) { connStr, err := spec.GetGoCBConnStringForDCP() if err != nil { return nil, err @@ -360,7 +361,7 @@ func (dc *DCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig, } // initAgent creates a DCP agent and waits for it to be ready -func (dc *DCPClient) initAgent(spec BucketSpec) error { +func (dc *GoCBDCPClient) initAgent(spec BucketSpec) error { agentConfig, err := dc.getAgentConfig(spec) if err != nil { return err @@ -405,13 +406,13 @@ func (dc *DCPClient) initAgent(spec BucketSpec) error { return nil } -func (dc *DCPClient) workerForVbno(vbNo uint16) *DCPWorker { +func (dc *GoCBDCPClient) workerForVbno(vbNo uint16) *DCPWorker { workerIndex := int(vbNo % uint16(len(dc.workers))) return dc.workers[workerIndex] } // startWorkers initializes the DCP workers to receive stream events from eventFeed -func (dc *DCPClient) startWorkers(ctx context.Context) { +func (dc *GoCBDCPClient) startWorkers(ctx context.Context) { // vbuckets are assigned to workers as vbNo % NumWorkers. Create set of assigned vbuckets assignedVbs := make(map[int][]uint16) @@ -434,7 +435,7 @@ func (dc *DCPClient) startWorkers(ctx context.Context) { } } -func (dc *DCPClient) openStream(vbID uint16, maxRetries uint32) error { +func (dc *GoCBDCPClient) openStream(vbID uint16, maxRetries uint32) error { var openStreamErr error var attempts uint32 @@ -488,7 +489,7 @@ func (dc *DCPClient) openStream(vbID uint16, maxRetries uint32) error { return fmt.Errorf("openStream failed to complete after %d attempts, last error: %w", attempts, openStreamErr) } -func (dc *DCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.SeqNo) { +func (dc *GoCBDCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.SeqNo) { if dc.dbStats != nil { dc.dbStats.Add("dcp_rollback_count", 1) } @@ -497,7 +498,7 @@ func (dc *DCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.S // openStreamRequest issues the OpenStream request, but doesn't perform any error handling. Callers // should generally use openStream() for error and retry handling -func (dc *DCPClient) openStreamRequest(vbID uint16) error { +func (dc *GoCBDCPClient) openStreamRequest(vbID uint16) error { vbMeta := dc.metadata.GetMeta(vbID) @@ -548,7 +549,7 @@ func (dc *DCPClient) openStreamRequest(vbID uint16) error { // verifyFailoverLog checks for VbUUID changes when failOnRollback is set, and // writes the failover log to the client metadata store. If previous VbUUID is zero, it's // not considered a rollback - it's not required to initialize vbUUIDs into meta. 
-func (dc *DCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry) error { +func (dc *GoCBDCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry) error { if dc.failOnRollback { previousMeta := dc.metadata.GetMeta(vbID) @@ -566,7 +567,7 @@ func (dc *DCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry) return nil } -func (dc *DCPClient) deactivateVbucket(vbID uint16) { +func (dc *GoCBDCPClient) deactivateVbucket(vbID uint16) { dc.activeVbucketLock.Lock() delete(dc.activeVbuckets, vbID) activeCount := len(dc.activeVbuckets) @@ -580,7 +581,7 @@ func (dc *DCPClient) deactivateVbucket(vbID uint16) { } } -func (dc *DCPClient) onStreamEnd(e endStreamEvent) { +func (dc *GoCBDCPClient) onStreamEnd(e endStreamEvent) { if e.err == nil { DebugfCtx(dc.ctx, KeyDCP, "Stream (vb:%d) closed, all items streamed", e.vbID) dc.deactivateVbucket(e.vbID) @@ -588,8 +589,8 @@ func (dc *DCPClient) onStreamEnd(e endStreamEvent) { } if errors.Is(e.err, gocbcore.ErrDCPStreamClosed) { - DebugfCtx(dc.ctx, KeyDCP, "Stream (vb:%d) closed by DCPClient", e.vbID) - dc.fatalError(fmt.Errorf("Stream (vb:%d) closed by DCPClient", e.vbID)) + DebugfCtx(dc.ctx, KeyDCP, "Stream (vb:%d) closed by GoCBDCPClient", e.vbID) + dc.fatalError(fmt.Errorf("Stream (vb:%d) closed by GoCBDCPClient", e.vbID)) return } @@ -616,15 +617,15 @@ func (dc *DCPClient) onStreamEnd(e endStreamEvent) { }(e.vbID, retries) } -func (dc *DCPClient) fatalError(err error) { +func (dc *GoCBDCPClient) fatalError(err error) { dc.setCloseError(err) dc.close() } -func (dc *DCPClient) setCloseError(err error) { +func (dc *GoCBDCPClient) setCloseError(err error) { dc.closeErrorLock.Lock() defer dc.closeErrorLock.Unlock() - // If the DCPClient is already closing, don't update the error. If an initial error triggered the close, + // If the GoCBDCPClient is already closing, don't update the error. If an initial error triggered the close, // then closeError will already be set. In the event of a requested close, we want to ignore EOF errors associated // with stream close if dc.closing.IsTrue() { @@ -635,7 +636,7 @@ func (dc *DCPClient) setCloseError(err error) { } } -func (dc *DCPClient) getCloseError() error { +func (dc *GoCBDCPClient) getCloseError() error { dc.closeErrorLock.Lock() defer dc.closeErrorLock.Unlock() return dc.closeError @@ -661,16 +662,16 @@ func getLatestVbUUID(failoverLog []gocbcore.FailoverEntry) (vbUUID gocbcore.VbUU return entry.VbUUID } -func (dc *DCPClient) GetMetadataKeyPrefix() string { +func (dc *GoCBDCPClient) GetMetadataKeyPrefix() string { return dc.metadata.GetKeyPrefix() } // StartWorkersForTest will iterate through dcp workers to start them, to be used for caching testing purposes only. -func (dc *DCPClient) StartWorkersForTest(t *testing.T) { +func (dc *GoCBDCPClient) StartWorkersForTest(t *testing.T) { dc.startWorkers(dc.ctx) } // NewDCPClientForTest is a test-only function to create a DCP client with a specific number of vbuckets. 
-func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) { +func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options GoCBDCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) { return newDCPClientWithForBuckets(ctx, ID, callback, options, bucket, numVbuckets) } diff --git a/base/dcp_client_stream_observer.go b/base/dcp_client_stream_observer.go index da33c5daf1..3266046c76 100644 --- a/base/dcp_client_stream_observer.go +++ b/base/dcp_client_stream_observer.go @@ -16,7 +16,7 @@ import ( // to the DCPClient's workers to be processed, but performs the following additional functionality: // - key-based filtering for document-based events (Deletion, Expiration, Mutation) // - stream End handling, including restart on error -func (dc *DCPClient) SnapshotMarker(snapshotMarker gocbcore.DcpSnapshotMarker) { +func (dc *GoCBDCPClient) SnapshotMarker(snapshotMarker gocbcore.DcpSnapshotMarker) { e := snapshotEvent{ streamEventCommon: streamEventCommon{ @@ -30,7 +30,7 @@ func (dc *DCPClient) SnapshotMarker(snapshotMarker gocbcore.DcpSnapshotMarker) { dc.workerForVbno(snapshotMarker.VbID).Send(dc.ctx, e) } -func (dc *DCPClient) Mutation(mutation gocbcore.DcpMutation) { +func (dc *GoCBDCPClient) Mutation(mutation gocbcore.DcpMutation) { if dc.filteredKey(mutation.Key) { return @@ -56,7 +56,7 @@ func (dc *DCPClient) Mutation(mutation gocbcore.DcpMutation) { dc.workerForVbno(mutation.VbID).Send(dc.ctx, e) } -func (dc *DCPClient) Deletion(deletion gocbcore.DcpDeletion) { +func (dc *GoCBDCPClient) Deletion(deletion gocbcore.DcpDeletion) { if dc.filteredKey(deletion.Key) { return @@ -80,7 +80,7 @@ func (dc *DCPClient) Deletion(deletion gocbcore.DcpDeletion) { } -func (dc *DCPClient) End(end gocbcore.DcpStreamEnd, err error) { +func (dc *GoCBDCPClient) End(end gocbcore.DcpStreamEnd, err error) { e := endStreamEvent{ streamEventCommon: streamEventCommon{ @@ -92,41 +92,41 @@ func (dc *DCPClient) End(end gocbcore.DcpStreamEnd, err error) { } -func (dc *DCPClient) Expiration(expiration gocbcore.DcpExpiration) { +func (dc *GoCBDCPClient) Expiration(expiration gocbcore.DcpExpiration) { // SG doesn't opt in to expirations, so they'll come through as deletion events // (cf.https://github.com/couchbase/kv_engine/blob/master/docs/dcp/documentation/expiry-opcode-output.md) WarnfCtx(dc.ctx, "Unexpected DCP expiration event (vb:%d) for key %v", expiration.VbID, UD(string(expiration.Key))) } -func (dc *DCPClient) CreateCollection(creation gocbcore.DcpCollectionCreation) { +func (dc *GoCBDCPClient) CreateCollection(creation gocbcore.DcpCollectionCreation) { // Not used by SG at this time } -func (dc *DCPClient) DeleteCollection(deletion gocbcore.DcpCollectionDeletion) { +func (dc *GoCBDCPClient) DeleteCollection(deletion gocbcore.DcpCollectionDeletion) { // Not used by SG at this time } -func (dc *DCPClient) FlushCollection(flush gocbcore.DcpCollectionFlush) { +func (dc *GoCBDCPClient) FlushCollection(flush gocbcore.DcpCollectionFlush) { // Not used by SG at this time } -func (dc *DCPClient) CreateScope(creation gocbcore.DcpScopeCreation) { +func (dc *GoCBDCPClient) CreateScope(creation gocbcore.DcpScopeCreation) { // Not used by SG at this time } -func (dc *DCPClient) DeleteScope(deletion gocbcore.DcpScopeDeletion) { +func (dc *GoCBDCPClient) DeleteScope(deletion gocbcore.DcpScopeDeletion) { 
// Not used by SG at this time } -func (dc *DCPClient) ModifyCollection(modification gocbcore.DcpCollectionModification) { +func (dc *GoCBDCPClient) ModifyCollection(modification gocbcore.DcpCollectionModification) { // Not used by SG at this time } -func (dc *DCPClient) OSOSnapshot(snapshot gocbcore.DcpOSOSnapshot) { +func (dc *GoCBDCPClient) OSOSnapshot(snapshot gocbcore.DcpOSOSnapshot) { // Not used by SG at this time } -func (dc *DCPClient) SeqNoAdvanced(seqNoAdvanced gocbcore.DcpSeqNoAdvanced) { +func (dc *GoCBDCPClient) SeqNoAdvanced(seqNoAdvanced gocbcore.DcpSeqNoAdvanced) { dc.workerForVbno(seqNoAdvanced.VbID).Send(dc.ctx, seqnoAdvancedEvent{ streamEventCommon: streamEventCommon{ vbID: seqNoAdvanced.VbID, @@ -136,6 +136,6 @@ func (dc *DCPClient) SeqNoAdvanced(seqNoAdvanced gocbcore.DcpSeqNoAdvanced) { }) } -func (dc *DCPClient) filteredKey(key []byte) bool { +func (dc *GoCBDCPClient) filteredKey(key []byte) bool { return false } diff --git a/base/dcp_client_test.go b/base/dcp_client_test.go index 245bb97053..96bf44f96f 100644 --- a/base/dcp_client_test.go +++ b/base/dcp_client_test.go @@ -9,7 +9,6 @@ package base import ( - "bytes" "fmt" "log" "sync" @@ -17,7 +16,6 @@ import ( "testing" "time" - "github.com/couchbase/gocbcore/v10" sgbucket "github.com/couchbase/sg-bucket" "github.com/stretchr/testify/assert" @@ -28,10 +26,6 @@ const oneShotDCPTimeout = 5 * time.Minute func TestOneShotDCP(t *testing.T) { - if UnitTestUrlIsWalrus() { - t.Skip("This test only works against Couchbase Server") - } - ctx := TestCtx(t) bucket := GetTestBucket(t) defer bucket.Close(ctx) @@ -54,28 +48,20 @@ func TestOneShotDCP(t *testing.T) { return false } - // start one shot feed - feedID := t.Name() - - collection, err := AsCollection(dataStore) - require.NoError(t, err) - var collectionIDs []uint32 - if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) { - collectionIDs = append(collectionIDs, collection.GetCollectionID()) - } - - clientOptions := DCPClientOptions{ + dcpOptions := DCPClientOptions{ + ID: t.Name(), + Scopes: map[string][]string{ + dataStore.ScopeName(): {dataStore.CollectionName()}, + }, OneShot: true, - CollectionIDs: collectionIDs, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + Callback: counterCallback, } - gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket) - require.NoError(t, err) - dcpClient, err := NewDCPClient(TestCtx(t), feedID, counterCallback, clientOptions, gocbv2Bucket) + dcpClient, err := NewDCPClient(ctx, bucket, dcpOptions) require.NoError(t, err) - doneChan, startErr := dcpClient.Start() + doneChan, startErr := dcpClient.Start(ctx) require.NoError(t, startErr) defer func() { @@ -112,10 +98,6 @@ func TestOneShotDCP(t *testing.T) { func TestTerminateDCPFeed(t *testing.T) { - if UnitTestUrlIsWalrus() { - t.Skip("This test only works against Couchbase Server") - } - ctx := TestCtx(t) bucket := GetTestBucket(t) defer bucket.Close(ctx) @@ -129,15 +111,17 @@ func TestTerminateDCPFeed(t *testing.T) { return false } - // start continuous feed with terminator - feedID := t.Name() - - gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket) - require.NoError(t, err) - options := DCPClientOptions{ + dcpOptions := DCPClientOptions{ + ID: t.Name(), + Scopes: map[string][]string{ + dataStore.ScopeName(): {dataStore.CollectionName()}, + }, + OneShot: false, CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()), + Callback: counterCallback, } - dcpClient, err := NewDCPClient(TestCtx(t), feedID, counterCallback, options, gocbv2Bucket) + + 
dcpClient, err := NewDCPClient(ctx, bucket, dcpOptions) require.NoError(t, err) // Add documents in a separate goroutine @@ -157,7 +141,7 @@ func TestTerminateDCPFeed(t *testing.T) { } }() - doneChan, startErr := dcpClient.Start() + doneChan, startErr := dcpClient.Start(ctx) require.NoError(t, startErr) // Wait for some processing to complete, then close the feed @@ -181,6 +165,7 @@ func TestTerminateDCPFeed(t *testing.T) { log.Printf("additionalDocs wait completed") } +/* // TestDCPClientMultiFeedConsistency tests for DCP rollback between execution of two DCP feeds, based on // changes in the VbUUID func TestDCPClientMultiFeedConsistency(t *testing.T) { @@ -824,3 +809,4 @@ func TestDCPClientAgentConfig(t *testing.T) { }) } } +*/ diff --git a/base/gocb_dcp_feed.go b/base/gocb_dcp_feed.go index 60b26e7a4b..14e4ede878 100644 --- a/base/gocb_dcp_feed.go +++ b/base/gocb_dcp_feed.go @@ -47,19 +47,17 @@ func getHighSeqMetadata(cbstore CouchbaseBucketStore) ([]DCPMetadata, error) { return metadata, nil } -// StartGocbDCPFeed starts a DCP Feed. -func StartGocbDCPFeed(ctx context.Context, bucket *GocbV2Bucket, bucketName string, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, metadataStoreType DCPMetadataStoreType, groupID string) error { - +func newGocbDCPClient(ctx context.Context, bucket *GocbV2Bucket, bucketName string, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, metadataStoreType DCPMetadataStoreType, groupID string) (*GoCBDCPClient, error) { feedName, err := GenerateDcpStreamName(args.ID) if err != nil { - return err + return nil, err } var collectionIDs []uint32 if bucket.IsSupported(sgbucket.BucketStoreFeatureCollections) { cm, err := bucket.GetCollectionManifest() if err != nil { - return err + return nil, err } // should only be one args.Scope so cheaper to iterate this way around @@ -84,18 +82,18 @@ func StartGocbDCPFeed(ctx context.Context, bucket *GocbV2Bucket, bucketName stri if len(collectionsFound) != len(collections) { for _, collectionName := range collections { if _, ok := collectionsFound[collectionName]; !ok { - return RedactErrorf("collection %s not found in scope %s %+v", MD(collectionName), MD(manifestScope.Name), manifestScope.Collections) + return nil, RedactErrorf("collection %s not found in scope %s %+v", MD(collectionName), MD(manifestScope.Name), manifestScope.Collections) } } } break } if !scopeFound { - return RedactErrorf("scope %s not found", MD(scopeName)) + return nil, RedactErrorf("scope %s not found", MD(scopeName)) } } } - options := DCPClientOptions{ + options := GoCBDCPClientOptions{ MetadataStoreType: metadataStoreType, GroupID: groupID, DbStats: dbStats, @@ -107,22 +105,28 @@ func StartGocbDCPFeed(ctx context.Context, bucket *GocbV2Bucket, bucketName stri if args.Backfill == sgbucket.FeedNoBackfill { metadata, err := getHighSeqMetadata(bucket) if err != nil { - return err + return nil, err } options.InitialMetadata = metadata } - dcpClient, err := NewDCPClient( + return NewGocbDCPClient( ctx, feedName, callback, options, bucket) +} + +// StartGocbDCPFeed starts a DCP Feed. 
+func StartGocbDCPFeed(ctx context.Context, bucket *GocbV2Bucket, bucketName string, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, metadataStoreType DCPMetadataStoreType, groupID string) error { + dcpClient, err := newGocbDCPClient(ctx, bucket, bucketName, args, callback, dbStats, metadataStoreType, groupID) if err != nil { return err } + feedName := dcpClient.ID - doneChan, err := dcpClient.Start() + doneChan, err := dcpClient.Start(ctx) if err != nil { ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err) // simplify in CBG-2234 diff --git a/base/rosmar_dcp_client.go b/base/rosmar_dcp_client.go new file mode 100644 index 0000000000..5e78900b38 --- /dev/null +++ b/base/rosmar_dcp_client.go @@ -0,0 +1,60 @@ +package base + +import ( + "context" + + sgbucket "github.com/couchbase/sg-bucket" +) + +type RosmarDCPClient struct { + bucket Bucket + opts DCPClientOptions + doneChan chan struct{} + terminator chan bool +} + +func NewRosmarDCPClient(bucket Bucket, opts DCPClientOptions) (*RosmarDCPClient, error) { + return &RosmarDCPClient{ + bucket: bucket, + opts: opts, + }, nil +} + +func (dc *RosmarDCPClient) Start(ctx context.Context) (chan error, error) { + doneChan := make(chan error) + dc.doneChan = make(chan struct{}) + dc.terminator = make(chan bool) + feedArgs := sgbucket.FeedArguments{ + ID: dc.opts.ID, + CheckpointPrefix: dc.opts.CheckpointPrefix, + Dump: dc.opts.OneShot, + DoneChan: dc.doneChan, + Terminator: dc.terminator, + Scopes: dc.opts.Scopes, + } + err := dc.bucket.StartDCPFeed(ctx, feedArgs, dc.opts.Callback, nil) + if err != nil { + return nil, err + } + go func() { + <-feedArgs.DoneChan + close(doneChan) + }() + return doneChan, nil +} + +func (dc *RosmarDCPClient) Close() error { + close(dc.terminator) + <-dc.doneChan + return nil +} + +func (dc *RosmarDCPClient) GetMetadata() []DCPMetadata { + // Rosmar DCP client does not support getting metadata yet + return nil +} + +func (dc *RosmarDCPClient) GetMetadataKeyPrefix() string { + // this value is probably not correct + return dc.opts.CheckpointPrefix +} diff --git a/db/attachment_compaction.go b/db/attachment_compaction.go index 6ed3b4daec..6e123780f1 100644 --- a/db/attachment_compaction.go +++ b/db/attachment_compaction.go @@ -131,28 +131,20 @@ func attachmentCompactMarkPhase(ctx context.Context, dataStore base.DataStore, c return true } - clientOptions, err := getCompactionDCPClientOptions(collectionID, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) - if err != nil { - return 0, nil, "", err - } + clientOptions := getCompactionDCPClientOptions(dataStore, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) + clientOptions.ID = GenerateCompactionDCPStreamName(compactionID, MarkPhase) + clientOptions.Callback = callback base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed for mark phase of attachment compaction", compactionLoggingID) - dcpFeedKey := GenerateCompactionDCPStreamName(compactionID, MarkPhase) - - bucket, err := base.AsGocbV2Bucket(db.Bucket) - if err != nil { - return 0, nil, "", err - } - - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *clientOptions, bucket) + dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! 
%v", compactionLoggingID, err) return 0, nil, "", err } metadataKeyPrefix := dcpClient.GetMetadataKeyPrefix() - doneChan, err := dcpClient.Start() + doneChan, err := dcpClient.Start(ctx) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err) _ = dcpClient.Close() @@ -377,27 +369,24 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore, return true } - clientOptions, err := getCompactionDCPClientOptions(collectionID, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) - if err != nil { - return 0, err - } + clientOptions := getCompactionDCPClientOptions(dataStore, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) + clientOptions.ID = GenerateCompactionDCPStreamName(compactionID, SweepPhase) + clientOptions.Callback = callback clientOptions.InitialMetadata = base.BuildDCPMetadataSliceFromVBUUIDs(vbUUIDs) - dcpFeedKey := GenerateCompactionDCPStreamName(compactionID, SweepPhase) - bucket, err := base.AsGocbV2Bucket(db.Bucket) if err != nil { return 0, err } - base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for sweep phase of attachment compaction", compactionLoggingID, dcpFeedKey) - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *clientOptions, bucket) + base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for sweep phase of attachment compaction", compactionLoggingID, clientOptions.ID) + dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err) return 0, err } - doneChan, err := dcpClient.Start() + doneChan, err := dcpClient.Start(ctx) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! %v", compactionLoggingID, err) _ = dcpClient.Close() @@ -513,29 +502,26 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore return true } - clientOptions, err := getCompactionDCPClientOptions(collectionID, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) - if err != nil { - return "", err - } + clientOptions := getCompactionDCPClientOptions(dataStore, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) clientOptions.InitialMetadata = base.BuildDCPMetadataSliceFromVBUUIDs(vbUUIDs) + clientOptions.ID = GenerateCompactionDCPStreamName(compactionID, CleanupPhase) + clientOptions.Callback = callback base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed for cleanup phase of attachment compaction", compactionLoggingID) - dcpFeedKey := GenerateCompactionDCPStreamName(compactionID, CleanupPhase) - bucket, err := base.AsGocbV2Bucket(db.Bucket) if err != nil { return "", err } - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *clientOptions, bucket) + dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create attachment compaction DCP client! %v", compactionLoggingID, err) return "", err } metadataKeyPrefix := dcpClient.GetMetadataKeyPrefix() - doneChan, err := dcpClient.Start() + doneChan, err := dcpClient.Start(ctx) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start attachment compaction DCP feed! 
%v", compactionLoggingID, err) // simplify close in CBG-2234 @@ -574,17 +560,17 @@ func getCompactionIDSubDocPath(compactionID string) string { } // getCompactionDCPClientOptions returns the default set of DCPClientOptions suitable for attachment compaction -func getCompactionDCPClientOptions(collectionID uint32, groupID string, prefix string) (*base.DCPClientOptions, error) { - clientOptions := &base.DCPClientOptions{ +func getCompactionDCPClientOptions(dataStore sgbucket.DataStore, groupID string, prefix string) base.DCPClientOptions { + return base.DCPClientOptions{ OneShot: true, FailOnRollback: true, MetadataStoreType: base.DCPMetadataStoreCS, GroupID: groupID, - CollectionIDs: []uint32{collectionID}, - CheckpointPrefix: prefix, + Scopes: map[string][]string{ + dataStore.ScopeName(): {dataStore.CollectionName()}, + }, + CheckpointPrefix: prefix, } - return clientOptions, nil - } func GenerateCompactionDCPStreamName(compactionID, compactionAction string) string { diff --git a/db/attachment_compaction_test.go b/db/attachment_compaction_test.go index 4a86925440..9f51e68adc 100644 --- a/db/attachment_compaction_test.go +++ b/db/attachment_compaction_test.go @@ -25,10 +25,6 @@ import ( ) func TestAttachmentMark(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Requires CBS") - } - testDb, ctx := setupTestDB(t) defer testDb.Close(ctx) @@ -256,7 +252,6 @@ func TestAttachmentCleanupRollback(t *testing.T) { var garbageVBUUID gocbcore.VbUUID = 1234 collection := GetSingleDatabaseCollection(t, testDb.DatabaseContext) dataStore := collection.dataStore - collectionID := collection.GetCollectionID() makeMarkedDoc := func(docid string, compactID string) { err := dataStore.SetRaw(docid, 0, nil, []byte("{}")) @@ -284,9 +279,10 @@ func TestAttachmentCleanupRollback(t *testing.T) { bucket, err := base.AsGocbV2Bucket(testDb.Bucket) require.NoError(t, err) dcpFeedKey := GenerateCompactionDCPStreamName(t.Name(), CleanupPhase) - clientOptions, err := getCompactionDCPClientOptions(collectionID, testDb.Options.GroupID, testDb.MetadataKeys.DCPCheckpointPrefix(testDb.Options.GroupID)) - require.NoError(t, err) - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, nil, *clientOptions, bucket) + clientOptions := getCompactionDCPClientOptions(dataStore, testDb.Options.GroupID, testDb.MetadataKeys.DCPCheckpointPrefix(testDb.Options.GroupID)) + clientOptions.ID = dcpFeedKey + + dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) require.NoError(t, err) // alter dcp metadata to feed into the compaction manager diff --git a/db/background_mgr_attachment_migration.go b/db/background_mgr_attachment_migration.go index afd4f55afd..63134289f7 100644 --- a/db/background_mgr_attachment_migration.go +++ b/db/background_mgr_attachment_migration.go @@ -158,12 +158,7 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string return true } - bucket, err := base.AsGocbV2Bucket(db.Bucket) - if err != nil { - return err - } - - currCollectionIDs, err := getCollectionIDsForMigration(db) + scopes, currCollectionIDs, err := getCollectionsForAttachmentMigration(db) if err != nil { return err } @@ -178,15 +173,16 @@ func (a *AttachmentMigrationManager) Run(ctx context.Context, options map[string } a.SetCollectionIDs(currCollectionIDs) - dcpOptions := getMigrationDCPClientOptions(currCollectionIDs, db.Options.GroupID, dcpPrefix) - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *dcpOptions, bucket) + dcpOptions := getMigrationDCPClientOptions(scopes, db.Options.GroupID, 
dcpPrefix)
+	dcpOptions.ID = dcpFeedKey
+	dcpOptions.Callback = callback
+	dcpClient, err := base.NewDCPClient(ctx, db.Bucket, dcpOptions)
 	if err != nil {
 		base.WarnfCtx(ctx, "[%s] Failed to create attachment migration DCP client: %v", migrationLoggingID, err)
 		return err
 	}
 
 	base.DebugfCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for attachment migration", migrationLoggingID, dcpFeedKey)
-	doneChan, err := dcpClient.Start()
+	doneChan, err := dcpClient.Start(ctx)
 	if err != nil {
 		base.WarnfCtx(ctx, "[%s] Failed to start attachment migration DCP feed: %v", migrationLoggingID, err)
 		_ = dcpClient.Close()
@@ -291,16 +287,15 @@ func (a *AttachmentMigrationManager) GetProcessStatus(status BackgroundManagerSt
 	return statusJSON, metaJSON, err
 }
 
-func getMigrationDCPClientOptions(collectionIDs []uint32, groupID, prefix string) *base.DCPClientOptions {
-	clientOptions := &base.DCPClientOptions{
+func getMigrationDCPClientOptions(scopes map[string][]string, groupID, prefix string) base.DCPClientOptions {
+	return base.DCPClientOptions{
 		OneShot:           true,
 		FailOnRollback:    false,
 		MetadataStoreType: base.DCPMetadataStoreCS,
 		GroupID:           groupID,
-		CollectionIDs:     collectionIDs,
+		Scopes:            scopes,
 		CheckpointPrefix:  prefix,
 	}
-	return clientOptions
 }
 
 type AttachmentMigrationManagerResponse struct {
@@ -355,25 +350,34 @@ func (a *AttachmentMigrationManager) resetDCPMetadataIfNeeded(ctx context.Contex
 	return nil
 }
 
-// getCollectionIDsForMigration will get all collection IDs required for DCP client on migration run
-func getCollectionIDsForMigration(db *DatabaseContext) ([]uint32, error) {
+// getCollectionsForAttachmentMigration returns the scope/collection names and the collection IDs that the migration DCP client should stream.
+func getCollectionsForAttachmentMigration(db *DatabaseContext) (scopes map[string][]string, ids []uint32, err error) {
+	collections := make(map[string][]string, 1) // a database always uses a single scope
 	collectionIDs := make([]uint32, 0)
-
 	// if all collections are included in RequireAttachmentMigration then we need to run against all collections,
 	// if no collections are specified in RequireAttachmentMigration, run against all collections. 
This is to support job // being triggered by rest api (even after job was previously completed) if len(db.RequireAttachmentMigration) == 0 { - // get all collection IDs - collectionIDs = db.GetCollectionIDs() + for _, collection := range db.CollectionByID { + if _, ok := collections[collection.ScopeName]; !ok { + collections[collection.ScopeName] = make([]string, 0) + } + collections[collection.ScopeName] = append(collections[collection.ScopeName], collection.Name) + collectionIDs = append(collectionIDs, collection.GetCollectionID()) + } } else { // iterate through and grab collectionIDs we need for _, v := range db.RequireAttachmentMigration { collection, err := db.GetDatabaseCollection(v.ScopeName(), v.CollectionName()) if err != nil { - return nil, base.RedactErrorf("failed to find ID for collection %s.%s", base.MD(v.ScopeName()), base.MD(v.CollectionName())) + return nil, nil, base.RedactErrorf("failed to find collection %s.%s", base.MD(v.ScopeName()), base.MD(v.CollectionName())) + } + if _, ok := collections[collection.ScopeName]; !ok { + collections[collection.ScopeName] = make([]string, 0) } + collections[collection.ScopeName] = append(collections[collection.ScopeName], collection.Name) collectionIDs = append(collectionIDs, collection.GetCollectionID()) } } - return collectionIDs, nil + return collections, collectionIDs, nil } diff --git a/db/background_mgr_attachment_migration_test.go b/db/background_mgr_attachment_migration_test.go index 7d583e06b7..24ccb35254 100644 --- a/db/background_mgr_attachment_migration_test.go +++ b/db/background_mgr_attachment_migration_test.go @@ -20,9 +20,6 @@ import ( ) func TestAttachmentMigrationTaskMixMigratedAndNonMigratedDocs(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("rosmar does not support DCP client, pending CBG-4249") - } db, ctx := setupTestDB(t) defer db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) @@ -76,10 +73,6 @@ func getAttachmentMigrationStats(t *testing.T, migrationManager BackgroundManage } func TestAttachmentMigrationManagerResumeStoppedMigration(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("rosmar does not support DCP client, pending CBG-4249") - } - db, ctx := setupTestDB(t) defer db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) @@ -142,9 +135,6 @@ func TestAttachmentMigrationManagerResumeStoppedMigration(t *testing.T) { } func TestAttachmentMigrationManagerNoDocsToMigrate(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("rosmar does not support DCP client, pending CBG-4249") - } db, ctx := setupTestDB(t) defer db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) @@ -182,9 +172,6 @@ func TestAttachmentMigrationManagerNoDocsToMigrate(t *testing.T) { } func TestMigrationManagerDocWithSyncAndGlobalAttachmentMetadata(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("rosmar does not support DCP client, pending CBG-4249") - } db, ctx := setupTestDB(t) defer db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index 9f1ed723cd..032c88b7f4 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -163,28 +163,23 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers return true } - bucket, err := base.AsGocbV2Bucket(db.Bucket) - if err != nil { - return err - } - if r.hasAllCollections { base.InfofCtx(ctx, base.KeyAll, "[%s] running 
resync against all collections", resyncLoggingID) } else { base.InfofCtx(ctx, base.KeyAll, "[%s] running resync against specified collections", resyncLoggingID) } - clientOptions := getResyncDCPClientOptions(r.collectionIDs, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) - - dcpFeedKey := GenerateResyncDCPStreamName(r.ResyncID) - dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *clientOptions, bucket) + clientOptions := getResyncDCPClientOptions(r.ResyncedCollections, db.Options.GroupID, db.MetadataKeys.DCPCheckpointPrefix(db.Options.GroupID)) + clientOptions.ID = GenerateResyncDCPStreamName(r.ResyncID) + clientOptions.Callback = callback + dcpClient, err := base.NewDCPClient(ctx, db.Bucket, clientOptions) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to create resync DCP client! %v", resyncLoggingID, err) return err } - base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for resync", resyncLoggingID, dcpFeedKey) - doneChan, err := dcpClient.Start() + base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for resync", resyncLoggingID, clientOptions.ID) + doneChan, err := dcpClient.Start(ctx) if err != nil { base.WarnfCtx(ctx, "[%s] Failed to start resync DCP feed! %v", resyncLoggingID, err) _ = dcpClient.Close() @@ -404,13 +399,13 @@ func initializePrincipalDocsIndex(ctx context.Context, db *Database) error { } // getResyncDCPClientOptions returns the default set of DCPClientOptions suitable for resync -func getResyncDCPClientOptions(collectionIDs []uint32, groupID string, prefix string) *base.DCPClientOptions { - return &base.DCPClientOptions{ +func getResyncDCPClientOptions(collectionNames base.DCPCollections, groupID string, prefix string) base.DCPClientOptions { + return base.DCPClientOptions{ OneShot: true, FailOnRollback: false, MetadataStoreType: base.DCPMetadataStoreCS, GroupID: groupID, - CollectionIDs: collectionIDs, + Scopes: collectionNames, CheckpointPrefix: prefix, } } diff --git a/db/background_mgr_resync_dcp_test.go b/db/background_mgr_resync_dcp_test.go index 0989d414c3..735b029f0e 100644 --- a/db/background_mgr_resync_dcp_test.go +++ b/db/background_mgr_resync_dcp_test.go @@ -173,10 +173,6 @@ func TestResyncDCPInit(t *testing.T) { } func TestResyncManagerDCPStopInMidWay(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - docsToCreate := 1000 db, ctx := setupTestDBForResyncWithDocs(t, docsToCreate, true) defer db.Close(ctx) @@ -230,10 +226,6 @@ func TestResyncManagerDCPStopInMidWay(t *testing.T) { func TestResyncManagerDCPStart(t *testing.T) { base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll) - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - t.Run("Resync without updating sync function", func(t *testing.T) { docsToCreate := 100 db, ctx := setupTestDBForResyncWithDocs(t, docsToCreate, false) @@ -333,10 +325,6 @@ func TestResyncManagerDCPStart(t *testing.T) { } func TestResyncManagerDCPRunTwice(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - docsToCreate := 1000 db, ctx := setupTestDBForResyncWithDocs(t, docsToCreate, false) defer db.Close(ctx) @@ -390,10 +378,6 @@ func TestResyncManagerDCPRunTwice(t *testing.T) { } func TestResyncManagerDCPResumeStoppedProcess(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - docsToCreate := 5000 db, ctx := setupTestDBForResyncWithDocs(t, docsToCreate, true) defer db.Close(ctx) @@ -462,11 +446,6 @@ func 
TestResyncManagerDCPResumeStoppedProcess(t *testing.T) { // TestResyncManagerDCPResumeStoppedProcessChangeCollections starts a resync with a single collection, stops it, and re-runs with an additional collection. // Expects the resync process to reset with a new ID, and new checkpoints, and reprocess the full set of documents across both collections. func TestResyncManagerDCPResumeStoppedProcessChangeCollections(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - - base.SetUpTestLogging(t, base.LevelDebug) base.TestRequiresCollections(t) docsPerCollection := 5000 @@ -620,13 +599,6 @@ function sync(doc, oldDoc){ // TestResyncMou ensures that resync updates create mou, and preserve pcas in mou in the case where resync is reprocessing an import func TestResyncMou(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("Test requires Couchbase Server") - } - if !base.TestUseXattrs() { - t.Skip("_mou is written to xattrs only") - } - base.SetUpTestLogging(t, base.LevelInfo, base.KeyMigrate, base.KeyImport) db, ctx := setupTestDBWithOptionsAndImport(t, nil, DatabaseContextOptions{}) defer db.Close(ctx) diff --git a/db/util_testing.go b/db/util_testing.go index 3926ee25e6..7b4bc0571a 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -14,9 +14,7 @@ import ( "context" "errors" "fmt" - "maps" "net/http" - "slices" "strconv" "strings" "sync/atomic" @@ -187,26 +185,19 @@ func purgeWithDCPFeed(ctx context.Context, bucket base.Bucket, tbp *base.TestBuc var purgeErrors *base.MultiError - collections := make(map[uint32]sgbucket.DataStore) - if bucket.IsSupported(sgbucket.BucketStoreFeatureCollections) { - dataStores, err := bucket.ListDataStores() + dataStores, err := bucket.ListDataStores() + if err != nil { + return err + } + collections := make(map[uint32]sgbucket.DataStore, len(dataStores)) + collectionNames := make(base.DCPCollections) + for _, dataStoreName := range dataStores { + collection, err := bucket.NamedDataStore(dataStoreName) if err != nil { return err } - for _, dataStoreName := range dataStores { - collection, err := bucket.NamedDataStore(dataStoreName) - if err != nil { - return err - } - collections[collection.GetCollectionID()] = collection - } - } - - dcpClientOpts := base.DCPClientOptions{ - OneShot: true, - FailOnRollback: false, - CollectionIDs: slices.Collect(maps.Keys(collections)), - MetadataStoreType: base.DCPMetadataStoreInMemory, + collectionNames.Add(dataStoreName) + collections[collection.GetCollectionID()] = collection } purgeCallback := func(event sgbucket.FeedEvent) bool { @@ -273,16 +264,20 @@ func purgeWithDCPFeed(ctx context.Context, bucket base.Bucket, tbp *base.TestBuc } return false } - feedID := "purgeFeed-" + bucket.GetName() - gocbBucket, err := base.AsGocbV2Bucket(bucket) - if err != nil { - return err + dcpClientOpts := base.DCPClientOptions{ + ID: "purgeFeed-" + bucket.GetName(), + OneShot: true, + FailOnRollback: false, + Scopes: collectionNames, + MetadataStoreType: base.DCPMetadataStoreInMemory, + Callback: purgeCallback, } - dcpClient, err := base.NewDCPClient(ctx, feedID, purgeCallback, dcpClientOpts, gocbBucket) + + dcpClient, err := base.NewDCPClient(ctx, bucket, dcpClientOpts) if err != nil { return err } - doneChan, err := dcpClient.Start() + doneChan, err := dcpClient.Start(ctx) if err != nil { return fmt.Errorf("error starting purge DCP feed: %w", err) } diff --git a/tools/cache_perf_tool/dcpDataGeneration.go b/tools/cache_perf_tool/dcpDataGeneration.go index 86c312fa61..fb202a39a5 100644 
--- a/tools/cache_perf_tool/dcpDataGeneration.go +++ b/tools/cache_perf_tool/dcpDataGeneration.go @@ -31,7 +31,7 @@ type dcpDataGen struct { seqAlloc *sequenceAllocator delays []time.Duration dbCtx *db.DatabaseContext - client *base.DCPClient + client *base.GoCBDCPClient numChannelsPerDoc int numTotalChannels int simRapidUpdate bool @@ -327,8 +327,8 @@ func (dcp *dcpDataGen) mutateWithDedupe(seqs []uint64, chanCount int, casValue u return encodedVal, chanCount, nil } -func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucket, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, numWorkers, numVBuckets int) (*base.DCPClient, error) { - options := base.DCPClientOptions{ +func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucket, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map, numWorkers, numVBuckets int) (*base.GoCBDCPClient, error) { + options := base.GoCBDCPClientOptions{ MetadataStoreType: base.DCPMetadataStoreInMemory, GroupID: "", DbStats: dbStats,
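A closing sketch (not part of the patch): populating DCPClientOptions.Scopes with the DCPCollections helper from base/abstract_dcp_client.go, mirroring the purgeWithDCPFeed change in db/util_testing.go. ListDataStores is the existing sg-bucket API used there; the function name and import path are illustrative.

package example

import (
	"github.com/couchbase/sync_gateway/base"
)

// scopesForAllCollections collects every scope/collection in the bucket into
// the shape DCPClientOptions.Scopes expects (scope name -> collection names).
func scopesForAllCollections(bucket base.Bucket) (base.DCPCollections, error) {
	names := make(base.DCPCollections)
	dataStores, err := bucket.ListDataStores()
	if err != nil {
		return nil, err
	}
	// Add groups each collection under its scope, creating the slice on first use
	names.Add(dataStores...)
	return names, nil
}

Because DCPCollections is declared as map[string][]string, the result assigns directly to the Scopes field of DCPClientOptions.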