Skip to content
4 changes: 2 additions & 2 deletions base/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// Manages retrieval of set of buckets, and generic interaction with bootstrap metadata documents from those buckets.
type BootstrapConnection interface {
// GetConfigBuckets returns a list of bucket names where a bootstrap metadata documents could reside.
GetConfigBuckets() ([]string, error)
GetConfigBuckets(context.Context) ([]string, error)
// GetMetadataDocument fetches a bootstrap metadata document for a given bucket and key, along with the CAS of the config document.
GetMetadataDocument(ctx context.Context, bucket, key string, valuePtr any) (cas uint64, err error)
// InsertMetadataDocument saves a new bootstrap metadata document for a given bucket and key.
Expand Down Expand Up @@ -258,7 +258,7 @@ func (cc *CouchbaseCluster) getClusterConnection() (*gocb.Cluster, error) {

}

func (cc *CouchbaseCluster) GetConfigBuckets() ([]string, error) {
func (cc *CouchbaseCluster) GetConfigBuckets(context.Context) ([]string, error) {
if cc == nil {
return nil, errors.New("nil CouchbaseCluster")
}
Expand Down
2 changes: 1 addition & 1 deletion base/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestBootstrapRefCounting(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, clusterConnection)

buckets, err := cluster.GetConfigBuckets()
buckets, err := cluster.GetConfigBuckets(ctx)
require.NoError(t, err)
// ensure these are sorted for determinstic bootstraping
sortedBuckets := make([]string, len(buckets))
Expand Down
5 changes: 2 additions & 3 deletions base/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,8 @@ func TestBaseBucket(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
baseBucket := GetBaseBucket(test.bucket)
_, ok := baseBucket.(*rosmar.Bucket)
assert.True(t, ok, "Base bucket wasn't walrus bucket")
_, err := AsRosmarBucket(test.bucket)
require.NoError(t, err)
})
}
}
Expand Down
1 change: 1 addition & 0 deletions base/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ var (
_ sgbucket.DynamicDataStoreBucket = &GocbV2Bucket{}
)

// AsGocbV2Bucket returns a bucket as a GocbV2Bucket, or an error if it is not one.
func AsGocbV2Bucket(bucket Bucket) (*GocbV2Bucket, error) {
baseBucket := GetBaseBucket(bucket)
if gocbv2Bucket, ok := baseBucket.(*GocbV2Bucket); ok {
Expand Down
131 changes: 100 additions & 31 deletions base/rosmar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ package base
import (
"context"
"errors"
"fmt"
"net/url"
"os"
"runtime"
"strings"

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbaselabs/rosmar"
Expand All @@ -20,90 +25,145 @@ var _ BootstrapConnection = &RosmarCluster{}

// RosmarCluster implements BootstrapConnection and is used for connecting to a rosmar cluster
type RosmarCluster struct {
serverURL string
serverURL string
bucketDirectory string
}

// NewRosmarCluster creates a from a given URL
func NewRosmarCluster(serverURL string) *RosmarCluster {
return &RosmarCluster{
func NewRosmarCluster(serverURL string) (*RosmarCluster, error) {
cluster := &RosmarCluster{
serverURL: serverURL,
}
if serverURL != rosmar.InMemoryURL {
u, err := url.Parse(serverURL)
if err != nil {
return nil, err
}
directory := u.Path
if runtime.GOOS == "windows" {
directory = strings.TrimPrefix(directory, "/")
}
err = os.MkdirAll(directory, 0700)
if err != nil {
return nil, fmt.Errorf("could not create or access directory to open rosmar cluster %q: %w", serverURL, err)
}
cluster.bucketDirectory = directory
}
return cluster, nil
}

// GetConfigBuckets returns all the buckets registered in rosmar.
func (c *RosmarCluster) GetConfigBuckets() ([]string, error) {
func (c *RosmarCluster) GetConfigBuckets(ctx context.Context) ([]string, error) {
// If the cluster is a serialized rosmar cluster, we need to open each bucket to add to rosmar.bucketRegistry.
if c.bucketDirectory != "" {
d, err := os.ReadDir(c.bucketDirectory)
if err != nil {
return nil, err
}
for _, bucketName := range d {
bucket, err := c.openBucket(bucketName.Name())
if err != nil {
return nil, fmt.Errorf("could not open bucket %s from %s :%w", bucketName, c.serverURL, err)
}
defer bucket.Close(ctx)

}
}
return rosmar.GetBucketNames(), nil
}

// openBucket opens a rosmar bucket with the given name.
func (c *RosmarCluster) openBucket(bucketName string) (*rosmar.Bucket, error) {
// OpenBucketIn is required to open a bucket from a serialized rosmar implementation.
return rosmar.OpenBucketIn(c.serverURL, bucketName, rosmar.CreateOrOpen)
}

// getDefaultDataStore returns the default datastore for the specified bucket. Returns a bucket close function and an
// error.
func (c *RosmarCluster) getDefaultDataStore(ctx context.Context, bucketName string) (sgbucket.DataStore, func(ctx context.Context), error) {
bucket, err := rosmar.OpenBucketIn(c.serverURL, bucketName, rosmar.CreateOrOpen)
if err != nil {
return nil, nil, err
}
closeFn := func(ctx context.Context) { bucket.Close(ctx) }

ds, err := bucket.NamedDataStore(DefaultScopeAndCollectionName())
if err != nil {
AssertfCtx(ctx, "Unexpected error getting default collection for bucket %q: %v", bucketName, err)
closeFn(ctx)
return nil, nil, err
}
return ds, closeFn, nil
}

// GetMetadataDocument returns a metadata document from the default collection for the specified bucket.
func (c *RosmarCluster) GetMetadataDocument(ctx context.Context, location, docID string, valuePtr any) (cas uint64, err error) {
bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
ds, closer, err := c.getDefaultDataStore(ctx, location)
if err != nil {
return 0, err
}
defer bucket.Close(ctx)
defer closer(ctx)

return bucket.DefaultDataStore().Get(docID, valuePtr)
return ds.Get(docID, valuePtr)
}

// InsertMetadataDocument inserts a metadata document, and fails if it already exists.
func (c *RosmarCluster) InsertMetadataDocument(ctx context.Context, location, key string, value any) (newCAS uint64, err error) {
bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
ds, closer, err := c.getDefaultDataStore(ctx, location)
if err != nil {
return 0, err
}
defer bucket.Close(ctx)
defer closer(ctx)

return bucket.DefaultDataStore().WriteCas(key, 0, 0, value, 0)
return ds.WriteCas(key, 0, 0, value, 0)
}

// WriteMetadataDocument writes a metadata document, and fails on CAS mismatch
func (c *RosmarCluster) WriteMetadataDocument(ctx context.Context, location, docID string, cas uint64, value any) (newCAS uint64, err error) {
bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
ds, closer, err := c.getDefaultDataStore(ctx, location)
if err != nil {
return 0, err
}
defer bucket.Close(ctx)

return bucket.DefaultDataStore().WriteCas(docID, 0, cas, value, 0)
defer closer(ctx)
return ds.WriteCas(docID, 0, cas, value, 0)
}

// TouchMetadataDocument sets the specified property in a bootstrap metadata document for a given bucket and key. Used to
// trigger CAS update on the document, to block any racing updates. Does not retry on CAS failure.

func (c *RosmarCluster) TouchMetadataDocument(ctx context.Context, location, docID string, property, value string, cas uint64) (newCAS uint64, err error) {
bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
ds, closer, err := c.getDefaultDataStore(ctx, location)
if err != nil {
return 0, err
}
defer bucket.Close(ctx)
defer closer(ctx)

// FIXME to not touch the whole document?
return bucket.DefaultDataStore().Touch(docID, 0)
return ds.Touch(docID, 0)
}

// DeleteMetadataDocument deletes an existing bootstrap metadata document for a given bucket and key.
func (c *RosmarCluster) DeleteMetadataDocument(ctx context.Context, location, key string, cas uint64) error {
bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
ds, closer, err := c.getDefaultDataStore(ctx, location)
if err != nil {
return err
}
defer bucket.Close(ctx)
defer closer(ctx)

_, err = bucket.DefaultDataStore().Remove(key, cas)
_, err = ds.Remove(key, cas)
return err
}

// UpdateMetadataDocument updates a given document and retries on CAS mismatch.
func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, docID string, updateCallback func(bucketConfig []byte, rawBucketConfigCas uint64) (newConfig []byte, err error)) (newCAS uint64, err error) {
bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
ds, closer, err := c.getDefaultDataStore(ctx, location)
if err != nil {
return 0, err
}
defer bucket.Close(ctx)
defer closer(ctx)
for {
var bucketValue []byte
cas, err := bucket.DefaultDataStore().Get(docID, &bucketValue)
cas, err := ds.Get(docID, &bucketValue)
if err != nil {
return 0, err
}
Expand All @@ -113,7 +173,7 @@ func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, do
}
// handle delete when updateCallback returns nil
if newConfig == nil {
removeCasOut, err := bucket.DefaultDataStore().Remove(docID, cas)
removeCasOut, err := ds.Remove(docID, cas)
if err != nil {
// retry on cas failure
if errors.As(err, &sgbucket.CasMismatchErr{}) {
Expand All @@ -124,7 +184,7 @@ func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, do
return removeCasOut, nil
}

replaceCfgCasOut, err := bucket.DefaultDataStore().WriteCas(docID, 0, cas, newConfig, 0)
replaceCfgCasOut, err := ds.WriteCas(docID, 0, cas, newConfig, 0)
if err != nil {
if errors.As(err, &sgbucket.CasMismatchErr{}) {
// retry on cas failure
Expand All @@ -140,25 +200,25 @@ func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, do

// KeyExists checks whether a key exists in the default collection for the specified bucket
func (c *RosmarCluster) KeyExists(ctx context.Context, location, docID string) (exists bool, err error) {
bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
ds, closer, err := c.getDefaultDataStore(ctx, location)
if err != nil {
return false, err
}
defer bucket.Close(ctx)
defer closer(ctx)

return bucket.DefaultDataStore().Exists(docID)
return ds.Exists(docID)
}

// GetDocument fetches a document from the default collection. Does not use configPersistence - callers
// requiring configPersistence handling should use GetMetadataDocument.
func (c *RosmarCluster) GetDocument(ctx context.Context, bucketName, docID string, rv any) (exists bool, err error) {
bucket, err := rosmar.OpenBucket(c.serverURL, bucketName, rosmar.CreateOrOpen)
ds, closer, err := c.getDefaultDataStore(ctx, bucketName)
if err != nil {
return false, err
}
defer bucket.Close(ctx)
defer closer(ctx)

_, err = bucket.DefaultDataStore().Get(docID, rv)
_, err = ds.Get(docID, rv)
if IsDocNotFoundError(err) {
return false, nil
}
Expand All @@ -171,3 +231,12 @@ func (c *RosmarCluster) Close() {
}

func (c *RosmarCluster) SetConnectionStringServerless() error { return nil }

// AsRosmarBucket returns a bucket as a rosmar.Bucket, or an error if it is not one.
func AsRosmarBucket(bucket Bucket) (*rosmar.Bucket, error) {
baseBucket := GetBaseBucket(bucket)
if b, ok := baseBucket.(*rosmar.Bucket); ok {
return b, nil
}
return nil, fmt.Errorf("bucket is not a rosmar bucket (type %T)", baseBucket)
}
2 changes: 1 addition & 1 deletion docs/api/paths/admin/db-_flush.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ post:
description: |-
**This is unsupported**

This will purge *all* documents.
This will purge *all* documents and remove any other database that is backed by this bucket.

The bucket will only be flushed if the unsupported database configuration option `enable_couchbase_bucket_flush` is set.

Expand Down
2 changes: 1 addition & 1 deletion rest/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2308,7 +2308,7 @@ func (h *handler) handleGetClusterInfo() error {
eccv := h.server.getBucketCCVSettings()
if h.server.persistentConfig {

bucketNames, err := h.server.GetBucketNames()
bucketNames, err := h.server.GetBucketNames(h.ctx())
if err != nil {
return err
}
Expand Down
55 changes: 39 additions & 16 deletions rest/adminapitest/admin_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,27 +382,50 @@ func TestGetStatus(t *testing.T) {
}

func TestFlush(t *testing.T) {
for _, persistentConfig := range []bool{false, true} {
t.Run(fmt.Sprintf("persistentConfig=%t", persistentConfig), func(t *testing.T) {
unsupportedOptions := &db.UnsupportedOptions{
APIEndpoints: &db.APIEndpoints{
EnableCouchbaseBucketFlush: true,
},
}

if !base.UnitTestUrlIsWalrus() {
t.Skip("sgbucket.DeleteableBucket inteface only supported by Walrus")
}
rt := rest.NewRestTester(t, nil)
defer rt.Close()
rt := rest.NewRestTester(t, &rest.RestTesterConfig{
PersistentConfig: persistentConfig,
DatabaseConfig: &rest.DatabaseConfig{
DbConfig: rest.DbConfig{
Unsupported: unsupportedOptions,
},
},
})
defer rt.Close()

if persistentConfig {
dbConfig := rt.NewDbConfig()
if !base.UnitTestUrlIsWalrus() {
dbConfig.Unsupported = unsupportedOptions
}
rest.RequireStatus(t, rt.CreateDatabase("db", dbConfig), http.StatusCreated)
}
rt.CreateTestDoc("doc1")
rt.CreateTestDoc("doc2")
rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1", ""), 200)
rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc2", ""), 200)

rt.CreateTestDoc("doc1")
rt.CreateTestDoc("doc2")
rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1", ""), 200)
rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc2", ""), 200)
log.Printf("Flushing db...")
rest.RequireStatus(t, rt.SendAdminRequest("POST", "/db/_flush", ""), 200)

log.Printf("Flushing db...")
rest.RequireStatus(t, rt.SendAdminRequest("POST", "/db/_flush", ""), 200)
rt.SetAdminParty(true) // needs to be re-enabled after flush since guest user got wiped
// After the flush, the db exists but the documents are gone:
rest.RequireStatus(t, rt.SendAdminRequest("GET", "/db/", ""), 200)

// After the flush, the db exists but the documents are gone:
rest.RequireStatus(t, rt.SendAdminRequest("GET", "/db/", ""), 200)
rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1", ""), 404)
rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc2", ""), 404)

rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1", ""), 404)
rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc2", ""), 404)
// recreate document
rt.CreateTestDoc("doc1")
rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1", ""), 200)
})
}
}

// Test a single call to take DB offline
Expand Down
Loading
Loading