Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions base/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func DropAllIndexes(ctx context.Context, n1QLStore N1QLStore) error {
// DROP INDEX is asynchronous, but generally quick. Wait for all indexes to disappear as part of the test harness.
err, _ = RetryLoop(ctx, "Waiting for no indexes on the bucket", func() (shouldRetry bool, err error, _ any) {
// Retrieve all indexes on the bucket/collection
indexes, err := n1QLStore.GetIndexes()
indexes, err = n1QLStore.GetIndexes()
if err != nil {
return false, err, nil
}
Expand All @@ -382,7 +382,10 @@ func DropAllIndexes(ctx context.Context, n1QLStore N1QLStore) error {
}
return true, nil, nil
}, CreateSleeperFunc(500, 100))
return err
if err != nil {
return fmt.Errorf("%w: indexes still exist, but all are expected to be gone: %v", err, indexes)
}
return nil
}

// Generates a string of size int
Expand Down
3 changes: 2 additions & 1 deletion db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,8 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,

dbContext.ResyncManager = NewResyncManagerDCP(metadataStore, dbContext.UseXattrs(), metaKeys)

dbContext.AsyncIndexInitManager = NewAsyncIndexInitManager(metadataStore, metaKeys)

return dbContext, nil
}

Expand Down Expand Up @@ -2455,7 +2457,6 @@ func (db *DatabaseContext) StartOnlineProcesses(ctx context.Context) (returnedEr

db.TombstoneCompactionManager = NewTombstoneCompactionManager()
db.AttachmentCompactionManager = NewAttachmentCompactionManager(db.MetadataStore, db.MetadataKeys)
db.AsyncIndexInitManager = NewAsyncIndexInitManager(db.MetadataStore, db.MetadataKeys)

db.startReplications(ctx)

Expand Down
23 changes: 20 additions & 3 deletions rest/database_init_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"errors"
"fmt"
"sync"
"testing"

"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
Expand All @@ -33,6 +34,9 @@ type DatabaseInitManager struct {
workers map[string]*DatabaseInitWorker
workersLock sync.Mutex

// initializeIndexesFunc is defined for testability only.
initializeIndexesFunc InitializeIndexesFunc

// testCollectionStatusUpdateCallback is defined for testability only.
// Invoked after collection initialization is complete for each collection
testCollectionStatusUpdateCallback CollectionCallbackFunc
Expand All @@ -45,6 +49,8 @@ type DatabaseInitManager struct {
// CollectionCallbackFunc is called when the initialization has completed for each collection on the database.
type CollectionCallbackFunc func(dbName string, scName base.ScopeAndCollectionName, status db.CollectionIndexStatus)

type InitializeIndexesFunc func(context.Context, base.N1QLStore, db.InitializeIndexOptions) error

// CollectionInitData defines the set of collections being created (by ScopeAneCollectionName), and the set of
// indexes required for each collection.
type CollectionInitData map[base.ScopeAndCollectionName]db.CollectionIndexesType
Expand Down Expand Up @@ -117,8 +123,12 @@ func (m *DatabaseInitManager) InitializeDatabaseWithStatusCallback(ctx context.C
statusCallback = m.testCollectionStatusUpdateCallback
}

initializeIndexesFunc := db.InitializeIndexes
if m.initializeIndexesFunc != nil {
initializeIndexesFunc = m.initializeIndexesFunc
}
// Create new worker and add this caller as a watcher
worker := NewDatabaseInitWorker(context.WithoutCancel(ctx), dbConfig.Name, n1qlStore, collectionSet, indexOptions, statusCallback)
worker := NewDatabaseInitWorker(context.WithoutCancel(ctx), dbConfig.Name, n1qlStore, collectionSet, indexOptions, statusCallback, initializeIndexesFunc)
m.workers[dbConfig.Name] = worker
doneChan = worker.addWatcher()

Expand Down Expand Up @@ -171,6 +181,10 @@ func (m *DatabaseInitManager) SetTestCallbacks(collectionCallback CollectionCall
m.testDatabaseCompleteCallback = databaseComplete
}

func (m *DatabaseInitManager) SetInitializeIndexesFunc(_ testing.TB, initializeIndexesFunc InitializeIndexesFunc) {
m.initializeIndexesFunc = initializeIndexesFunc
}

func (m *DatabaseInitManager) Cancel(dbName string, reason string) {
m.workersLock.Lock()
defer m.workersLock.Unlock()
Expand Down Expand Up @@ -225,14 +239,16 @@ type DatabaseInitWorker struct {
watcherLock sync.Mutex // Mutex for synchronized watchers access
completed bool // Set to true when processing completes, to handle watcher registration during completion. Synchronized with watcherLock.
lastError error // Set for when processing does not complete successfully. Synchronized with watcherLock

initializeIndexesFunc InitializeIndexesFunc // function to create indexes, for testability
}

// DatabaseInitOptions specifies the options used for database initialization
type DatabaseInitOptions struct {
indexOptions db.InitializeIndexOptions // Options used for index initialization
}

func NewDatabaseInitWorker(ctx context.Context, dbName string, n1qlStore *base.ClusterOnlyN1QLStore, collections CollectionInitData, indexOptions db.InitializeIndexOptions, callback CollectionCallbackFunc) *DatabaseInitWorker {
func NewDatabaseInitWorker(ctx context.Context, dbName string, n1qlStore *base.ClusterOnlyN1QLStore, collections CollectionInitData, indexOptions db.InitializeIndexOptions, callback CollectionCallbackFunc, initializeIndexesFunc InitializeIndexesFunc) *DatabaseInitWorker {
cancelCtx, cancelFunc := context.WithCancelCause(ctx)
return &DatabaseInitWorker{
dbName: dbName,
Expand All @@ -242,6 +258,7 @@ func NewDatabaseInitWorker(ctx context.Context, dbName string, n1qlStore *base.C
collections: collections,
n1qlStore: n1qlStore,
collectionStatusCallback: callback,
initializeIndexesFunc: initializeIndexesFunc,
}
}

Expand Down Expand Up @@ -272,7 +289,7 @@ func (w *DatabaseInitWorker) Run() {
// Set the scope and collection name on the cluster n1ql store for use by initializeIndexes
w.n1qlStore.SetScopeAndCollection(scName)
keyspaceCtx := base.KeyspaceLogCtx(w.ctx, w.n1qlStore.BucketName(), scName.ScopeName(), scName.CollectionName())
indexErr = db.InitializeIndexes(keyspaceCtx, w.n1qlStore, collectionIndexOptions)
indexErr = w.initializeIndexesFunc(keyspaceCtx, w.n1qlStore, collectionIndexOptions)
if w.collectionStatusCallback != nil {
if indexErr != nil {
w.collectionStatusCallback(w.dbName, scName, db.CollectionIndexStatusError)
Expand Down
62 changes: 26 additions & 36 deletions rest/indextest/index_init_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package indextest

import (
"context"
"fmt"
"net/http"
"regexp"
Expand Down Expand Up @@ -273,38 +274,28 @@ func TestChangeIndexPartitionsDbOffline(t *testing.T) {
if base.UnitTestUrlIsWalrus() || base.TestsDisableGSI() {
t.Skip("This test only works against Couchbase Server with GSI enabled")
}

// requires index init

rt := rest.NewRestTester(t, nil)
rt := rest.NewRestTesterPersistentConfig(t)
defer rt.Close()

rt.TakeDbOffline()

rt.ServerContext().DatabaseInitManager.SetInitializeIndexesFunc(t, getNoopInitializeIndexes())

resp := rt.SendAdminRequest(http.MethodPost, "/{{.db}}/_index_init", `{"num_partitions":2}`)
rest.RequireStatus(t, resp, http.StatusOK)

// wait for completion
require.EventuallyWithT(t, func(c *assert.CollectT) {
resp := rt.SendAdminRequest(http.MethodGet, "/{{.db}}/_index_init", "")
rest.AssertStatus(t, resp, http.StatusOK)
var body db.AsyncIndexInitManagerResponse
err := base.JSONUnmarshal(resp.BodyBytes(), &body)
require.NoError(c, err)
require.Equal(c, db.BackgroundProcessStateCompleted, body.State)
}, 1*time.Minute, 1*time.Second)
db.RequireBackgroundManagerState(t, rt.GetDatabase().AsyncIndexInitManager, db.BackgroundProcessStateCompleted)
}

func TestChangeIndexPartitionsStartStopAndRestart(t *testing.T) {
if base.UnitTestUrlIsWalrus() || base.TestsDisableGSI() {
t.Skip("This test only works against Couchbase Server with GSI enabled")
}

// requires index init

rt := rest.NewRestTester(t, nil)
rt := rest.NewRestTesterPersistentConfig(t)
defer rt.Close()

rt.ServerContext().DatabaseInitManager.SetInitializeIndexesFunc(t, getNoopBlockingInitializeIndexes())
resp := rt.SendAdminRequest(http.MethodPost, "/{{.db}}/_index_init", `{"num_partitions":2}`)
rest.RequireStatus(t, resp, http.StatusOK)

Expand All @@ -321,22 +312,13 @@ func TestChangeIndexPartitionsStartStopAndRestart(t *testing.T) {
assert.Equal(c, db.BackgroundProcessStateStopped, body.State, "body: %#+v", body)
require.Equal(c, "", body.LastErrorMessage, "expected no error when stopping, got: %s", body.LastErrorMessage)
}, 1*time.Minute, 1*time.Second)
rt.ServerContext().DatabaseInitManager.SetInitializeIndexesFunc(t, getNoopInitializeIndexes())

// restart with new params
resp = rt.SendAdminRequest(http.MethodPost, "/{{.db}}/_index_init", `{"num_partitions":3}`)
rest.RequireStatus(t, resp, http.StatusOK)

// wait for completion
require.EventuallyWithT(t, func(c *assert.CollectT) {
resp := rt.SendAdminRequest(http.MethodGet, "/{{.db}}/_index_init", "")
rest.AssertStatus(t, resp, http.StatusOK)
var body db.AsyncIndexInitManagerResponse
err := base.JSONUnmarshal(resp.BodyBytes(), &body)
require.NoError(c, err)
// immediately exit if the state turns to error
require.NotEqual(t, db.BackgroundProcessStateError, body.State, "body: %#+v", body)
assert.Equal(c, db.BackgroundProcessStateCompleted, body.State, "body: %#+v", body)
}, 1*time.Minute, 1*time.Second)
db.RequireBackgroundManagerState(t, rt.GetDatabase().AsyncIndexInitManager, db.BackgroundProcessStateCompleted)
}

func TestChangeIndexPartitionsWithViews(t *testing.T) {
Expand Down Expand Up @@ -441,13 +423,21 @@ func runIndexInit(rt *rest.RestTester, body string) {
resp := rt.SendAdminRequest(http.MethodPost, "/{{.db}}/_index_init", body)
rest.RequireStatus(t, resp, http.StatusOK)

// wait for completion
require.EventuallyWithT(t, func(c *assert.CollectT) {
resp := rt.SendAdminRequest(http.MethodGet, "/{{.db}}/_index_init", "")
rest.AssertStatus(t, resp, http.StatusOK)
var body db.AsyncIndexInitManagerResponse
err := base.JSONUnmarshal(resp.BodyBytes(), &body)
require.NoError(c, err)
require.Equal(c, db.BackgroundProcessStateCompleted, body.State)
}, 1*time.Minute, 500*time.Millisecond)
db.RequireBackgroundManagerState(t, rt.GetDatabase().AsyncIndexInitManager, db.BackgroundProcessStateCompleted)
}

// getNoopBlockingInitializeIndexes is a replacement initialize index function that blocks forever. It is cancelled when DatabaseInitManager.Cancel is called. Used to avoid churn on n1ql nodes.
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'n1ql' to 'N1QL'.

Copilot uses AI. Check for mistakes.
func getNoopBlockingInitializeIndexes() rest.InitializeIndexesFunc {
return func(ctx context.Context, _ base.N1QLStore, _ db.InitializeIndexOptions) error {
<-ctx.Done()
return nil
}
}

// getNoopInitializeIndexes is a replacement initialize index function that returns immediately. Used to avoid churn on
// on n1ql nodes.
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'n1ql' to 'N1QL'.

Suggested change
// on n1ql nodes.
// on N1QL nodes.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate word 'on' should be removed.

Suggested change
// on n1ql nodes.
// n1ql nodes.

Copilot uses AI. Check for mistakes.
func getNoopInitializeIndexes() rest.InitializeIndexesFunc {
return func(context.Context, base.N1QLStore, db.InitializeIndexOptions) error {
return nil
}
}
Loading