Skip to content

Commit 0e9740a

Browse files
torcolvinbbrks
andauthored
CBG-4548 allow creation of partitioned indexes (#7428)
Co-authored-by: Ben Brooks <[email protected]>
1 parent 0afb808 commit 0e9740a

31 files changed

+495
-154
lines changed

base/bucket_n1ql.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ const PrimaryIndexName = "#primary"
2020

2121
// IndexOptions used to build the 'with' clause
2222
type N1qlIndexOptions struct {
23-
NumReplica uint `json:"num_replica,omitempty"` // Number of replicas
24-
IndexTombstones bool `json:"retain_deleted_xattr,omitempty"` // Whether system xattrs on tombstones should be indexed
25-
DeferBuild bool `json:"defer_build,omitempty"` // Whether to defer initial build of index (requires a subsequent BUILD INDEX invocation)
23+
NumReplica uint `json:"num_replica,omitempty"` // Number of replicas
24+
IndexTombstones bool `json:"retain_deleted_xattr,omitempty"` // Whether system xattrs on tombstones should be indexed
25+
DeferBuild bool `json:"defer_build,omitempty"` // Whether to defer initial build of index (requires a subsequent BUILD INDEX invocation)
26+
NumPartitions *uint32 `json:"num_partition,omitempty"` // The number of partitions to use for the index. 1 will be a non-partitioned index.
2627
}

base/collection.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (b *GocbV2Bucket) IsSupported(feature sgbucket.BucketStoreFeature) bool {
239239
// Available on all supported server versions
240240
return true
241241
case sgbucket.BucketStoreFeatureN1ql:
242-
agent, err := b.getGoCBAgent()
242+
agent, err := b.GetGoCBAgent()
243243
if err != nil {
244244
return false
245245
}
@@ -295,7 +295,7 @@ func (b *GocbV2Bucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArgum
295295

296296
func (b *GocbV2Bucket) GetStatsVbSeqno(maxVbno uint16, useAbsHighSeqNo bool) (uuids map[uint16]uint64, highSeqnos map[uint16]uint64, seqErr error) {
297297

298-
agent, agentErr := b.getGoCBAgent()
298+
agent, agentErr := b.GetGoCBAgent()
299299
if agentErr != nil {
300300
return nil, nil, agentErr
301301
}
@@ -353,7 +353,7 @@ func (b *GocbV2Bucket) GetMaxVbno() (uint16, error) {
353353
}
354354

355355
func (b *GocbV2Bucket) getConfigSnapshot() (*gocbcore.ConfigSnapshot, error) {
356-
agent, err := b.getGoCBAgent()
356+
agent, err := b.GetGoCBAgent()
357357
if err != nil {
358358
return nil, fmt.Errorf("no gocbcore.Agent: %w", err)
359359
}
@@ -448,7 +448,7 @@ func (b *GocbV2Bucket) BucketItemCount(ctx context.Context) (itemCount int, err
448448
}
449449

450450
func (b *GocbV2Bucket) MgmtEps() (url []string, err error) {
451-
agent, err := b.getGoCBAgent()
451+
agent, err := b.GetGoCBAgent()
452452
if err != nil {
453453
return url, err
454454
}
@@ -460,7 +460,7 @@ func (b *GocbV2Bucket) MgmtEps() (url []string, err error) {
460460
}
461461

462462
func (b *GocbV2Bucket) QueryEpsCount() (int, error) {
463-
agent, err := b.getGoCBAgent()
463+
agent, err := b.GetGoCBAgent()
464464
if err != nil {
465465
return 0, err
466466
}
@@ -479,7 +479,7 @@ func (b *GocbV2Bucket) MaxTTL(ctx context.Context) (int, error) {
479479
}
480480

481481
func (b *GocbV2Bucket) HttpClient(ctx context.Context) *http.Client {
482-
agent, err := b.getGoCBAgent()
482+
agent, err := b.GetGoCBAgent()
483483
if err != nil {
484484
WarnfCtx(ctx, "Unable to obtain gocbcore.Agent while retrieving httpClient:%v", err)
485485
return nil
@@ -527,7 +527,7 @@ func (b *GocbV2Bucket) releaseKvOp() {
527527
}
528528

529529
// GetGoCBAgent returns the underlying agent from gocbcore
530-
func (b *GocbV2Bucket) getGoCBAgent() (*gocbcore.Agent, error) {
530+
func (b *GocbV2Bucket) GetGoCBAgent() (*gocbcore.Agent, error) {
531531
return b.bucket.Internal().IORouter()
532532
}
533533

base/collection_gocb.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ func (c *Collection) isRecoverableWriteError(err error) bool {
415415
// current use cases (on-demand import). If there's a need for expiry as part of normal get, this shouldn't be
416416
// used - an enhanced version of Get() should be implemented to avoid two ops
417417
func (c *Collection) GetExpiry(ctx context.Context, k string) (expiry uint32, getMetaError error) {
418-
agent, err := c.Bucket.getGoCBAgent()
418+
agent, err := c.Bucket.GetGoCBAgent()
419419
if err != nil {
420420
WarnfCtx(ctx, "Unable to obtain gocbcore.Agent while retrieving expiry:%v", err)
421421
return 0, err
@@ -569,7 +569,7 @@ func (c *Collection) setCollectionID() error {
569569
c.kvCollectionID = DefaultCollectionID
570570
return nil
571571
}
572-
agent, err := c.Bucket.getGoCBAgent()
572+
agent, err := c.Bucket.GetGoCBAgent()
573573
if err != nil {
574574
return err
575575
}

base/collection_n1ql_common.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,16 @@ func createIndex(ctx context.Context, store N1QLStore, indexName string, express
161161
if filterExpression != "" {
162162
filterExpressionStr = " WHERE " + filterExpression
163163
}
164+
var partitionExpresionStr string
165+
if options.NumPartitions != nil && *options.NumPartitions > 1 {
166+
partitionExpresionStr = " PARTITION BY HASH(META().id)"
167+
}
164168

165-
createStatement := fmt.Sprintf("CREATE INDEX `%s`%s ON %s(%s)%s", indexName, ifNotExistsStr, store.EscapedKeyspace(), expression, filterExpressionStr)
169+
createStatement := fmt.Sprintf("CREATE INDEX `%s`%s ON %s(%s)%s %s", indexName, ifNotExistsStr, store.EscapedKeyspace(), expression, partitionExpresionStr, filterExpressionStr)
166170

167171
// Replace any KeyspaceQueryToken references in the index expression
168172
createStatement = strings.ReplaceAll(createStatement, KeyspaceQueryToken, store.EscapedKeyspace())
173+
169174
createErr := createIndexFromStatement(ctx, store, indexName, createStatement, options)
170175
if IsIndexAlreadyExistsError(createErr) || IsCreateDuplicateIndexError(createErr) {
171176
// Pre-7.1 compatibility: Swallow this error like Server does when specifying `IF NOT EXISTS`

base/collection_view.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ type NoNameDesignDocument struct {
161161

162162
func (b *GocbV2Bucket) putDDocForTombstones(ctx context.Context, ddoc *gocb.DesignDocument) error {
163163
username, password, _ := b.Spec.Auth.GetCredentials()
164-
agent, err := b.getGoCBAgent()
164+
agent, err := b.GetGoCBAgent()
165165
if err != nil {
166166
return fmt.Errorf("Unable to get handle for bucket router: %v", err)
167167
}

db/background_mgr_resync_dcp.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ func initializePrincipalDocsIndex(ctx context.Context, db *Database) error {
389389
NumReplicas: db.Options.NumIndexReplicas,
390390
MetadataIndexes: IndexesPrincipalOnly,
391391
UseXattrs: db.UseXattrs(),
392+
NumPartitions: db.numIndexPartitions(),
392393
}
393394

394395
return InitializeIndexes(ctx, n1qlStore, options)

db/database.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ type DatabaseContextOptions struct {
185185
MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication
186186
MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication
187187
NumIndexReplicas uint // Number of replicas for GSI indexes
188+
NumIndexPartitions *uint32 // Number of partitions for GSI indexes, if not set will default to 1
188189
ImportVersion uint64 // Version included in import DCP checkpoints, incremented when collections added to db
189190
}
190191

@@ -1915,6 +1916,14 @@ func (context *DatabaseContext) UseXattrs() bool {
19151916
return context.Options.EnableXattr
19161917
}
19171918

1919+
// numIndexPartitions returns the number of index partitions to use for the database's indexes.
1920+
func (context *DatabaseContext) numIndexPartitions() uint32 {
1921+
if context.Options.NumIndexPartitions != nil {
1922+
return *context.Options.NumIndexPartitions
1923+
}
1924+
return DefaultNumIndexPartitions
1925+
}
1926+
19181927
func (context *DatabaseContext) UseViews() bool {
19191928
return context.Options.UseViews
19201929
}

db/database_collection.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,11 @@ func (c *DatabaseCollection) UseXattrs() bool {
265265
return c.dbCtx.Options.EnableXattr
266266
}
267267

268+
// numIndexPartitions returns the number of partitions for the collection's indexes. This is controlled at a database level.
269+
func (c *DatabaseCollection) numIndexPartitions() uint32 {
270+
return c.dbCtx.numIndexPartitions()
271+
}
272+
268273
// User will return the user object.
269274
func (c *DatabaseCollectionWithUser) User() auth.User {
270275
return c.user

db/indexes.go

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,19 @@ import (
2121
)
2222

2323
const (
24-
indexNameFormat = "sg_%s_%s%d" // Name, xattrs, version. e.g. "sg_channels_x1"
25-
syncRelativeToken = "$relativesync" // Relative sync token (no keyspace), used to swap between xattr/non-xattr handling in n1ql statements
26-
syncToken = "$sync" // Sync token, used to swap between xattr/non-xattr handling in n1ql statements
27-
indexToken = "$idx" // Index token, used to hint which index should be used for the query
24+
indexNameFormat = "sg_%s_%s%d" // Name, xattrs, version. e.g. "sg_channels_x1"
25+
partitionableIndexNameFormat = "%s_p%d" // indexName, numPartitions
26+
syncRelativeToken = "$relativesync" // Relative sync token (no keyspace), used to swap between xattr/non-xattr handling in n1ql statements
27+
syncToken = "$sync" // Sync token, used to swap between xattr/non-xattr handling in n1ql statements
28+
indexToken = "$idx" // Index token, used to hint which index should be used for the query
2829

2930
// N1ql-encoded wildcard expression matching the '_sync:' prefix used for all sync gateway's system documents.
3031
// Need to escape the underscore in '_sync' to prevent it being treated as a N1QL wildcard
3132
SyncDocWildcard = `\\_sync:%`
3233
SyncUserWildcard = `\\_sync:user:%`
3334
SyncRoleWildcard = `\\_sync:role:%`
35+
36+
DefaultNumIndexPartitions uint32 = 1
3437
)
3538

3639
// Index and query definitions use syncToken ($sync) to represent the location of sync gateway's metadata.
@@ -175,6 +178,10 @@ var (
175178
"ORDER BY [op.name, LEAST($sync.`sequence`, op.val.seq),IFMISSING(op.val.rev,null),IFMISSING(op.val.del,null)] " +
176179
"LIMIT 1",
177180
}
181+
partitionableIndexes = map[SGIndexType]bool{
182+
IndexAllDocs: true,
183+
IndexChannels: true,
184+
}
178185
)
179186

180187
var sgIndexes map[SGIndexType]SGIndex
@@ -191,6 +198,7 @@ func init() {
191198
filterExpression: indexFilterExpressions[i],
192199
flags: indexFlags[i],
193200
creationMode: indexCreationModes[i],
201+
partitionable: partitionableIndexes[i],
194202
}
195203
// If a readiness query is specified for this index, mark the index as required and add to SGIndex
196204
readinessQuery, ok := readinessQueries[i]
@@ -214,18 +222,23 @@ type SGIndex struct {
214222
readinessQuery string // Query used to determine view readiness
215223
flags SGIndexFlags // Additional index options
216224
creationMode indexCreationMode // Signal when to create indexes
225+
partitionable bool // Whether the index is partitionable
217226
}
218227

219-
func (i *SGIndex) fullIndexName(useXattrs bool) string {
220-
return i.indexNameForVersion(i.version, useXattrs)
228+
func (i *SGIndex) fullIndexName(useXattrs bool, numPartitions uint32) string {
229+
return i.indexNameForVersion(i.version, useXattrs, numPartitions)
221230
}
222231

223-
func (i *SGIndex) indexNameForVersion(version int, useXattrs bool) string {
232+
func (i *SGIndex) indexNameForVersion(version int, useXattrs bool, numPartitions uint32) string {
224233
xattrsToken := ""
225234
if useXattrs {
226235
xattrsToken = "x"
227236
}
228-
return fmt.Sprintf(indexNameFormat, i.simpleName, xattrsToken, version)
237+
indexName := fmt.Sprintf(indexNameFormat, i.simpleName, xattrsToken, version)
238+
if i.partitionable && numPartitions > 1 {
239+
indexName = fmt.Sprintf(partitionableIndexNameFormat, indexName, numPartitions)
240+
}
241+
return indexName
229242
}
230243

231244
// Tombstone indexing is required for indexes that need to index the _sync xattrs even when the document
@@ -281,8 +294,10 @@ func (i *SGIndex) shouldCreate(options InitializeIndexOptions) bool {
281294
// Creates index associated with specified SGIndex if not already present. Always defers build - a subsequent BUILD INDEX
282295
// will need to be invoked for any created indexes.
283296
func (i *SGIndex) createIfNeeded(ctx context.Context, bucket base.N1QLStore, options InitializeIndexOptions) error {
284-
285-
indexName := i.fullIndexName(options.UseXattrs)
297+
if options.NumPartitions < 1 {
298+
return fmt.Errorf("Invalid number of partitions specified for index %s: %d, needs to be greater than 0", i.simpleName, options.NumPartitions)
299+
}
300+
indexName := i.fullIndexName(options.UseXattrs, options.NumPartitions)
286301

287302
// Create index
288303
base.InfofCtx(ctx, base.KeyQuery, "Creating index %s if it doesn't already exist...", indexName)
@@ -294,6 +309,9 @@ func (i *SGIndex) createIfNeeded(ctx context.Context, bucket base.N1QLStore, opt
294309
NumReplica: options.NumReplicas,
295310
IndexTombstones: i.shouldIndexTombstones(options.UseXattrs),
296311
}
312+
if i.partitionable && options.NumPartitions > 1 {
313+
n1qlOptions.NumPartitions = &options.NumPartitions
314+
}
297315

298316
// Initial retry 1 seconds, max wait 30s, waits up to 10m
299317
sleeper := base.CreateMaxDoublingSleeperFunc(20, 1000, 30000)
@@ -344,11 +362,14 @@ type InitializeIndexOptions struct {
344362
MetadataIndexes CollectionIndexesType // indicate which indexes to create
345363
Serverless bool // if true, create indexes for serverless
346364
UseXattrs bool // if true, create indexes on xattrs, otherwise, use inline sync data
365+
NumPartitions uint32 // number of partitions to use for the index
347366
}
348367

349368
// Initializes Sync Gateway indexes for datastore. Creates required indexes if not found, then waits for index readiness.
350369
func InitializeIndexes(ctx context.Context, n1QLStore base.N1QLStore, options InitializeIndexOptions) error {
351-
370+
if options.NumPartitions < 1 {
371+
return fmt.Errorf("Invalid number of partitions specified: %d, needs to be greater than 0", options.NumPartitions)
372+
}
352373
base.InfofCtx(ctx, base.KeyAll, "Initializing indexes with numReplicas: %d...", options.NumReplicas)
353374

354375
// Create any indexes that aren't present
@@ -360,7 +381,7 @@ func InitializeIndexes(ctx context.Context, n1QLStore base.N1QLStore, options In
360381
continue
361382
}
362383

363-
fullIndexName := sgIndex.fullIndexName(options.UseXattrs)
384+
fullIndexName := sgIndex.fullIndexName(options.UseXattrs, options.NumPartitions)
364385

365386
err := sgIndex.createIfNeeded(ctx, n1QLStore, options)
366387
if err != nil {
@@ -371,7 +392,6 @@ func InitializeIndexes(ctx context.Context, n1QLStore base.N1QLStore, options In
371392

372393
fullIndexNames = append(fullIndexNames, fullIndexName)
373394
}
374-
375395
// Issue BUILD INDEX for any deferred indexes.
376396
if len(fullIndexNames) > 0 {
377397
buildErr := base.BuildDeferredIndexes(ctx, n1QLStore, fullIndexNames)
@@ -391,7 +411,7 @@ func waitForIndexes(ctx context.Context, bucket base.N1QLStore, options Initiali
391411
var indexes []string
392412

393413
for _, sgIndex := range sgIndexes {
394-
fullIndexName := sgIndex.fullIndexName(options.UseXattrs)
414+
fullIndexName := sgIndex.fullIndexName(options.UseXattrs, options.NumPartitions)
395415
if !sgIndex.shouldCreate(options) {
396416
continue
397417
}
@@ -421,17 +441,18 @@ func removeObsoleteIndexes(ctx context.Context, bucket base.N1QLStore, previewOn
421441

422442
// Build set of candidates for cleanup
423443
removalCandidates := make([]string, 0)
444+
numPartitions := DefaultNumIndexPartitions // obsolete indexes is always partitions 1, greater than 1 needs to be removed manually
424445
for _, sgIndex := range indexMap {
425446
// Current version, opposite xattr setting
426-
removalCandidates = append(removalCandidates, sgIndex.fullIndexName(!useXattrs))
447+
removalCandidates = append(removalCandidates, sgIndex.fullIndexName(!useXattrs, numPartitions))
427448
// If using views we can remove current version for xattr setting too
428449
if useViews {
429-
removalCandidates = append(removalCandidates, sgIndex.fullIndexName(useXattrs))
450+
removalCandidates = append(removalCandidates, sgIndex.fullIndexName(useXattrs, numPartitions))
430451
}
431452
// Older versions, both xattr and non-xattr
432453
for _, prevVersion := range sgIndex.previousVersions {
433-
removalCandidates = append(removalCandidates, sgIndex.indexNameForVersion(prevVersion, true))
434-
removalCandidates = append(removalCandidates, sgIndex.indexNameForVersion(prevVersion, false))
454+
removalCandidates = append(removalCandidates, sgIndex.indexNameForVersion(prevVersion, true, numPartitions))
455+
removalCandidates = append(removalCandidates, sgIndex.indexNameForVersion(prevVersion, false, numPartitions))
435456
}
436457
}
437458

@@ -497,8 +518,8 @@ func replaceSyncTokensQuery(statement string, useXattrs bool) string {
497518
}
498519

499520
// Replace index tokens ($idx) in the provided createIndex statement with the appropriate token, depending on whether xattrs should be used.
500-
func replaceIndexTokensQuery(statement string, idx SGIndex, useXattrs bool) string {
501-
return strings.Replace(statement, indexToken, idx.fullIndexName(useXattrs), -1)
521+
func replaceIndexTokensQuery(statement string, idx SGIndex, useXattrs bool, numPartitions uint32) string {
522+
return strings.Replace(statement, indexToken, idx.fullIndexName(useXattrs, numPartitions), -1)
502523
}
503524

504525
func copySGIndexes(inputMap map[SGIndexType]SGIndex) map[SGIndexType]SGIndex {
@@ -521,7 +542,7 @@ func GetIndexesName(options InitializeIndexOptions) []string {
521542
continue
522543
}
523544
if sgIndex.shouldCreate(options) {
524-
indexesName = append(indexesName, sgIndex.fullIndexName(options.UseXattrs))
545+
indexesName = append(indexesName, sgIndex.fullIndexName(options.UseXattrs, options.NumPartitions))
525546
}
526547
}
527548
return indexesName

0 commit comments

Comments
 (0)