Skip to content

Commit 655b97e

Browse files
authored
Merge branch 'main' into CBG-4584
2 parents 614d974 + 55b5c52 commit 655b97e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+777
-344
lines changed

base/bucket_n1ql.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ const PrimaryIndexName = "#primary"
2020

2121
// IndexOptions used to build the 'with' clause
2222
type N1qlIndexOptions struct {
23-
NumReplica uint `json:"num_replica,omitempty"` // Number of replicas
24-
IndexTombstones bool `json:"retain_deleted_xattr,omitempty"` // Whether system xattrs on tombstones should be indexed
25-
DeferBuild bool `json:"defer_build,omitempty"` // Whether to defer initial build of index (requires a subsequent BUILD INDEX invocation)
23+
NumReplica uint `json:"num_replica,omitempty"` // Number of replicas
24+
IndexTombstones bool `json:"retain_deleted_xattr,omitempty"` // Whether system xattrs on tombstones should be indexed
25+
DeferBuild bool `json:"defer_build,omitempty"` // Whether to defer initial build of index (requires a subsequent BUILD INDEX invocation)
26+
NumPartitions *uint32 `json:"num_partition,omitempty"` // The number of partitions to use for the index. 1 will be a non-partitioned index.
2627
}

base/collection.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (b *GocbV2Bucket) IsSupported(feature sgbucket.BucketStoreFeature) bool {
239239
// Available on all supported server versions
240240
return true
241241
case sgbucket.BucketStoreFeatureN1ql:
242-
agent, err := b.getGoCBAgent()
242+
agent, err := b.GetGoCBAgent()
243243
if err != nil {
244244
return false
245245
}
@@ -295,7 +295,7 @@ func (b *GocbV2Bucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArgum
295295

296296
func (b *GocbV2Bucket) GetStatsVbSeqno(maxVbno uint16, useAbsHighSeqNo bool) (uuids map[uint16]uint64, highSeqnos map[uint16]uint64, seqErr error) {
297297

298-
agent, agentErr := b.getGoCBAgent()
298+
agent, agentErr := b.GetGoCBAgent()
299299
if agentErr != nil {
300300
return nil, nil, agentErr
301301
}
@@ -353,7 +353,7 @@ func (b *GocbV2Bucket) GetMaxVbno() (uint16, error) {
353353
}
354354

355355
func (b *GocbV2Bucket) getConfigSnapshot() (*gocbcore.ConfigSnapshot, error) {
356-
agent, err := b.getGoCBAgent()
356+
agent, err := b.GetGoCBAgent()
357357
if err != nil {
358358
return nil, fmt.Errorf("no gocbcore.Agent: %w", err)
359359
}
@@ -448,7 +448,7 @@ func (b *GocbV2Bucket) BucketItemCount(ctx context.Context) (itemCount int, err
448448
}
449449

450450
func (b *GocbV2Bucket) MgmtEps() (url []string, err error) {
451-
agent, err := b.getGoCBAgent()
451+
agent, err := b.GetGoCBAgent()
452452
if err != nil {
453453
return url, err
454454
}
@@ -460,7 +460,7 @@ func (b *GocbV2Bucket) MgmtEps() (url []string, err error) {
460460
}
461461

462462
func (b *GocbV2Bucket) QueryEpsCount() (int, error) {
463-
agent, err := b.getGoCBAgent()
463+
agent, err := b.GetGoCBAgent()
464464
if err != nil {
465465
return 0, err
466466
}
@@ -479,7 +479,7 @@ func (b *GocbV2Bucket) MaxTTL(ctx context.Context) (int, error) {
479479
}
480480

481481
func (b *GocbV2Bucket) HttpClient(ctx context.Context) *http.Client {
482-
agent, err := b.getGoCBAgent()
482+
agent, err := b.GetGoCBAgent()
483483
if err != nil {
484484
WarnfCtx(ctx, "Unable to obtain gocbcore.Agent while retrieving httpClient:%v", err)
485485
return nil
@@ -527,7 +527,7 @@ func (b *GocbV2Bucket) releaseKvOp() {
527527
}
528528

529529
// GetGoCBAgent returns the underlying agent from gocbcore
530-
func (b *GocbV2Bucket) getGoCBAgent() (*gocbcore.Agent, error) {
530+
func (b *GocbV2Bucket) GetGoCBAgent() (*gocbcore.Agent, error) {
531531
return b.bucket.Internal().IORouter()
532532
}
533533

base/collection_gocb.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ func (c *Collection) isRecoverableWriteError(err error) bool {
415415
// current use cases (on-demand import). If there's a need for expiry as part of normal get, this shouldn't be
416416
// used - an enhanced version of Get() should be implemented to avoid two ops
417417
func (c *Collection) GetExpiry(ctx context.Context, k string) (expiry uint32, getMetaError error) {
418-
agent, err := c.Bucket.getGoCBAgent()
418+
agent, err := c.Bucket.GetGoCBAgent()
419419
if err != nil {
420420
WarnfCtx(ctx, "Unable to obtain gocbcore.Agent while retrieving expiry:%v", err)
421421
return 0, err
@@ -569,7 +569,7 @@ func (c *Collection) setCollectionID() error {
569569
c.kvCollectionID = DefaultCollectionID
570570
return nil
571571
}
572-
agent, err := c.Bucket.getGoCBAgent()
572+
agent, err := c.Bucket.GetGoCBAgent()
573573
if err != nil {
574574
return err
575575
}

base/collection_n1ql_common.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,16 @@ func createIndex(ctx context.Context, store N1QLStore, indexName string, express
161161
if filterExpression != "" {
162162
filterExpressionStr = " WHERE " + filterExpression
163163
}
164+
var partitionExpresionStr string
165+
if options.NumPartitions != nil && *options.NumPartitions > 1 {
166+
partitionExpresionStr = " PARTITION BY HASH(META().id)"
167+
}
164168

165-
createStatement := fmt.Sprintf("CREATE INDEX `%s`%s ON %s(%s)%s", indexName, ifNotExistsStr, store.EscapedKeyspace(), expression, filterExpressionStr)
169+
createStatement := fmt.Sprintf("CREATE INDEX `%s`%s ON %s(%s)%s %s", indexName, ifNotExistsStr, store.EscapedKeyspace(), expression, partitionExpresionStr, filterExpressionStr)
166170

167171
// Replace any KeyspaceQueryToken references in the index expression
168172
createStatement = strings.ReplaceAll(createStatement, KeyspaceQueryToken, store.EscapedKeyspace())
173+
169174
createErr := createIndexFromStatement(ctx, store, indexName, createStatement, options)
170175
if IsIndexAlreadyExistsError(createErr) || IsCreateDuplicateIndexError(createErr) {
171176
// Pre-7.1 compatibility: Swallow this error like Server does when specifying `IF NOT EXISTS`

base/collection_view.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ type NoNameDesignDocument struct {
161161

162162
func (b *GocbV2Bucket) putDDocForTombstones(ctx context.Context, ddoc *gocb.DesignDocument) error {
163163
username, password, _ := b.Spec.Auth.GetCredentials()
164-
agent, err := b.getGoCBAgent()
164+
agent, err := b.GetGoCBAgent()
165165
if err != nil {
166166
return fmt.Errorf("Unable to get handle for bucket router: %v", err)
167167
}

base/dcp_client_stream_observer.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@ func (dc *DCPClient) Mutation(mutation gocbcore.DcpMutation) {
4848
cas: mutation.Cas,
4949
datatype: mutation.Datatype,
5050
collection: mutation.CollectionID,
51-
key: mutation.Key,
52-
value: mutation.Value,
51+
52+
// The byte slices must be copied to ensure that memory associated with the underlying memd mutationEvent and Packet are independent and can be released or reused by gocbcore as needed.
53+
key: EfficientBytesClone(mutation.Key),
54+
value: EfficientBytesClone(mutation.Value),
5355
}
5456
dc.workerForVbno(mutation.VbID).Send(dc.ctx, e)
5557
}
@@ -69,8 +71,10 @@ func (dc *DCPClient) Deletion(deletion gocbcore.DcpDeletion) {
6971
revNo: deletion.RevNo,
7072
datatype: deletion.Datatype,
7173
collection: deletion.CollectionID,
72-
key: deletion.Key,
73-
value: deletion.Value,
74+
75+
// The byte slices must be copied to ensure that memory associated with the underlying memd mutationEvent and Packet are independent and can be released or reused by gocbcore as needed.
76+
key: EfficientBytesClone(deletion.Key),
77+
value: EfficientBytesClone(deletion.Value),
7478
}
7579
dc.workerForVbno(deletion.VbID).Send(dc.ctx, e)
7680

base/dcp_common.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -522,15 +522,13 @@ func dcpKeyFilter(key []byte, metaKeys *MetadataKeys) bool {
522522
}
523523

524524
// Makes a feedEvent that can be passed to a FeedEventCallbackFunc implementation
525+
// The byte slices must be copied to ensure that memory associated with the memd mutationEvent and Packet are independent and can be released or reused by gocbcore as needed.
525526
func makeFeedEvent(key []byte, value []byte, dataType uint8, cas uint64, expiry uint32, vbNo uint16, collectionID uint32, opcode sgbucket.FeedOpcode) sgbucket.FeedEvent {
526527

527-
// not currently doing rq.Extras handling (as in gocouchbase/upr_feed, makeUprEvent) as SG doesn't use
528-
// expiry/flags information, and snapshot handling is done by cbdatasource and sent as
529-
// SnapshotStart, SnapshotEnd
530528
event := sgbucket.FeedEvent{
531529
Opcode: opcode,
532-
Key: key,
533-
Value: value,
530+
Key: EfficientBytesClone(key),
531+
Value: EfficientBytesClone(value),
534532
CollectionID: collectionID,
535533
DataType: dataType,
536534
Cas: cas,

base/dcp_common_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"testing"
1818

1919
"github.com/couchbase/cbgt"
20+
sgbucket "github.com/couchbase/sg-bucket"
2021
"github.com/stretchr/testify/assert"
2122
"github.com/stretchr/testify/require"
2223
)
@@ -83,3 +84,26 @@ func TestDCPNameLength(t *testing.T) {
8384
})
8485
}
8586
}
87+
88+
// TestFeedEventByteSliceCopy( ensures that the byte slices in the FeedEvent are copies and not the original ones - CBG-4540
89+
func TestFeedEventByteSliceCopy(t *testing.T) {
90+
const (
91+
keyData = "key"
92+
valueData = "value"
93+
)
94+
keySlice := []byte(keyData)
95+
valueSlice := []byte(valueData)
96+
e := makeFeedEvent(keySlice, valueSlice, 0, 0, 0, 0, 0, sgbucket.FeedOpMutation)
97+
require.Equal(t, keyData, string(e.Key))
98+
require.Equal(t, valueData, string(e.Value))
99+
require.Equal(t, keyData, string(keySlice))
100+
require.Equal(t, valueData, string(valueSlice))
101+
102+
// mutate the originals and ensure the FeedEvent byte slices didn't change with it
103+
keySlice[0] = 'x'
104+
valueSlice[0] = 'x'
105+
assert.Equal(t, keyData, string(e.Key))
106+
assert.Equal(t, valueData, string(e.Value))
107+
assert.NotEqual(t, keyData, string(keySlice))
108+
assert.NotEqual(t, valueData, string(valueSlice))
109+
}

base/error.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ var (
7878
// ErrSkippedSequencesMissing is returned when attempting to remove a sequence range form the skipped sequence list and at least one sequence in that range is not present
7979
ErrSkippedSequencesMissing = &sgError{"Sequence range has sequences that aren't present in skipped list"}
8080

81+
// ErrMaxSequenceReleasedExceeded is returned when the maximum number of sequences to be released as part of nextSequenceGreaterThan is exceeded
82+
ErrMaxSequenceReleasedExceeded = &sgError{"Maximum number of sequences to release to catch up with document sequence exceeded"}
83+
8184
// ErrInvalidJSON is returned when the JSON being unmarshalled cannot be parsed.
8285
ErrInvalidJSON = HTTPErrorf(http.StatusBadRequest, "Invalid JSON")
8386
)

base/stats.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,8 @@ type DatabaseStats struct {
645645
SequenceReleasedCount *SgwIntStat `json:"sequence_released_count"`
646646
// The total number of sequences reserved by Sync Gateway.
647647
SequenceReservedCount *SgwIntStat `json:"sequence_reserved_count"`
648+
// The total number of corrupt sequences above the MaxSequencesToRelease threshold seen at the sequence allocator
649+
CorruptSequenceCount *SgwIntStat `json:"corrupt_sequence_count"`
648650
// The total number of warnings relating to the channel name size.
649651
WarnChannelNameSizeCount *SgwIntStat `json:"warn_channel_name_size_count"`
650652
// The total number of warnings relating to the channel count exceeding the channel count threshold.
@@ -1754,6 +1756,10 @@ func (d *DbStats) initDatabaseStats() error {
17541756
if err != nil {
17551757
return err
17561758
}
1759+
resUtil.CorruptSequenceCount, err = NewIntStat(SubsystemDatabaseKey, "corrupt_sequence_count", StatUnitNoUnits, CorruptSequenceCountDesc, StatAddedVersion3dot2dot4, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0)
1760+
if err != nil {
1761+
return err
1762+
}
17571763
resUtil.WarnChannelNameSizeCount, err = NewIntStat(SubsystemDatabaseKey, "warn_channel_name_size_count", StatUnitNoUnits, WarnChannelNameSizeCountDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0)
17581764
if err != nil {
17591765
return err
@@ -1839,6 +1845,7 @@ func (d *DbStats) unregisterDatabaseStats() {
18391845
prometheus.Unregister(d.DatabaseStats.SequenceIncrCount)
18401846
prometheus.Unregister(d.DatabaseStats.SequenceReleasedCount)
18411847
prometheus.Unregister(d.DatabaseStats.SequenceReservedCount)
1848+
prometheus.Unregister(d.DatabaseStats.CorruptSequenceCount)
18421849
prometheus.Unregister(d.DatabaseStats.WarnChannelNameSizeCount)
18431850
prometheus.Unregister(d.DatabaseStats.WarnChannelsPerDocCount)
18441851
prometheus.Unregister(d.DatabaseStats.WarnGrantsPerDocCount)

0 commit comments

Comments
 (0)