diff --git a/base/bootstrap.go b/base/bootstrap.go
index df474920c1..e3aafe418c 100644
--- a/base/bootstrap.go
+++ b/base/bootstrap.go
@@ -25,7 +25,7 @@ import (
 // Manages retrieval of set of buckets, and generic interaction with bootstrap metadata documents from those buckets.
 type BootstrapConnection interface {
 	// GetConfigBuckets returns a list of bucket names where a bootstrap metadata documents could reside.
-	GetConfigBuckets() ([]string, error)
+	GetConfigBuckets(context.Context) ([]string, error)
 	// GetMetadataDocument fetches a bootstrap metadata document for a given bucket and key, along with the CAS of the config document.
 	GetMetadataDocument(ctx context.Context, bucket, key string, valuePtr any) (cas uint64, err error)
 	// InsertMetadataDocument saves a new bootstrap metadata document for a given bucket and key.
@@ -258,7 +258,7 @@ func (cc *CouchbaseCluster) getClusterConnection() (*gocb.Cluster, error) {
 
 }
 
-func (cc *CouchbaseCluster) GetConfigBuckets() ([]string, error) {
+func (cc *CouchbaseCluster) GetConfigBuckets(context.Context) ([]string, error) {
 	if cc == nil {
 		return nil, errors.New("nil CouchbaseCluster")
 	}
diff --git a/base/bootstrap_test.go b/base/bootstrap_test.go
index 3e11b2f571..8704bdb8c8 100644
--- a/base/bootstrap_test.go
+++ b/base/bootstrap_test.go
@@ -59,7 +59,7 @@ func TestBootstrapRefCounting(t *testing.T) {
 	require.NoError(t, err)
 	require.NotNil(t, clusterConnection)
 
-	buckets, err := cluster.GetConfigBuckets()
+	buckets, err := cluster.GetConfigBuckets(ctx)
 	require.NoError(t, err)
 	// ensure these are sorted for determinstic bootstraping
 	sortedBuckets := make([]string, len(buckets))
diff --git a/base/bucket_test.go b/base/bucket_test.go
index e3a6f0d78a..c8df81b762 100644
--- a/base/bucket_test.go
+++ b/base/bucket_test.go
@@ -471,9 +471,8 @@ func TestBaseBucket(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			baseBucket := GetBaseBucket(test.bucket)
-			_, ok := baseBucket.(*rosmar.Bucket)
-			assert.True(t, ok, "Base bucket wasn't walrus bucket")
+			_, err := AsRosmarBucket(test.bucket)
+			require.NoError(t, err)
 		})
 	}
 }
diff --git a/base/collection.go b/base/collection.go
index 764a64b78b..4994b1acb0 100644
--- a/base/collection.go
+++ b/base/collection.go
@@ -196,6 +196,7 @@ var (
 	_ sgbucket.DynamicDataStoreBucket = &GocbV2Bucket{}
 )
 
+// AsGocbV2Bucket returns a bucket as a GocbV2Bucket, or an error if it is not one.
 func AsGocbV2Bucket(bucket Bucket) (*GocbV2Bucket, error) {
 	baseBucket := GetBaseBucket(bucket)
 	if gocbv2Bucket, ok := baseBucket.(*GocbV2Bucket); ok {
diff --git a/base/rosmar_cluster.go b/base/rosmar_cluster.go
index f4734cfc06..8bbd8a9937 100644
--- a/base/rosmar_cluster.go
+++ b/base/rosmar_cluster.go
@@ -11,6 +11,11 @@ package base
 import (
 	"context"
 	"errors"
+	"fmt"
+	"net/url"
+	"os"
+	"runtime"
+	"strings"
 
 	sgbucket "github.com/couchbase/sg-bucket"
 	"github.com/couchbaselabs/rosmar"
@@ -20,90 +25,145 @@ var _ BootstrapConnection = &RosmarCluster{}
 
 // RosmarCluster implements BootstrapConnection and is used for connecting to a rosmar cluster
 type RosmarCluster struct {
-	serverURL string
+	serverURL       string
+	bucketDirectory string
 }
 
 // NewRosmarCluster creates a from a given URL
-func NewRosmarCluster(serverURL string) *RosmarCluster {
-	return &RosmarCluster{
+func NewRosmarCluster(serverURL string) (*RosmarCluster, error) {
+	cluster := &RosmarCluster{
 		serverURL: serverURL,
 	}
+	if serverURL != rosmar.InMemoryURL {
+		u, err := url.Parse(serverURL)
+		if err != nil {
+			return nil, err
+		}
+		directory := u.Path
+		if runtime.GOOS == "windows" {
+			directory = strings.TrimPrefix(directory, "/")
+		}
+		err = os.MkdirAll(directory, 0700)
+		if err != nil {
+			return nil, fmt.Errorf("could not create or access directory to open rosmar cluster %q: %w", serverURL, err)
+		}
+		cluster.bucketDirectory = directory
+	}
+	return cluster, nil
 }
 
 // GetConfigBuckets returns all the buckets registered in rosmar.
-func (c *RosmarCluster) GetConfigBuckets() ([]string, error) {
+func (c *RosmarCluster) GetConfigBuckets(ctx context.Context) ([]string, error) {
+	// If the cluster is a serialized rosmar cluster, we need to open each bucket to add it to rosmar.bucketRegistry.
+	if c.bucketDirectory != "" {
+		d, err := os.ReadDir(c.bucketDirectory)
+		if err != nil {
+			return nil, err
+		}
+		for _, bucketName := range d {
+			bucket, err := c.openBucket(bucketName.Name())
+			if err != nil {
+				return nil, fmt.Errorf("could not open bucket %s from %s: %w", bucketName, c.serverURL, err)
+			}
+			defer bucket.Close(ctx)
+		}
+	}
 	return rosmar.GetBucketNames(), nil
 }
 
+// openBucket opens a rosmar bucket with the given name.
+func (c *RosmarCluster) openBucket(bucketName string) (*rosmar.Bucket, error) {
+	// OpenBucketIn is required to open a bucket from a serialized rosmar implementation.
+	return rosmar.OpenBucketIn(c.serverURL, bucketName, rosmar.CreateOrOpen)
+}
+
+// getDefaultDataStore returns the default datastore for the specified bucket. Returns a bucket close function and an
+// error.
+func (c *RosmarCluster) getDefaultDataStore(ctx context.Context, bucketName string) (sgbucket.DataStore, func(ctx context.Context), error) {
+	bucket, err := rosmar.OpenBucketIn(c.serverURL, bucketName, rosmar.CreateOrOpen)
+	if err != nil {
+		return nil, nil, err
+	}
+	closeFn := func(ctx context.Context) { bucket.Close(ctx) }
+
+	ds, err := bucket.NamedDataStore(DefaultScopeAndCollectionName())
+	if err != nil {
+		AssertfCtx(ctx, "Unexpected error getting default collection for bucket %q: %v", bucketName, err)
+		closeFn(ctx)
+		return nil, nil, err
+	}
+	return ds, closeFn, nil
+}
+
 // GetMetadataDocument returns a metadata document from the default collection for the specified bucket.
 func (c *RosmarCluster) GetMetadataDocument(ctx context.Context, location, docID string, valuePtr any) (cas uint64, err error) {
-	bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
+	ds, closer, err := c.getDefaultDataStore(ctx, location)
 	if err != nil {
 		return 0, err
 	}
-	defer bucket.Close(ctx)
+	defer closer(ctx)
 
-	return bucket.DefaultDataStore().Get(docID, valuePtr)
+	return ds.Get(docID, valuePtr)
 }
 
 // InsertMetadataDocument inserts a metadata document, and fails if it already exists.
 func (c *RosmarCluster) InsertMetadataDocument(ctx context.Context, location, key string, value any) (newCAS uint64, err error) {
-	bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
+	ds, closer, err := c.getDefaultDataStore(ctx, location)
 	if err != nil {
 		return 0, err
 	}
-	defer bucket.Close(ctx)
+	defer closer(ctx)
 
-	return bucket.DefaultDataStore().WriteCas(key, 0, 0, value, 0)
+	return ds.WriteCas(key, 0, 0, value, 0)
 }
 
 // WriteMetadataDocument writes a metadata document, and fails on CAS mismatch
 func (c *RosmarCluster) WriteMetadataDocument(ctx context.Context, location, docID string, cas uint64, value any) (newCAS uint64, err error) {
-	bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
+	ds, closer, err := c.getDefaultDataStore(ctx, location)
 	if err != nil {
 		return 0, err
 	}
-	defer bucket.Close(ctx)
-
-	return bucket.DefaultDataStore().WriteCas(docID, 0, cas, value, 0)
+	defer closer(ctx)
+	return ds.WriteCas(docID, 0, cas, value, 0)
 }
 
 // TouchMetadataDocument sets the specified property in a bootstrap metadata document for a given bucket and key. Used to
 // trigger CAS update on the document, to block any racing updates. Does not retry on CAS failure.
 func (c *RosmarCluster) TouchMetadataDocument(ctx context.Context, location, docID string, property, value string, cas uint64) (newCAS uint64, err error) {
-	bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
+	ds, closer, err := c.getDefaultDataStore(ctx, location)
 	if err != nil {
 		return 0, err
 	}
-	defer bucket.Close(ctx)
+	defer closer(ctx)
 
 	// FIXME to not touch the whole document?
-	return bucket.DefaultDataStore().Touch(docID, 0)
+	return ds.Touch(docID, 0)
 }
 
 // DeleteMetadataDocument deletes an existing bootstrap metadata document for a given bucket and key.
 func (c *RosmarCluster) DeleteMetadataDocument(ctx context.Context, location, key string, cas uint64) error {
-	bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
+	ds, closer, err := c.getDefaultDataStore(ctx, location)
 	if err != nil {
 		return err
 	}
-	defer bucket.Close(ctx)
+	defer closer(ctx)
 
-	_, err = bucket.DefaultDataStore().Remove(key, cas)
+	_, err = ds.Remove(key, cas)
 	return err
 }
 
 // UpdateMetadataDocument updates a given document and retries on CAS mismatch.
 func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, docID string, updateCallback func(bucketConfig []byte, rawBucketConfigCas uint64) (newConfig []byte, err error)) (newCAS uint64, err error) {
-	bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
+	ds, closer, err := c.getDefaultDataStore(ctx, location)
 	if err != nil {
 		return 0, err
 	}
-	defer bucket.Close(ctx)
+	defer closer(ctx)
 
 	for {
 		var bucketValue []byte
-		cas, err := bucket.DefaultDataStore().Get(docID, &bucketValue)
+		cas, err := ds.Get(docID, &bucketValue)
 		if err != nil {
 			return 0, err
 		}
@@ -113,7 +173,7 @@ func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, do
 		}
 		// handle delete when updateCallback returns nil
 		if newConfig == nil {
-			removeCasOut, err := bucket.DefaultDataStore().Remove(docID, cas)
+			removeCasOut, err := ds.Remove(docID, cas)
 			if err != nil {
 				// retry on cas failure
 				if errors.As(err, &sgbucket.CasMismatchErr{}) {
@@ -124,7 +184,7 @@ func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, do
 			return removeCasOut, nil
 		}
 
-		replaceCfgCasOut, err := bucket.DefaultDataStore().WriteCas(docID, 0, cas, newConfig, 0)
+		replaceCfgCasOut, err := ds.WriteCas(docID, 0, cas, newConfig, 0)
 		if err != nil {
 			if errors.As(err, &sgbucket.CasMismatchErr{}) {
 				// retry on cas failure
@@ -140,25 +200,25 @@ func (c *RosmarCluster) UpdateMetadataDocument(ctx context.Context, location, do
 
 // KeyExists checks whether a key exists in the default collection for the specified bucket
 func (c *RosmarCluster) KeyExists(ctx context.Context, location, docID string) (exists bool, err error) {
-	bucket, err := rosmar.OpenBucket(c.serverURL, location, rosmar.CreateOrOpen)
+	ds, closer, err := c.getDefaultDataStore(ctx, location)
 	if err != nil {
 		return false, err
 	}
-	defer bucket.Close(ctx)
+	defer closer(ctx)
 
-	return bucket.DefaultDataStore().Exists(docID)
+	return ds.Exists(docID)
 }
 
 // GetDocument fetches a document from the default collection. Does not use configPersistence - callers
 // requiring configPersistence handling should use GetMetadataDocument.
 func (c *RosmarCluster) GetDocument(ctx context.Context, bucketName, docID string, rv any) (exists bool, err error) {
-	bucket, err := rosmar.OpenBucket(c.serverURL, bucketName, rosmar.CreateOrOpen)
+	ds, closer, err := c.getDefaultDataStore(ctx, bucketName)
 	if err != nil {
 		return false, err
 	}
-	defer bucket.Close(ctx)
+	defer closer(ctx)
 
-	_, err = bucket.DefaultDataStore().Get(docID, rv)
+	_, err = ds.Get(docID, rv)
 	if IsDocNotFoundError(err) {
 		return false, nil
 	}
@@ -171,3 +231,12 @@ func (c *RosmarCluster) Close() {
 }
 
 func (c *RosmarCluster) SetConnectionStringServerless() error { return nil }
+
+// AsRosmarBucket returns a bucket as a rosmar.Bucket, or an error if it is not one.
+func AsRosmarBucket(bucket Bucket) (*rosmar.Bucket, error) {
+	baseBucket := GetBaseBucket(bucket)
+	if b, ok := baseBucket.(*rosmar.Bucket); ok {
+		return b, nil
+	}
+	return nil, fmt.Errorf("bucket is not a rosmar bucket (type %T)", baseBucket)
+}
diff --git a/docs/api/paths/admin/db-_flush.yaml b/docs/api/paths/admin/db-_flush.yaml
index 33bdbc13de..6125ad28b6 100644
--- a/docs/api/paths/admin/db-_flush.yaml
+++ b/docs/api/paths/admin/db-_flush.yaml
@@ -12,7 +12,7 @@ post:
   description: |-
     **This is unsupported**
 
-    This will purge *all* documents.
+    This will purge *all* documents and remove any other database that is backed by this bucket.
 
     The bucket will only be flushed if the unsupported database configuration option `enable_couchbase_bucket_flush` is set.
diff --git a/rest/admin_api.go b/rest/admin_api.go
index 4fc03f8f78..08136e43ff 100644
--- a/rest/admin_api.go
+++ b/rest/admin_api.go
@@ -2308,7 +2308,7 @@ func (h *handler) handleGetClusterInfo() error {
 	eccv := h.server.getBucketCCVSettings()
 
 	if h.server.persistentConfig {
-		bucketNames, err := h.server.GetBucketNames()
+		bucketNames, err := h.server.GetBucketNames(h.ctx())
 		if err != nil {
 			return err
 		}
diff --git a/rest/adminapitest/admin_api_test.go b/rest/adminapitest/admin_api_test.go
index 89045ba4cb..04235532e5 100644
--- a/rest/adminapitest/admin_api_test.go
+++ b/rest/adminapitest/admin_api_test.go
@@ -382,27 +382,50 @@ func TestGetStatus(t *testing.T) {
 }
 
 func TestFlush(t *testing.T) {
+	for _, persistentConfig := range []bool{false, true} {
+		t.Run(fmt.Sprintf("persistentConfig=%t", persistentConfig), func(t *testing.T) {
+			unsupportedOptions := &db.UnsupportedOptions{
+				APIEndpoints: &db.APIEndpoints{
+					EnableCouchbaseBucketFlush: true,
+				},
+			}
 
-	if !base.UnitTestUrlIsWalrus() {
-		t.Skip("sgbucket.DeleteableBucket inteface only supported by Walrus")
-	}
-	rt := rest.NewRestTester(t, nil)
-	defer rt.Close()
+			rt := rest.NewRestTester(t, &rest.RestTesterConfig{
+				PersistentConfig: persistentConfig,
+				DatabaseConfig: &rest.DatabaseConfig{
+					DbConfig: rest.DbConfig{
+						Unsupported: unsupportedOptions,
+					},
+				},
+			})
+			defer rt.Close()
+
+			if persistentConfig {
+				dbConfig := rt.NewDbConfig()
+				if !base.UnitTestUrlIsWalrus() {
+					dbConfig.Unsupported = unsupportedOptions
+				}
+				rest.RequireStatus(t, rt.CreateDatabase("db", dbConfig), http.StatusCreated)
+			}
+			rt.CreateTestDoc("doc1")
+			rt.CreateTestDoc("doc2")
+			rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1", ""), 200)
+			rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc2", ""), 200)
 
-	rt.CreateTestDoc("doc1")
-	rt.CreateTestDoc("doc2")
-	rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1", ""), 200)
-	rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc2", ""), 200)
+			log.Printf("Flushing db...")
+			rest.RequireStatus(t, rt.SendAdminRequest("POST", "/db/_flush", ""), 200)
 
-	log.Printf("Flushing db...")
-	rest.RequireStatus(t, rt.SendAdminRequest("POST", "/db/_flush", ""), 200)
-	rt.SetAdminParty(true) // needs to be re-enabled after flush since guest user got wiped
+			// After the flush, the db exists but the documents are gone:
+			rest.RequireStatus(t, rt.SendAdminRequest("GET", "/db/", ""), 200)
 
-	// After the flush, the db exists but the documents are gone:
-	rest.RequireStatus(t, rt.SendAdminRequest("GET", "/db/", ""), 200)
+			rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1", ""), 404)
+			rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc2", ""), 404)
 
-	rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1", ""), 404)
-	rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc2", ""), 404)
+			// recreate document
+			rt.CreateTestDoc("doc1")
+			rest.RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1", ""), 200)
+		})
+	}
 }
 
 // Test a single call to take DB offline
diff --git a/rest/api.go b/rest/api.go
index 7a589d7de1..fc69082052 100644
--- a/rest/api.go
+++ b/rest/api.go
@@ -21,7 +21,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	sgbucket "github.com/couchbase/sg-bucket"
 	"github.com/couchbase/sync_gateway/base"
 	"github.com/couchbase/sync_gateway/db"
"github.com/felixge/fgprof" @@ -274,77 +273,52 @@ func (h *handler) handleCompact() error { } func (h *handler) handleFlush() error { + var deleteFunc func(base.BucketSpec) error - baseBucket := base.GetBaseBucket(h.db.Bucket) - - // If it can be flushed, then flush it - if _, ok := baseBucket.(sgbucket.FlushableStore); ok { - - // If it's not a walrus bucket, don't allow flush unless the unsupported config is set - if !h.db.BucketSpec.IsWalrusBucket() { - if !h.db.DatabaseContext.AllowFlushNonCouchbaseBuckets() { - return errors.New("Flush not allowed on Couchbase buckets by default.") - } - } - - name := h.db.Name - config := h.server.GetDatabaseConfig(name) - - // This needs to first call RemoveDatabase since flushing the bucket under Sync Gateway might cause issues. - h.server.RemoveDatabase(h.ctx(), name, fmt.Sprintf("called from %s", h.rq.URL)) - - // Create a bucket connection spec from the database config - spec, err := GetBucketSpec(h.ctx(), &config.DatabaseConfig, h.server.Config) - if err != nil { - return err - } - - // Manually re-open a temporary bucket connection just for flushing purposes - tempBucketForFlush, err := db.ConnectToBucket(h.ctx(), spec, false) - if err != nil { - return err + if _, err := base.AsGocbV2Bucket(h.db.Bucket); err == nil { + if !h.db.DatabaseContext.AllowFlushNonCouchbaseBuckets() { + return errors.New("Flush not allowed on Couchbase buckets by default.") } - defer tempBucketForFlush.Close(h.ctx()) // Close the temporary connection to the bucket that was just for purposes of flushing it - - // Flush the bucket (assuming it conforms to sgbucket.DeleteableStore interface - if tempBucketForFlush, ok := tempBucketForFlush.(sgbucket.FlushableStore); ok { - - // Flush - err := tempBucketForFlush.Flush() + deleteFunc = func(spec base.BucketSpec) error { + // open a copy of the bucket to flush + bucket, err := db.ConnectToBucket(h.ctx(), spec, false) if err != nil { return err } - - } - - // Re-open database and add to Sync Gateway - _, err2 := h.server.AddDatabaseFromConfig(h.ctx(), config.DatabaseConfig) - if err2 != nil { - return err2 + defer bucket.Close(h.ctx()) + // Flush the bucket + gocbBucket, err := base.AsGocbV2Bucket(bucket) + if err != nil { + return err + } + return gocbBucket.Flush(h.ctx()) } - base.Audit(h.ctx(), base.AuditIDDatabaseFlush, nil) - - } else if bucket, ok := baseBucket.(sgbucket.DeleteableStore); ok { - - // If it's not flushable, but it's deletable, then delete it + } else if _, err := base.AsRosmarBucket(h.db.Bucket); err == nil { + deleteFunc = func(spec base.BucketSpec) error { + bucket, err := db.ConnectToBucket(h.ctx(), spec, false) + if err != nil { + return fmt.Errorf("could not open bucket in order to delete it: %w", err) + } - name := h.db.Name - config := h.server.GetDatabaseConfig(name) - h.server.RemoveDatabase(h.ctx(), name, fmt.Sprintf("called from %s", h.rq.URL)) - err := bucket.CloseAndDelete(h.ctx()) - _, err2 := h.server.AddDatabaseFromConfig(h.ctx(), config.DatabaseConfig) - if err != nil { - return err - } else if err2 != nil { - return err2 + rosmarBucket, err := base.AsRosmarBucket(bucket) + if err != nil { + return err + } + err = rosmarBucket.CloseAndDelete(h.ctx()) + if err != nil { + return err + } + return nil } - base.Audit(h.ctx(), base.AuditIDDatabaseFlush, nil) - return nil } else { + return base.HTTPErrorf(http.StatusServiceUnavailable, "Bucket type %T does not support flush or delete", h.db.Bucket) + } - return base.HTTPErrorf(http.StatusServiceUnavailable, "Bucket does not support flush or 
delete") - + err := h.server.removeBucketAndRecreateDatabase(h.ctx(), h.db.Name, deleteFunc) + if err != nil { + return err } + base.Audit(h.ctx(), base.AuditIDDatabaseFlush, nil) return nil diff --git a/rest/bootstrap_test.go b/rest/bootstrap_test.go index 945de28a3e..4b4438ed55 100644 --- a/rest/bootstrap_test.go +++ b/rest/bootstrap_test.go @@ -11,11 +11,14 @@ package rest import ( "fmt" "net/http" + "path/filepath" + "runtime" "testing" "time" "github.com/couchbase/sync_gateway/base" "github.com/couchbase/sync_gateway/db" + "github.com/couchbaselabs/rosmar" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -247,3 +250,72 @@ func DevTestFetchConfigManual(t *testing.T) { time.Sleep(15 * time.Second) } + +func TestBootstrapRosmarServer(t *testing.T) { + tempDir := t.TempDir() + diskURL := "rosmar://" + tempDir + if runtime.GOOS == "windows" { + // rosmar requires prefix forward slash and forward slashes + diskURL = "rosmar:///" + filepath.ToSlash(tempDir) + } + testCases := []struct { + name string + rosmarURL string + }{ + { + name: "InMemory", + rosmarURL: rosmar.InMemoryURL, + }, + { + name: "DiskBased", + rosmarURL: diskURL, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := base.TestCtx(t) + dbName := "testrosmarboostrap" + bucketName := dbName // bucket name will match db name by default + defer func() { + // buckets are cached by name _across_ rosmar URIs (including in-memory), so ensure we clean up after ourselves + bucket, err := rosmar.OpenBucketIn(tc.rosmarURL, bucketName, rosmar.CreateOrOpen) + assert.NoError(t, err) + assert.NoError(t, bucket.CloseAndDelete(ctx)) + }() + config := BootstrapStartupConfigForTest(t) + config.Bootstrap.Server = tc.rosmarURL + // set ConfigUpdateFrequency so high to avoid it running, trigger this manually below + config.Bootstrap.ConfigUpdateFrequency = base.NewConfigDuration(time.Hour * 24) + sc, closeFn := StartServerWithConfig(t, &config) + defer closeFn() + + resp := BootstrapAdminRequest(t, sc, http.MethodPut, "/"+dbName+"/", fmt.Sprintf(`{"bucket": "%s", "scopes":{"_default":{"collections":{"_default":{}}}}}`, bucketName)) + resp.RequireStatus(http.StatusCreated) + + collectionDBName := "namedcollectiondb" + resp = BootstrapAdminRequest(t, sc, http.MethodPut, "/"+collectionDBName+"/", fmt.Sprintf(`{"bucket": "%s", "scopes":{"custom_scope":{"collections":{"custom_collection":{}}}}}`, bucketName)) + resp.RequireStatus(http.StatusCreated) + + resp = BootstrapAdminRequest(t, sc, http.MethodPost, "/"+dbName+"/_flush", `{}`) + resp.RequireStatus(http.StatusOK) + + // make sure collection database is removed + resp = BootstrapAdminRequest(t, sc, http.MethodGet, "/"+collectionDBName+"/", "") + resp.RequireStatus(http.StatusNotFound) + + // ensure that config polling won't reload the database + preBootstrapDB, err := sc.GetActiveDatabase(dbName) + require.NoError(t, err) + + // manually bootstrap poll + count, err := sc.fetchAndLoadConfigs(ctx, false) + require.NoError(t, err) + require.Equal(t, 0, count, "expected 0 databases to be loaded on config polling") + + postBoostrapDB, err := sc.GetActiveDatabase(dbName) + require.NoError(t, err) + // compare pointers of the database + assert.Equal(t, preBootstrapDB, postBoostrapDB) + }) + } +} diff --git a/rest/config.go b/rest/config.go index 92749fc22f..89dfed71ec 100644 --- a/rest/config.go +++ b/rest/config.go @@ -1749,7 +1749,7 @@ func (sc *ServerContext) _fetchAndLoadDatabase(nonContextStruct base.NonCancella // migrateV30Configs 
checks for configs stored in the 3.0 location, and migrates them to the db registry func (sc *ServerContext) migrateV30Configs(ctx context.Context) error { groupID := sc.Config.Bootstrap.ConfigGroupID - buckets, err := sc.BootstrapContext.Connection.GetConfigBuckets() + buckets, err := sc.BootstrapContext.Connection.GetConfigBuckets(ctx) if err != nil { return err } @@ -1782,7 +1782,10 @@ func (sc *ServerContext) migrateV30Configs(ctx context.Context) error { return nil } -func (sc *ServerContext) findBucketWithCallback(callback func(bucket string) (exit bool, err error)) (err error) { +// findBucketWithCallback ensures the bucket exists in the BootstrapContext and calls the callback +// function. Returns an error if the bootstrap context can't find the bucket or the callback function returns an +// error and the exit return value for callback is true. +func (sc *ServerContext) findBucketWithCallback(ctx context.Context, callback func(bucket string) (exit bool, err error)) (err error) { // rewritten loop from FetchDatabase as part of CBG-2420 PR review var buckets []string if sc.Config.IsServerless() { @@ -1791,7 +1794,7 @@ func (sc *ServerContext) findBucketWithCallback(callback func(bucket string) (ex buckets = append(buckets, bucket) } } else { - buckets, err = sc.BootstrapContext.Connection.GetConfigBuckets() + buckets, err = sc.BootstrapContext.Connection.GetConfigBuckets(ctx) if err != nil { return fmt.Errorf("couldn't get buckets from cluster: %w", err) } @@ -1869,7 +1872,7 @@ func (sc *ServerContext) _fetchDatabase(ctx context.Context, dbName string) (fou return foundInBucket, callbackErr } - err = sc.findBucketWithCallback(callback) + err = sc.findBucketWithCallback(ctx, callback) if err != nil { return false, nil, err @@ -1921,7 +1924,7 @@ func (sc *ServerContext) bucketNameFromDbName(ctx context.Context, dbName string } return false, nil } - err := sc.findBucketWithCallback(callback) + err := sc.findBucketWithCallback(ctx, callback) if err != nil { return "", false } @@ -1951,7 +1954,7 @@ func (sc *ServerContext) fetchConfigsSince(ctx context.Context, refreshInterval } // GetBucketNames returns a slice of the bucket names associated with the server context -func (sc *ServerContext) GetBucketNames() (buckets []string, err error) { +func (sc *ServerContext) GetBucketNames(ctx context.Context) (buckets []string, err error) { if sc.Config.IsServerless() { buckets = make([]string, len(sc.Config.BucketCredentials)) for bucket, _ := range sc.Config.BucketCredentials { @@ -1968,7 +1971,7 @@ func (sc *ServerContext) GetBucketNames() (buckets []string, err error) { // } // } } else { - buckets, err = sc.BootstrapContext.Connection.GetConfigBuckets() + buckets, err = sc.BootstrapContext.Connection.GetConfigBuckets(ctx) if err != nil { return nil, fmt.Errorf("couldn't get buckets from cluster: %w", err) } @@ -1979,7 +1982,7 @@ func (sc *ServerContext) GetBucketNames() (buckets []string, err error) { // FetchConfigs retrieves all database configs from the ServerContext's bootstrapConnection. 
 func (sc *ServerContext) FetchConfigs(ctx context.Context, isInitialStartup bool) (dbNameConfigs map[string]DatabaseConfig, err error) {
-	buckets, err := sc.GetBucketNames()
+	buckets, err := sc.GetBucketNames(ctx)
 	if err != nil {
 		return nil, err
 	}
diff --git a/rest/main.go b/rest/main.go
index 3fe7f3c812..ae6be7b363 100644
--- a/rest/main.go
+++ b/rest/main.go
@@ -426,8 +426,7 @@ func setBootstrapConnectionOptsFromDbConfig(opts *bootstrapConnectionOpts, dbCon
 
 func createBootstrapConnectionWithOpts(ctx context.Context, opts bootstrapConnectionOpts) (base.BootstrapConnection, error) {
 	if base.ServerIsWalrus(opts.server) {
-		cluster := base.NewRosmarCluster(opts.server)
-		return cluster, nil
+		return base.NewRosmarCluster(opts.server)
 	}
 
 	var connStr string
diff --git a/rest/server_context.go b/rest/server_context.go
index 23df36fceb..9e61ffe3e0 100644
--- a/rest/server_context.go
+++ b/rest/server_context.go
@@ -2331,6 +2331,66 @@ func (sc *ServerContext) getClusterUUID(ctx context.Context) (string, error) {
 	return base.ParseClusterUUID(output)
 }
 
+// removeBucketAndRecreateDatabase will flush all data from the backing bucket associated with this database. Note, this will take down
+// other databases backed by the same bucket. It will recreate an empty database from the existing configuration.
+func (sc *ServerContext) removeBucketAndRecreateDatabase(ctx context.Context, dbName string, deleteFunc func(base.BucketSpec) error) error {
+	sc.lock.Lock()
+	defer sc.lock.Unlock()
+	config, ok := sc.dbConfigs[dbName]
+	if !ok {
+		return fmt.Errorf("no config found for database %q", dbName)
+	}
+	for otherDBName, dbCtx := range sc.databases_ {
+		if dbCtx.Bucket.GetName() == *config.Bucket {
+			// If async init is running for the database, cancel it for an external remove. (cannot be
+			// done in _removeDatabase, as this is called during reload)
+			if sc.DatabaseInitManager != nil && sc.DatabaseInitManager.HasActiveInitialization(otherDBName) {
+				sc.DatabaseInitManager.Cancel(otherDBName, fmt.Sprintf("flush db %s with same backing bucket", dbName))
+			}
+			if !sc._removeDatabase(ctx, otherDBName) {
+				return base.RedactErrorf("could not remove database %s as part of flushing %s. Bucket %s is left in an unstable state", base.UD(otherDBName), base.UD(dbName), base.UD(*config.Bucket))
+			}
+		}
+	}
+
+	spec, err := GetBucketSpec(ctx, &config.DatabaseConfig, sc.Config)
+	if err != nil {
+		return err
+	}
+
+	err = deleteFunc(spec)
+	if err != nil {
+		return err
+	}
+
+	if sc.persistentConfig {
+		ctx := context.WithoutCancel(ctx)
+		cas, err := sc.BootstrapContext.InsertConfig(ctx, spec.BucketName, sc.Config.Bootstrap.ConfigGroupID, &config.DatabaseConfig)
+		if err != nil {
+			base.WarnfCtx(ctx, "Could not re-insert database config after flush and re-adding bucket: %v", err)
+			return err
+		}
+		_, err = sc._fetchAndLoadDatabase(base.NewNonCancelCtxForDatabase(ctx), dbName, true)
+		if err != nil {
+			base.WarnfCtx(ctx, "Could not reload database after flush and re-adding bucket: %v", err)
+			return err
+		}
+		// store cas in db config after update
+		sc.dbConfigs[dbName].cfgCas = cas
+	} else {
+		// Re-open database and add to Sync Gateway
+		_, err := sc._getOrAddDatabaseFromConfig(ctx, config.DatabaseConfig,
+			getOrAddDatabaseConfigOptions{
+				useExisting: false,
+				failFast:    false,
+			})
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (sc *ServerContext) getBucketCCVSettings() map[string]bool {
 	bucketCCVSettings := make(map[string]bool)
 	for _, _db := range sc.databases_ {