-
Notifications
You must be signed in to change notification settings - Fork 140
Prototype: Create an abstract DCPClient to be able to work with rosmar #7879
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces an abstract DCPClient interface to support both Couchbase Server (via gocbcore) and Rosmar backends. The key change is creating a unified DCP client abstraction that dispatches to implementation-specific clients based on the underlying bucket type.
Key changes:
- Created
DCPClientinterface with implementationsGoCBDCPClient(renamed fromDCPClient) andRosmarDCPClient - Unified
DCPClientOptionsstruct replacing separate options for each implementation - Removed test skips for Rosmar/Walrus, enabling DCP-based tests to run against all bucket types
Reviewed Changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| base/abstract_dcp_client.go | New abstraction layer with DCPClient interface and factory function NewDCPClient |
| base/rosmar_dcp_client.go | New Rosmar-specific DCP client implementation |
| base/dcp_client.go | Renamed DCPClient to GoCBDCPClient and DCPClientOptions to GoCBDCPClientOptions |
| base/dcp_client_stream_observer.go | Updated receiver types from DCPClient to GoCBDCPClient |
| base/gocb_dcp_feed.go | Refactored to use new client creation pattern |
| db/background_mgr_resync_dcp.go | Updated to use new abstract client with scope-based collection specification |
| db/background_mgr_attachment_migration.go | Updated to use new abstract client with scope-based collection specification |
| db/attachment_compaction.go | Updated to use new abstract client with scope-based collection specification |
| db/util_testing.go | Updated to use new abstract client with scope-based collection specification |
| db/background_mgr_resync_dcp_test.go | Removed Walrus test skips |
| db/background_mgr_attachment_migration_test.go | Removed Walrus test skips |
| db/attachment_compaction_test.go | Removed Walrus test skips |
| base/dcp_client_test.go | Removed Walrus test skips and commented out unported tests |
| tools/cache_perf_tool/dcpDataGeneration.go | Updated type references to GoCBDCPClient |
| } | ||
|
|
||
| func (dc *RosmarDCPClient) GetMetadataKeyPrefix() string { | ||
| // this value is probably not correct |
Copilot
AI
Nov 13, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment indicates uncertainty about the return value. Either verify the correct implementation or update the comment to explain why this approach was chosen and what the implications are.
| // this value is probably not correct | |
| // Return the CheckpointPrefix from options, which is used as the key prefix for DCP metadata. | |
| // This matches the convention used in other DCP clients and ensures consistency for checkpointing. |
| func (dc *DCPClient) Start() (doneChan chan error, err error) { | ||
| // Start returns an error and a channel to indicate when the GoCBDCPClient is done. If Start returns an error, GoCBDCPClient.Close() needs to be called. | ||
| func (dc *GoCBDCPClient) Start(ctx context.Context) (doneChan chan error, err error) { | ||
| // FIXME: set context here |
Copilot
AI
Nov 13, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This FIXME suggests incomplete implementation. The context parameter is passed to Start() but this comment indicates it should be set somewhere. Clarify what needs to be done here or remove the comment if the implementation is correct.
| // FIXME: set context here | |
| dc.ctx = ctx |
| bucket, err := base.AsGocbV2Bucket(db.Bucket) | ||
| if err != nil { | ||
| return 0, err | ||
| } | ||
|
|
||
| base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for sweep phase of attachment compaction", compactionLoggingID, dcpFeedKey) | ||
| dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *clientOptions, bucket) | ||
| base.InfofCtx(ctx, base.KeyAll, "[%s] Starting DCP feed %q for sweep phase of attachment compaction", compactionLoggingID, clientOptions.ID) | ||
| dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) |
Copilot
AI
Nov 13, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to base.AsGocbV2Bucket is unnecessary since base.NewDCPClient now accepts base.Bucket directly (line 383 passes bucket which is a *GocbV2Bucket). This conversion should be removed to use db.Bucket directly, consistent with other refactored code in this PR.
| bucket, err := base.AsGocbV2Bucket(db.Bucket) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, callback, *clientOptions, bucket) | ||
| dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) |
Copilot
AI
Nov 13, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to base.AsGocbV2Bucket is unnecessary since base.NewDCPClient now accepts base.Bucket directly. This conversion should be removed to use db.Bucket directly, consistent with the refactoring in other files.
| bucket, err := base.AsGocbV2Bucket(testDb.Bucket) | ||
| require.NoError(t, err) | ||
| dcpFeedKey := GenerateCompactionDCPStreamName(t.Name(), CleanupPhase) | ||
| clientOptions, err := getCompactionDCPClientOptions(collectionID, testDb.Options.GroupID, testDb.MetadataKeys.DCPCheckpointPrefix(testDb.Options.GroupID)) | ||
| require.NoError(t, err) | ||
| dcpClient, err := base.NewDCPClient(ctx, dcpFeedKey, nil, *clientOptions, bucket) | ||
| clientOptions := getCompactionDCPClientOptions(dataStore, testDb.Options.GroupID, testDb.MetadataKeys.DCPCheckpointPrefix(testDb.Options.GroupID)) | ||
| clientOptions.ID = dcpFeedKey | ||
|
|
||
| dcpClient, err := base.NewDCPClient(ctx, bucket, clientOptions) |
Copilot
AI
Nov 13, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to base.AsGocbV2Bucket is unnecessary since base.NewDCPClient now accepts base.Bucket directly. This conversion should be removed to use testDb.Bucket directly, consistent with the refactoring in other files.
Create an abstract DCPClient to be able to work with rosmar
I expect that this can be extended to work as a sharded or non shared DCP client as well, but I think there's interesting work here.
I made this "work" but not for attachment compaction since this uses hierarchical paths in rosmar for xattr subdoc operations https://jira.issues.couchbase.com/browse/CBG-4232
TODO
Pre-review checklist
fmt.Print,log.Print, ...)base.UD(docID),base.MD(dbName))docs/apiDependencies (if applicable)
Integration Tests
GSI=true,xattrs=truehttps://jenkins.sgwdev.com/job/SyncGatewayIntegration/0000/