Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions base/bucket_n1ql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const PrimaryIndexName = "#primary"

// IndexOptions used to build the 'with' clause
type N1qlIndexOptions struct {
NumReplica uint `json:"num_replica,omitempty"` // Number of replicas
IndexTombstones bool `json:"retain_deleted_xattr,omitempty"` // Whether system xattrs on tombstones should be indexed
DeferBuild bool `json:"defer_build,omitempty"` // Whether to defer initial build of index (requires a subsequent BUILD INDEX invocation)
NumReplica uint `json:"num_replica,omitempty"` // Number of replicas
IndexTombstones bool `json:"retain_deleted_xattr,omitempty"` // Whether system xattrs on tombstones should be indexed
DeferBuild bool `json:"defer_build,omitempty"` // Whether to defer initial build of index (requires a subsequent BUILD INDEX invocation)
NumPartitions *uint32 `json:"num_partition"` // The number of partitions to use for the index. 1 will be a non-partitioned index.
}
13 changes: 13 additions & 0 deletions base/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,19 @@ func (b *GocbV2Bucket) QueryEpsCount() (int, error) {
return len(agent.N1qlEps()), nil
}

// GSIEps returns the GSI endpoints for the bucket for querying information about indexes.
// GSIEps returns the GSI (index service) endpoints for the bucket, used for
// querying information about indexes.
//
// Returns an error if the underlying gocb agent cannot be obtained, or if the
// cluster currently exposes no index-service nodes.
func (b *GocbV2Bucket) GSIEps() ([]string, error) {
	agent, err := b.getGoCBAgent()
	if err != nil {
		return nil, err
	}
	gsiEps := agent.GSIEps()
	if len(gsiEps) == 0 {
		// No index nodes are available (e.g. index service not running on any cluster node).
		return nil, fmt.Errorf("no available Couchbase Server nodes")
	}
	return gsiEps, nil
}

// Gets the metadata purge interval for the bucket. First checks for a bucket-specific value. If not
// found, retrieves the cluster-wide value.
func (b *GocbV2Bucket) MetadataPurgeInterval(ctx context.Context) (time.Duration, error) {
Expand Down
7 changes: 6 additions & 1 deletion base/collection_n1ql_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,16 @@ func createIndex(ctx context.Context, store N1QLStore, indexName string, express
if filterExpression != "" {
filterExpressionStr = " WHERE " + filterExpression
}
var partitionExpresionStr string
if options.NumPartitions != nil && *options.NumPartitions > 1 {
partitionExpresionStr = " PARTITION BY HASH(META().id)"
}

createStatement := fmt.Sprintf("CREATE INDEX `%s`%s ON %s(%s)%s", indexName, ifNotExistsStr, store.EscapedKeyspace(), expression, filterExpressionStr)
createStatement := fmt.Sprintf("CREATE INDEX `%s`%s ON %s(%s)%s %s", indexName, ifNotExistsStr, store.EscapedKeyspace(), expression, partitionExpresionStr, filterExpressionStr)

// Replace any KeyspaceQueryToken references in the index expression
createStatement = strings.ReplaceAll(createStatement, KeyspaceQueryToken, store.EscapedKeyspace())

createErr := createIndexFromStatement(ctx, store, indexName, createStatement, options)
if IsIndexAlreadyExistsError(createErr) || IsCreateDuplicateIndexError(createErr) {
// Pre-7.1 compatibility: Swallow this error like Server does when specifying `IF NOT EXISTS`
Expand Down
1 change: 1 addition & 0 deletions db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func initializePrincipalDocsIndex(ctx context.Context, db *Database) error {
NumReplicas: db.Options.NumIndexReplicas,
MetadataIndexes: IndexesPrincipalOnly,
UseXattrs: db.UseXattrs(),
NumPartitions: db.numIndexPartitions(),
}

return InitializeIndexes(ctx, n1qlStore, options)
Expand Down
9 changes: 9 additions & 0 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ type DatabaseContextOptions struct {
MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication
MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication
NumIndexReplicas uint // Number of replicas for GSI indexes
NumIndexPartitions *uint32 // Number of partitions for GSI indexes, if not set will default to 1
ImportVersion uint64 // Version included in import DCP checkpoints, incremented when collections added to db
}

Expand Down Expand Up @@ -1915,6 +1916,14 @@ func (context *DatabaseContext) UseXattrs() bool {
return context.Options.EnableXattr
}

// numIndexPartitions returns the number of index partitions to use for the
// database's indexes, falling back to DefaultNumIndexPartitions when the
// option was not configured.
func (context *DatabaseContext) numIndexPartitions() uint32 {
	if numPartitions := context.Options.NumIndexPartitions; numPartitions != nil {
		return *numPartitions
	}
	return DefaultNumIndexPartitions
}

func (context *DatabaseContext) UseViews() bool {
return context.Options.UseViews
}
Expand Down
5 changes: 5 additions & 0 deletions db/database_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ func (c *DatabaseCollection) UseXattrs() bool {
return c.dbCtx.Options.EnableXattr
}

// numIndexPartitions returns the number of partitions for the collection's indexes.
// This is controlled at a database level; the collection simply delegates to its
// owning DatabaseContext.
func (c *DatabaseCollection) numIndexPartitions() uint32 {
	return c.dbCtx.numIndexPartitions()
}

// User will return the user object.
func (c *DatabaseCollectionWithUser) User() auth.User {
return c.user
Expand Down
54 changes: 37 additions & 17 deletions db/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
SyncDocWildcard = `\\_sync:%`
SyncUserWildcard = `\\_sync:user:%`
SyncRoleWildcard = `\\_sync:role:%`

DefaultNumIndexPartitions uint32 = 1
)

// Index and query definitions use syncToken ($sync) to represent the location of sync gateway's metadata.
Expand Down Expand Up @@ -175,6 +177,10 @@ var (
"ORDER BY [op.name, LEAST($sync.`sequence`, op.val.seq),IFMISSING(op.val.rev,null),IFMISSING(op.val.del,null)] " +
"LIMIT 1",
}
partionableIndexes = map[SGIndexType]bool{
IndexAllDocs: true,
IndexChannels: true,
}
)

var sgIndexes map[SGIndexType]SGIndex
Expand All @@ -191,6 +197,7 @@ func init() {
filterExpression: indexFilterExpressions[i],
flags: indexFlags[i],
creationMode: indexCreationModes[i],
partionable: partionableIndexes[i],
}
// If a readiness query is specified for this index, mark the index as required and add to SGIndex
readinessQuery, ok := readinessQueries[i]
Expand All @@ -214,18 +221,23 @@ type SGIndex struct {
readinessQuery string // Query used to determine view readiness
flags SGIndexFlags // Additional index options
creationMode indexCreationMode // Signal when to create indexes
partionable bool // Whether the index is partitionable
}

func (i *SGIndex) fullIndexName(useXattrs bool) string {
return i.indexNameForVersion(i.version, useXattrs)
func (i *SGIndex) fullIndexName(useXattrs bool, numPartitions uint32) string {
return i.indexNameForVersion(i.version, useXattrs, numPartitions)
}

func (i *SGIndex) indexNameForVersion(version int, useXattrs bool) string {
func (i *SGIndex) indexNameForVersion(version int, useXattrs bool, numPartitions uint32) string {
xattrsToken := ""
if useXattrs {
xattrsToken = "x"
}
return fmt.Sprintf(indexNameFormat, i.simpleName, xattrsToken, version)
indexName := fmt.Sprintf(indexNameFormat, i.simpleName, xattrsToken, version)
if i.partionable && numPartitions > 1 {
indexName = fmt.Sprintf("%s_p%d", indexName, numPartitions)
}
return indexName
}

// Tombstone indexing is required for indexes that need to index the _sync xattrs even when the document
Expand Down Expand Up @@ -281,8 +293,10 @@ func (i *SGIndex) shouldCreate(options InitializeIndexOptions) bool {
// Creates index associated with specified SGIndex if not already present. Always defers build - a subsequent BUILD INDEX
// will need to be invoked for any created indexes.
func (i *SGIndex) createIfNeeded(ctx context.Context, bucket base.N1QLStore, options InitializeIndexOptions) error {

indexName := i.fullIndexName(options.UseXattrs)
if options.NumPartitions < 1 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Elsewhere if options.NumPartitions<=1, it's treated as "do not use partitions" (i.e. the check seems to consistently be if options.NumPartitions > 1). This might be the right approach, just want to make sure we're not triggering a fatal error when options.NumPartitions isn't initialized (is zero), if it's not actually a fatal condition.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I split the validation for this: once you get to this part of the code we do expect >= 1, which keeps things simpler — we don't have to convert every time here from "uninitialized" to "set to one". I thought this was conceptually easier because it forces the caller to set this value, so the caller decides when to actually make the indexes partitioned.

This was easier for me in testing to make sure I was passing this value through the code but I'm not committed to this approach.

return fmt.Errorf("Invalid number of partitions specified for index %s: %d, needs to be greater than 0", i.simpleName, options.NumPartitions)
}
indexName := i.fullIndexName(options.UseXattrs, options.NumPartitions)

// Create index
base.InfofCtx(ctx, base.KeyQuery, "Creating index %s if it doesn't already exist...", indexName)
Expand All @@ -294,6 +308,9 @@ func (i *SGIndex) createIfNeeded(ctx context.Context, bucket base.N1QLStore, opt
NumReplica: options.NumReplicas,
IndexTombstones: i.shouldIndexTombstones(options.UseXattrs),
}
if i.partionable && options.NumPartitions > 1 {
n1qlOptions.NumPartitions = &options.NumPartitions
}

// Initial retry 1 seconds, max wait 30s, waits up to 10m
sleeper := base.CreateMaxDoublingSleeperFunc(20, 1000, 30000)
Expand Down Expand Up @@ -344,11 +361,14 @@ type InitializeIndexOptions struct {
MetadataIndexes CollectionIndexesType // indicate which indexes to create
Serverless bool // if true, create indexes for serverless
UseXattrs bool // if true, create indexes on xattrs, otherwise, use inline sync data
NumPartitions uint32 // number of partitions to use for the index
}

// Initializes Sync Gateway indexes for datastore. Creates required indexes if not found, then waits for index readiness.
func InitializeIndexes(ctx context.Context, n1QLStore base.N1QLStore, options InitializeIndexOptions) error {

if options.NumPartitions < 1 {
return fmt.Errorf("Invalid number of partitions specified: %d, needs to be greater than 0", options.NumPartitions)
}
base.InfofCtx(ctx, base.KeyAll, "Initializing indexes with numReplicas: %d...", options.NumReplicas)

// Create any indexes that aren't present
Expand All @@ -360,7 +380,7 @@ func InitializeIndexes(ctx context.Context, n1QLStore base.N1QLStore, options In
continue
}

fullIndexName := sgIndex.fullIndexName(options.UseXattrs)
fullIndexName := sgIndex.fullIndexName(options.UseXattrs, options.NumPartitions)

err := sgIndex.createIfNeeded(ctx, n1QLStore, options)
if err != nil {
Expand All @@ -371,7 +391,6 @@ func InitializeIndexes(ctx context.Context, n1QLStore base.N1QLStore, options In

fullIndexNames = append(fullIndexNames, fullIndexName)
}

// Issue BUILD INDEX for any deferred indexes.
if len(fullIndexNames) > 0 {
buildErr := base.BuildDeferredIndexes(ctx, n1QLStore, fullIndexNames)
Expand All @@ -391,7 +410,7 @@ func waitForIndexes(ctx context.Context, bucket base.N1QLStore, options Initiali
var indexes []string

for _, sgIndex := range sgIndexes {
fullIndexName := sgIndex.fullIndexName(options.UseXattrs)
fullIndexName := sgIndex.fullIndexName(options.UseXattrs, options.NumPartitions)
if !sgIndex.shouldCreate(options) {
continue
}
Expand Down Expand Up @@ -421,17 +440,18 @@ func removeObsoleteIndexes(ctx context.Context, bucket base.N1QLStore, previewOn

// Build set of candidates for cleanup
removalCandidates := make([]string, 0)
numPartitions := DefaultNumIndexPartitions // obsolete indexes is always partitions 1, greater than 1 needs to be removed manually
for _, sgIndex := range indexMap {
// Current version, opposite xattr setting
removalCandidates = append(removalCandidates, sgIndex.fullIndexName(!useXattrs))
removalCandidates = append(removalCandidates, sgIndex.fullIndexName(!useXattrs, numPartitions))
// If using views we can remove current version for xattr setting too
if useViews {
removalCandidates = append(removalCandidates, sgIndex.fullIndexName(useXattrs))
removalCandidates = append(removalCandidates, sgIndex.fullIndexName(useXattrs, numPartitions))
}
// Older versions, both xattr and non-xattr
for _, prevVersion := range sgIndex.previousVersions {
removalCandidates = append(removalCandidates, sgIndex.indexNameForVersion(prevVersion, true))
removalCandidates = append(removalCandidates, sgIndex.indexNameForVersion(prevVersion, false))
removalCandidates = append(removalCandidates, sgIndex.indexNameForVersion(prevVersion, true, numPartitions))
removalCandidates = append(removalCandidates, sgIndex.indexNameForVersion(prevVersion, false, numPartitions))
}
}

Expand Down Expand Up @@ -497,8 +517,8 @@ func replaceSyncTokensQuery(statement string, useXattrs bool) string {
}

// Replace index tokens ($idx) in the provided createIndex statement with the appropriate token, depending on whether xattrs should be used.
func replaceIndexTokensQuery(statement string, idx SGIndex, useXattrs bool) string {
return strings.Replace(statement, indexToken, idx.fullIndexName(useXattrs), -1)
func replaceIndexTokensQuery(statement string, idx SGIndex, useXattrs bool, numPartitions uint32) string {
return strings.Replace(statement, indexToken, idx.fullIndexName(useXattrs, numPartitions), -1)
}

func copySGIndexes(inputMap map[SGIndexType]SGIndex) map[SGIndexType]SGIndex {
Expand All @@ -521,7 +541,7 @@ func GetIndexesName(options InitializeIndexOptions) []string {
continue
}
if sgIndex.shouldCreate(options) {
indexesName = append(indexesName, sgIndex.fullIndexName(options.UseXattrs))
indexesName = append(indexesName, sgIndex.fullIndexName(options.UseXattrs, options.NumPartitions))
}
}
return indexesName
Expand Down
37 changes: 21 additions & 16 deletions db/indexes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ func TestPostUpgradeIndexesSimple(t *testing.T) {

// construct indexes as the test expects
options := InitializeIndexOptions{
NumReplicas: 0,
Serverless: db.IsServerless(),
UseXattrs: db.UseXattrs(),
NumReplicas: 0,
Serverless: db.IsServerless(),
UseXattrs: db.UseXattrs(),
NumPartitions: DefaultNumIndexPartitions,
}
if db.OnlyDefaultCollection() {
options.MetadataIndexes = IndexesAll
Expand All @@ -57,7 +58,7 @@ func TestPostUpgradeIndexesSimple(t *testing.T) {
var expectedRemovedIndexes []string
for _, sgIndex := range sgIndexes {
if sgIndex.shouldCreate(options) {
expectedRemovedIndexes = append(expectedRemovedIndexes, sgIndex.fullIndexName(db.UseXattrs()))
expectedRemovedIndexes = append(expectedRemovedIndexes, sgIndex.fullIndexName(db.UseXattrs(), DefaultNumIndexPartitions))
}
}
sort.Strings(expectedRemovedIndexes)
Expand Down Expand Up @@ -100,9 +101,10 @@ func TestPostUpgradeIndexesVersionChange(t *testing.T) {
defer func() {
// Restore indexes after test
options := InitializeIndexOptions{
NumReplicas: 0,
Serverless: db.IsServerless(),
UseXattrs: db.UseXattrs(),
NumReplicas: 0,
Serverless: db.IsServerless(),
UseXattrs: db.UseXattrs(),
NumPartitions: DefaultNumIndexPartitions,
}
err := InitializeIndexes(ctx, n1qlStore, options)
assert.NoError(t, err)
Expand Down Expand Up @@ -164,9 +166,10 @@ func TestPostUpgradeMultipleCollections(t *testing.T) {
}
useXattrs := false
options := InitializeIndexOptions{
NumReplicas: 0,
Serverless: false,
UseXattrs: useXattrs,
NumReplicas: 0,
Serverless: false,
UseXattrs: useXattrs,
NumPartitions: DefaultNumIndexPartitions,
}

for _, dataStore := range db.getDataStores() {
Expand Down Expand Up @@ -246,9 +249,10 @@ func TestRemoveIndexesUseViewsTrueAndFalse(t *testing.T) {
assert.NoError(t, err)

options := InitializeIndexOptions{
NumReplicas: 0,
Serverless: db.IsServerless(),
UseXattrs: db.UseXattrs(),
NumReplicas: 0,
Serverless: db.IsServerless(),
UseXattrs: db.UseXattrs(),
NumPartitions: DefaultNumIndexPartitions,
}
if db.OnlyDefaultCollection() {
options.MetadataIndexes = IndexesAll
Expand Down Expand Up @@ -286,9 +290,10 @@ func TestRemoveObsoleteIndexOnError(t *testing.T) {
n1qlStore, ok := base.AsN1QLStore(dataStore)
assert.True(t, ok)
options := InitializeIndexOptions{
NumReplicas: 0,
Serverless: db.IsServerless(),
UseXattrs: db.UseXattrs(),
NumReplicas: 0,
Serverless: db.IsServerless(),
UseXattrs: db.UseXattrs(),
NumPartitions: DefaultNumIndexPartitions,
}
err := InitializeIndexes(ctx, n1qlStore, options)
assert.NoError(t, err)
Expand Down
Loading
Loading