Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions base/abstract_dcp_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package base

import (
"context"
"expvar"
"fmt"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbaselabs/rosmar"
)

// DCPClient is the interface returned by NewDCPClient. It lets callers drive a DCP feed without
// knowing which bucket implementation is behind it.
type DCPClient interface {
// Start begins the feed and returns a channel of errors plus an immediate error.
// NOTE(review): for the gocb-backed client the channel signals when the client is done, and Close
// must still be called if Start returns an error - confirm other implementations follow the same contract.
Start(ctx context.Context) (chan error, error)
// Close stops the client and reports an error, if any.
Close() error
// GetMetadata returns the DCP metadata currently held by the client.
GetMetadata() []DCPMetadata
// GetMetadataKeyPrefix returns the key prefix used by the client's metadata store.
// NOTE(review): presumably the prefix of the persisted checkpoint documents - confirm.
GetMetadataKeyPrefix() string
}

// DCPCollections maps a scope name to the names of the collections within that scope.
type DCPCollections map[string][]string

// DCPClientOptions holds the settings used by NewDCPClient to construct a DCPClient.
type DCPClientOptions struct {
ID string // name of the DCP feed, used for logging locally and stored by Couchbase Server
OneShot bool // if true, the feed runs to latest document found when the client is started
FailOnRollback bool // if true, fail Start if the current DCP checkpoints encounter a rollback condition
MetadataStoreType DCPMetadataStoreType // persistent or in memory storage
GroupID string // name of groupID of rest.ServerContext in order to isolate DCP checkpoints
CheckpointPrefix string // start of the checkpoint documents
Callback sgbucket.FeedEventCallbackFunc // callback function for DCP events
DBStats *expvar.Map // expvar map handed to the underlying client for stats reporting
Scopes map[string][]string // scopes and collections to monitor
InitialMetadata []DCPMetadata // initial metadata to seed the DCP client with
}

// NewDCPClient creates the DCPClient implementation that matches the backing store of bucket.
//
// The bucket is unwrapped with GetBaseBucket and dispatched on its concrete type:
//   - *rosmar.Bucket: NewRosmarDCPClient is called with the original (still wrapped) bucket and opts.
//   - *GocbV2Bucket: newGocbDCPClient is called with feed arguments built from opts.ID,
//     opts.CheckpointPrefix and opts.Scopes, plus opts.Callback, opts.DBStats,
//     opts.MetadataStoreType and opts.GroupID.
//
// Any other bucket type yields an error naming the unsupported type.
//
// When an error is returned the DCPClient result is always a literal nil, so callers never receive
// a non-nil interface wrapping a nil concrete pointer.
//
// NOTE(review): on the gocb path opts.OneShot, opts.FailOnRollback and opts.InitialMetadata are not
// forwarded by this function - confirm whether that is intentional.
func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DCPClient, error) {
	switch underlyingBucket := GetBaseBucket(bucket).(type) {
	case *rosmar.Bucket:
		client, err := NewRosmarDCPClient(bucket, opts)
		if err != nil {
			return nil, err
		}
		return client, nil
	case *GocbV2Bucket:
		feedArgs := sgbucket.FeedArguments{
			ID:               opts.ID,
			CheckpointPrefix: opts.CheckpointPrefix,
			Scopes:           opts.Scopes,
		}
		client, err := newGocbDCPClient(ctx, underlyingBucket, underlyingBucket.GetName(), feedArgs, opts.Callback, opts.DBStats, opts.MetadataStoreType, opts.GroupID)
		if err != nil {
			return nil, err
		}
		return client, nil
	default:
		return nil, fmt.Errorf("bucket type %T does not have a DCPClient implementation", underlyingBucket)
	}
}

// Add registers each given data store in the map, appending its collection name to the list kept
// under its scope name. Collection names are appended as supplied, without de-duplication.
// The receiver must be an initialised map: assigning into a nil map panics.
func (c DCPCollections) Add(ds ...sgbucket.DataStoreName) {
	for _, dataStore := range ds {
		scope := dataStore.ScopeName()
		// A scope seen for the first time reads back as a nil slice, which append grows as usual.
		c[scope] = append(c[scope], dataStore.CollectionName())
	}
}
73 changes: 37 additions & 36 deletions base/dcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ type endStreamCallbackFunc func(e endStreamEvent)

var ErrVbUUIDMismatch = errors.New("VbUUID mismatch when failOnRollback set")

type DCPClient struct {
type GoCBDCPClient struct {
ctx context.Context
ID string // unique ID for DCPClient - used for DCP stream name, must be unique
agent *gocbcore.DCPAgent // SDK DCP agent, manages connections and calls back to DCPClient stream observer implementation
ID string // unique ID for GoCBDCPClient - used for DCP stream name, must be unique
agent *gocbcore.DCPAgent // SDK DCP agent, manages connections and calls back to GoCBDCPClient stream observer implementation
callback sgbucket.FeedEventCallbackFunc // Callback invoked on DCP mutations/deletions
workers []*DCPWorker // Workers for concurrent processing of incoming mutations and callback. vbuckets are partitioned across workers
workersWg sync.WaitGroup // Active workers WG - used for signaling when the DCPClient workers have all stopped so the doneChannel can be closed
workersWg sync.WaitGroup // Active workers WG - used for signaling when the GoCBDCPClient workers have all stopped so the doneChannel can be closed
spec BucketSpec // Bucket spec for the target data store
supportsCollections bool // Whether the target data store supports collections
numVbuckets uint16 // number of vbuckets on target data store
terminator chan bool // Used to close worker goroutines spawned by the DCPClient
terminator chan bool // Used to close worker goroutines spawned by the GoCBDCPClient
doneChannel chan error // Returns nil on successful completion of one-shot feed or external close of feed, error otherwise
metadata DCPMetadataStore // Implementation of DCPMetadataStore for metadata persistence
activeVbuckets map[uint16]struct{} // vbuckets that have an open stream
Expand All @@ -67,7 +67,7 @@ type DCPClient struct {
collectionIDs []uint32 // collectionIDs used by gocbcore, if empty, uses default collections
}

type DCPClientOptions struct {
type GoCBDCPClientOptions struct {
NumWorkers int
OneShot bool
FailOnRollback bool // When true, the DCP client will terminate on DCP rollback
Expand All @@ -81,7 +81,7 @@ type DCPClientOptions struct {
CheckpointPrefix string
}

func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket) (*DCPClient, error) {
func NewGocbDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options GoCBDCPClientOptions, bucket *GocbV2Bucket) (*GoCBDCPClient, error) {

numVbuckets, err := bucket.GetMaxVbno()
if err != nil {
Expand All @@ -91,7 +91,7 @@ func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCal
return newDCPClientWithForBuckets(ctx, ID, callback, options, bucket, numVbuckets)
}

func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) {
func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options GoCBDCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) {

numWorkers := DefaultNumWorkers
if options.NumWorkers > 0 {
Expand All @@ -106,7 +106,7 @@ func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucke
return nil, fmt.Errorf("callers must specify a checkpoint prefix when persisting metadata")
}
}
client := &DCPClient{
client := &GoCBDCPClient{
ctx: ctx,
workers: make([]*DCPWorker, numWorkers),
numVbuckets: numVbuckets,
Expand Down Expand Up @@ -155,7 +155,7 @@ func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucke
}

// getCollectionHighSeqNos returns the high sequence numbers for a given KV collection ID.
func (dc *DCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, error) {
func (dc *GoCBDCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, error) {
vbucketSeqnoOptions := gocbcore.GetVbucketSeqnoOptions{}
if dc.supportsCollections {
vbucketSeqnoOptions.FilterOptions = &gocbcore.GetVbucketSeqnoFilterOptions{CollectionID: collectionID}
Expand Down Expand Up @@ -213,7 +213,7 @@ func (dc *DCPClient) getCollectionHighSeqNos(collectionID uint32) ([]uint64, err
}

// getHighSeqNos returns the maximum sequence number for every collection configured by the DCP agent.
func (dc *DCPClient) getHighSeqNos() ([]uint64, error) {
func (dc *GoCBDCPClient) getHighSeqNos() ([]uint64, error) {
highSeqNos := make([]uint64, dc.numVbuckets)
// Initialize highSeqNo to the current metadata's StartSeqNo - we don't want to use a value lower than what
// we've already processed
Expand All @@ -235,7 +235,7 @@ func (dc *DCPClient) getHighSeqNos() ([]uint64, error) {
}

// configureOneShot sets highSeqnos for a one shot feed.
func (dc *DCPClient) configureOneShot() error {
func (dc *GoCBDCPClient) configureOneShot() error {
highSeqNos, err := dc.getHighSeqNos()
if err != nil {
return err
Expand All @@ -250,8 +250,9 @@ func (dc *DCPClient) configureOneShot() error {
return nil
}

// Start returns an error and a channel to indicate when the DCPClient is done. If Start returns an error, DCPClient.Close() needs to be called.
func (dc *DCPClient) Start() (doneChan chan error, err error) {
// Start returns an error and a channel to indicate when the GoCBDCPClient is done. If Start returns an error, GoCBDCPClient.Close() needs to be called.
func (dc *GoCBDCPClient) Start(ctx context.Context) (doneChan chan error, err error) {
// FIXME: set context here
Copy link

Copilot AI Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This FIXME suggests an incomplete implementation. The context parameter is passed to Start(), but the comment indicates it still needs to be set somewhere. Clarify what needs to be done here, or remove the comment if the implementation is already correct.

Suggested change
// FIXME: set context here
dc.ctx = ctx

Copilot uses AI. Check for mistakes.
err = dc.initAgent(dc.spec)
if err != nil {
return dc.doneChannel, err
Expand All @@ -274,13 +275,13 @@ func (dc *DCPClient) Start() (doneChan chan error, err error) {
}

// Close is used externally to stop the DCP client. If the client was already closed due to error, returns that error
func (dc *DCPClient) Close() error {
func (dc *GoCBDCPClient) Close() error {
dc.close()
return dc.getCloseError()
}

// GetMetadata returns metadata for all vbuckets
func (dc *DCPClient) GetMetadata() []DCPMetadata {
func (dc *GoCBDCPClient) GetMetadata() []DCPMetadata {
metadata := make([]DCPMetadata, dc.numVbuckets)
for i := uint16(0); i < dc.numVbuckets; i++ {
metadata[i] = dc.metadata.GetMeta(i)
Expand All @@ -290,7 +291,7 @@ func (dc *DCPClient) GetMetadata() []DCPMetadata {

// close is used internally to stop the DCP client. Sends any fatal errors to the client's done channel, and
// closes that channel.
func (dc *DCPClient) close() {
func (dc *GoCBDCPClient) close() {

// set dc.closing to true, avoid re-triggering close if it's already in progress
if !dc.closing.CompareAndSwap(false, true) {
Expand All @@ -316,7 +317,7 @@ func (dc *DCPClient) close() {
}

// getAgentConfig returns a gocbcore.DCPAgentConfig for the given BucketSpec
func (dc *DCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig, error) {
func (dc *GoCBDCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig, error) {
connStr, err := spec.GetGoCBConnStringForDCP()
if err != nil {
return nil, err
Expand Down Expand Up @@ -360,7 +361,7 @@ func (dc *DCPClient) getAgentConfig(spec BucketSpec) (*gocbcore.DCPAgentConfig,
}

// initAgent creates a DCP agent and waits for it to be ready
func (dc *DCPClient) initAgent(spec BucketSpec) error {
func (dc *GoCBDCPClient) initAgent(spec BucketSpec) error {
agentConfig, err := dc.getAgentConfig(spec)
if err != nil {
return err
Expand Down Expand Up @@ -405,13 +406,13 @@ func (dc *DCPClient) initAgent(spec BucketSpec) error {
return nil
}

func (dc *DCPClient) workerForVbno(vbNo uint16) *DCPWorker {
func (dc *GoCBDCPClient) workerForVbno(vbNo uint16) *DCPWorker {
workerIndex := int(vbNo % uint16(len(dc.workers)))
return dc.workers[workerIndex]
}

// startWorkers initializes the DCP workers to receive stream events from eventFeed
func (dc *DCPClient) startWorkers(ctx context.Context) {
func (dc *GoCBDCPClient) startWorkers(ctx context.Context) {

// vbuckets are assigned to workers as vbNo % NumWorkers. Create set of assigned vbuckets
assignedVbs := make(map[int][]uint16)
Expand All @@ -434,7 +435,7 @@ func (dc *DCPClient) startWorkers(ctx context.Context) {
}
}

func (dc *DCPClient) openStream(vbID uint16, maxRetries uint32) error {
func (dc *GoCBDCPClient) openStream(vbID uint16, maxRetries uint32) error {

var openStreamErr error
var attempts uint32
Expand Down Expand Up @@ -488,7 +489,7 @@ func (dc *DCPClient) openStream(vbID uint16, maxRetries uint32) error {
return fmt.Errorf("openStream failed to complete after %d attempts, last error: %w", attempts, openStreamErr)
}

func (dc *DCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.SeqNo) {
func (dc *GoCBDCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.SeqNo) {
if dc.dbStats != nil {
dc.dbStats.Add("dcp_rollback_count", 1)
}
Expand All @@ -497,7 +498,7 @@ func (dc *DCPClient) rollback(ctx context.Context, vbID uint16, seqNo gocbcore.S

// openStreamRequest issues the OpenStream request, but doesn't perform any error handling. Callers
// should generally use openStream() for error and retry handling
func (dc *DCPClient) openStreamRequest(vbID uint16) error {
func (dc *GoCBDCPClient) openStreamRequest(vbID uint16) error {

vbMeta := dc.metadata.GetMeta(vbID)

Expand Down Expand Up @@ -548,7 +549,7 @@ func (dc *DCPClient) openStreamRequest(vbID uint16) error {
// verifyFailoverLog checks for VbUUID changes when failOnRollback is set, and
// writes the failover log to the client metadata store. If previous VbUUID is zero, it's
// not considered a rollback - it's not required to initialize vbUUIDs into meta.
func (dc *DCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry) error {
func (dc *GoCBDCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry) error {

if dc.failOnRollback {
previousMeta := dc.metadata.GetMeta(vbID)
Expand All @@ -566,7 +567,7 @@ func (dc *DCPClient) verifyFailoverLog(vbID uint16, f []gocbcore.FailoverEntry)
return nil
}

func (dc *DCPClient) deactivateVbucket(vbID uint16) {
func (dc *GoCBDCPClient) deactivateVbucket(vbID uint16) {
dc.activeVbucketLock.Lock()
delete(dc.activeVbuckets, vbID)
activeCount := len(dc.activeVbuckets)
Expand All @@ -580,16 +581,16 @@ func (dc *DCPClient) deactivateVbucket(vbID uint16) {
}
}

func (dc *DCPClient) onStreamEnd(e endStreamEvent) {
func (dc *GoCBDCPClient) onStreamEnd(e endStreamEvent) {
if e.err == nil {
DebugfCtx(dc.ctx, KeyDCP, "Stream (vb:%d) closed, all items streamed", e.vbID)
dc.deactivateVbucket(e.vbID)
return
}

if errors.Is(e.err, gocbcore.ErrDCPStreamClosed) {
DebugfCtx(dc.ctx, KeyDCP, "Stream (vb:%d) closed by DCPClient", e.vbID)
dc.fatalError(fmt.Errorf("Stream (vb:%d) closed by DCPClient", e.vbID))
DebugfCtx(dc.ctx, KeyDCP, "Stream (vb:%d) closed by GoCBDCPClient", e.vbID)
dc.fatalError(fmt.Errorf("Stream (vb:%d) closed by GoCBDCPClient", e.vbID))
return
}

Expand All @@ -616,15 +617,15 @@ func (dc *DCPClient) onStreamEnd(e endStreamEvent) {
}(e.vbID, retries)
}

func (dc *DCPClient) fatalError(err error) {
func (dc *GoCBDCPClient) fatalError(err error) {
dc.setCloseError(err)
dc.close()
}

func (dc *DCPClient) setCloseError(err error) {
func (dc *GoCBDCPClient) setCloseError(err error) {
dc.closeErrorLock.Lock()
defer dc.closeErrorLock.Unlock()
// If the DCPClient is already closing, don't update the error. If an initial error triggered the close,
// If the GoCBDCPClient is already closing, don't update the error. If an initial error triggered the close,
// then closeError will already be set. In the event of a requested close, we want to ignore EOF errors associated
// with stream close
if dc.closing.IsTrue() {
Expand All @@ -635,7 +636,7 @@ func (dc *DCPClient) setCloseError(err error) {
}
}

func (dc *DCPClient) getCloseError() error {
func (dc *GoCBDCPClient) getCloseError() error {
dc.closeErrorLock.Lock()
defer dc.closeErrorLock.Unlock()
return dc.closeError
Expand All @@ -661,16 +662,16 @@ func getLatestVbUUID(failoverLog []gocbcore.FailoverEntry) (vbUUID gocbcore.VbUU
return entry.VbUUID
}

func (dc *DCPClient) GetMetadataKeyPrefix() string {
func (dc *GoCBDCPClient) GetMetadataKeyPrefix() string {
return dc.metadata.GetKeyPrefix()
}

// StartWorkersForTest will iterate through dcp workers to start them, to be used for caching testing purposes only.
func (dc *DCPClient) StartWorkersForTest(t *testing.T) {
func (dc *GoCBDCPClient) StartWorkersForTest(t *testing.T) {
dc.startWorkers(dc.ctx)
}

// NewDCPClientForTest is a test-only function to create a DCP client with a specific number of vbuckets.
func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) {
func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options GoCBDCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) {
return newDCPClientWithForBuckets(ctx, ID, callback, options, bucket, numVbuckets)
}
Loading
Loading