Skip to content

Commit 5bf2d12

Browse files
torcolvinbbrks
andauthored
CBG-4839 set Unknown+Source for import from before ECCV (#7776)
Co-authored-by: Ben Brooks <[email protected]>
1 parent 36ac1d0 commit 5bf2d12

File tree

8 files changed

+297
-49
lines changed

8 files changed

+297
-49
lines changed

base/bucket.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type CouchbaseBucketStore interface {
6161
HttpClient(context.Context) *http.Client
6262
GetSpec() BucketSpec
6363
GetMaxVbno() (uint16, error)
64+
GetCCVSettings(ctx context.Context) (ccvEnabled bool, maxCAS map[VBNo]uint64, err error)
6465

6566
// GetStatsVbSeqno retrieves the high sequence number for all vbuckets and returns
6667
// a map of UUIDS and a map of high sequence numbers (map from vbno -> seq)
@@ -576,3 +577,5 @@ func GetNewDatabaseSleeperFunc() RetrySleeper {
576577
5, // InitialRetrySleepTimeMS
577578
)
578579
}
580+
581+
var _ CouchbaseBucketStore = &GocbV2Bucket{}

base/bucket_gocb_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2651,3 +2651,29 @@ func TestVersionPruningWindow(t *testing.T) {
26512651
// it's assumed that anywhere this test is running has a default pruning window (30 days) - but this at least ensures we can retrieve it
26522652
assert.Equal(t, time.Hour*24*30, vpw)
26532653
}
2654+
2655+
func TestGetCCVStartingCAS(t *testing.T) {
2656+
if UnitTestUrlIsWalrus() {
2657+
t.Skip("This test only works against Couchbase Server")
2658+
}
2659+
2660+
ctx := TestCtx(t)
2661+
bucket := GetTestBucket(t)
2662+
defer bucket.Close(ctx)
2663+
2664+
cbStore, ok := AsCouchbaseBucketStore(bucket)
2665+
require.True(t, ok)
2666+
2667+
eccv, startingCAS, err := cbStore.GetCCVSettings(ctx)
2668+
require.NoError(t, err)
2669+
require.True(t, eccv)
2670+
2671+
numVBuckets, err := cbStore.GetMaxVbno()
2672+
require.NoError(t, err)
2673+
require.Len(t, startingCAS, int(numVBuckets))
2674+
for vbNo := range numVBuckets {
2675+
cas, ok := startingCAS[VBNo(vbNo)]
2676+
require.True(t, ok, "Expected starting CAS for vbucket %d", vbNo)
2677+
require.NotEqual(t, uint64(0), cas, "Expected non-zero starting CAS for vbucket %d", vbNo)
2678+
}
2679+
}

base/collection.go

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"io"
2121
"net/http"
22+
"strconv"
2223
"strings"
2324
"sync"
2425
"time"
@@ -175,12 +176,6 @@ func GetGocbV2BucketFromCluster(ctx context.Context, cluster *gocb.Cluster, spec
175176
}
176177
gocbv2Bucket.kvOps = make(chan struct{}, MaxConcurrentSingleOps*nodeCount*(*numPools))
177178

178-
// Query to see if mobile XDCR bucket setting is set and store on bucket object
179-
err = gocbv2Bucket.queryHLVBucketSetting(ctx)
180-
if err != nil {
181-
return nil, err
182-
}
183-
184179
return gocbv2Bucket, nil
185180
}
186181

@@ -265,29 +260,6 @@ func (b *GocbV2Bucket) IsSupported(feature sgbucket.BucketStoreFeature) bool {
265260
}
266261
}
267262

268-
// queryHLVBucketSetting sends request to server to check for enableCrossClusterVersioning bucket setting
269-
func (b *GocbV2Bucket) queryHLVBucketSetting(ctx context.Context) error {
270-
url := fmt.Sprintf("/pools/default/buckets/%s", b.GetName())
271-
output, statusCode, err := b.MgmtRequest(ctx, http.MethodGet, url, "application/x-www-form-urlencoded", nil)
272-
if err != nil || statusCode != http.StatusOK {
273-
return fmt.Errorf("error executing query for mobile XDCR bucket setting, status code: %d error: %v output: %s", statusCode, err, string(output))
274-
}
275-
276-
type bucket struct {
277-
SupportsHLV *bool `json:"enableCrossClusterVersioning,omitempty"`
278-
}
279-
var bucketSettings bucket
280-
err = JSONUnmarshal(output, &bucketSettings)
281-
if err != nil {
282-
return err
283-
}
284-
// In Server < 7.6.1 this field will not be present, but if it is not configured on the bucket, it will return false
285-
if bucketSettings.SupportsHLV != nil {
286-
b.supportsHLV = *bucketSettings.SupportsHLV
287-
}
288-
return nil
289-
}
290-
291263
func (b *GocbV2Bucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {
292264
groupID := ""
293265
return StartGocbDCPFeed(ctx, b, b.Spec.BucketName, args, callback, dbStats, DCPMetadataStoreInMemory, groupID)
@@ -352,6 +324,56 @@ func (b *GocbV2Bucket) GetMaxVbno() (uint16, error) {
352324
return uint16(vbNo), nil
353325
}
354326

327+
// GetCCVSettings returns the highest CAS value across all vBuckets for a bucket with CCV enabled.
328+
func (b *GocbV2Bucket) GetCCVSettings(ctx context.Context) (ccvEnabled bool, maxCAS map[VBNo]uint64, err error) {
329+
uri := "/pools/default/buckets/" + b.GetName()
330+
output, status, err := b.MgmtRequest(ctx, http.MethodGet, uri, "application/json", nil)
331+
if err != nil {
332+
return false, nil, RedactErrorf("unable to get CCV starting cas for bucket %q: %w", UD(b.GetName()), err)
333+
}
334+
if status != http.StatusOK {
335+
return false, nil, RedactErrorf("unable to get CCV starting cas for bucket %q, status %d", UD(b.GetName()), status)
336+
}
337+
338+
var response struct {
339+
EnableCrossClusterVersioning *bool `json:"enableCrossClusterVersioning"`
340+
VBucketsMaxCas []string `json:"vBucketsMaxCas"`
341+
}
342+
if err := JSONUnmarshal(output, &response); err != nil {
343+
return false, nil, RedactErrorf("unable to parse bucket info JSON for %q: %w", UD(b.GetName()), err)
344+
}
345+
346+
// In Server < 7.6.1 this field will not be present at all
347+
if response.EnableCrossClusterVersioning == nil {
348+
return false, nil, nil
349+
}
350+
// CCV supported but not enabled
351+
if !*response.EnableCrossClusterVersioning {
352+
InfofCtx(ctx, KeyAll, "Bucket %q does not have enableCrossClusterVersioning set", UD(b.GetName()))
353+
return false, nil, nil
354+
}
355+
356+
numVBuckets, err := b.GetMaxVbno()
357+
if err != nil {
358+
return false, nil, fmt.Errorf("error getting vbucket count: %v", err)
359+
}
360+
// we'd always expect a CAS value per vbucket if CCV is enabled and has propagated correctly - so fail if that's not the case
361+
if len(response.VBucketsMaxCas) != int(numVBuckets) {
362+
return false, nil, fmt.Errorf("error getting vbucket CAS, expected %d vbucket CAS values, got %q", numVBuckets, response.VBucketsMaxCas)
363+
}
364+
365+
highCAS := make(map[VBNo]uint64, numVBuckets)
366+
for i, casStr := range response.VBucketsMaxCas {
367+
cas, err := strconv.ParseUint(casStr, 10, 64)
368+
if err != nil {
369+
return false, nil, fmt.Errorf("error parsing vbucket CAS value %q for vBucket %d: %v", casStr, i, err)
370+
}
371+
highCAS[VBNo(i)] = cas
372+
}
373+
374+
return true, highCAS, nil
375+
}
376+
355377
func (b *GocbV2Bucket) getConfigSnapshot() (*gocbcore.ConfigSnapshot, error) {
356378
agent, err := b.GetGoCBAgent()
357379
if err != nil {

base/vbucket.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2025-Present Couchbase, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License included
4+
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
5+
// in that file, in accordance with the Business Source License, use of this
6+
// software will be governed by the Apache License, Version 2.0, included in
7+
// the file licenses/APL2.txt.
8+
9+
package base
10+
11+
import (
12+
"context"
13+
"sync"
14+
)
15+
16+
// VBNo represents a vBucket number
17+
type VBNo uint16
18+
19+
// VBucketCAS is a map of vbucket number to a CAS value. This can not be copied, it is an atomic data structure.
20+
type VBucketCAS struct {
21+
m sync.Map
22+
}
23+
24+
// Load returns the CAS value for the specified vbucket. This is a thread safe accessor optimized for reads. If no CAS value is found, it asserts in debug builds and returns 0.
25+
func (v *VBucketCAS) Load(ctx context.Context, vbNo VBNo) uint64 {
26+
rawCas, ok := v.m.Load(vbNo)
27+
if !ok {
28+
AssertfCtx(ctx, "VBucketCAS: No CAS found for vbucket %d", vbNo)
29+
return 0
30+
}
31+
cas, ok := rawCas.(uint64)
32+
if !ok {
33+
AssertfCtx(ctx, "VBucketCAS: Invalid CAS type %#v for vbucket %d", rawCas, vbNo)
34+
return 0
35+
}
36+
return cas
37+
}
38+
39+
// Store the CAS value for the specified vbucket. This is a thread safe method optimized for reads.
40+
func (v *VBucketCAS) Store(vbNo VBNo, cas uint64) {
41+
v.m.Store(vbNo, cas)
42+
}
43+
44+
// Clear removes all entries from the VBucketCAS. This is a thread safe method.
45+
func (v *VBucketCAS) Clear() {
46+
v.m.Clear()
47+
}

db/crud.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -992,7 +992,16 @@ func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document
992992
if !hasHLV || (!cvCASMatch && !mouMatch) {
993993
// Otherwise this is an SDK mutation made by the local cluster that should be added to HLV.
994994
newVVEntry := Version{}
995-
newVVEntry.SourceID = db.dbCtx.EncodedSourceID
995+
sourceID := db.dbCtx.EncodedSourceID
996+
// use unknown source ID when CCV is enabled but doc cas is less than CCV CAS
997+
if db.dbCtx.CachedCCVEnabled.Load() {
998+
vbNo := sgbucket.VBHash(d.ID, db.dbCtx.numVBuckets)
999+
ccvStartingCas := db.dbCtx.CachedCCVStartingCas.Load(ctx, base.VBNo(vbNo))
1000+
if d.Cas <= ccvStartingCas {
1001+
sourceID = unknownSourceID
1002+
}
1003+
}
1004+
newVVEntry.SourceID = sourceID
9961005
newVVEntry.Value = d.Cas
9971006
err := d.HLV.AddVersion(newVVEntry)
9981007
if err != nil {

db/database.go

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ type DatabaseContext struct {
161161
DatabaseStartupError *DatabaseError // Error that occurred during database online processes startup
162162
CachedPurgeInterval atomic.Pointer[time.Duration] // If set, the cached value of the purge interval to avoid repeated lookups
163163
CachedVersionPruningWindow atomic.Pointer[time.Duration] // If set, the cached value of the version pruning window to avoid repeated lookups
164+
CachedCCVStartingCas *base.VBucketCAS // If set, the cached value of the CCV starting CAS value to avoid repeated lookups
165+
CachedCCVEnabled atomic.Bool // If set, the cached value of the CCV Enabled flag (this is not expected to transition from true->false, but could go false->true)
166+
numVBuckets uint16 // Number of vbuckets in the bucket
164167
}
165168

166169
type Scope struct {
@@ -416,19 +419,28 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,
416419
RegisterImportPindexImpl(ctx, options.GroupID)
417420

418421
dbContext := &DatabaseContext{
419-
Name: dbName,
420-
UUID: cbgt.NewUUID(),
421-
MetadataStore: metadataStore,
422-
Bucket: bucket,
423-
BucketUUID: bucketUUID,
424-
EncodedSourceID: sourceID,
425-
StartTime: time.Now(),
426-
autoImport: autoImport,
427-
Options: options,
428-
DbStats: dbStats,
429-
CollectionByID: make(map[uint32]*DatabaseCollection),
430-
ServerUUID: serverUUID,
431-
UserFunctionTimeout: defaultUserFunctionTimeout,
422+
Name: dbName,
423+
UUID: cbgt.NewUUID(),
424+
MetadataStore: metadataStore,
425+
Bucket: bucket,
426+
BucketUUID: bucketUUID,
427+
EncodedSourceID: sourceID,
428+
StartTime: time.Now(),
429+
autoImport: autoImport,
430+
Options: options,
431+
DbStats: dbStats,
432+
CollectionByID: make(map[uint32]*DatabaseCollection),
433+
ServerUUID: serverUUID,
434+
UserFunctionTimeout: defaultUserFunctionTimeout,
435+
CachedCCVStartingCas: &base.VBucketCAS{},
436+
}
437+
dbContext.numVBuckets, err = bucket.GetMaxVbno()
438+
if err != nil {
439+
return nil, err
440+
}
441+
err = dbContext.updateCCVSettings(ctx)
442+
if err != nil {
443+
return nil, err
432444
}
433445

434446
// set up cancellable context based on the background context (context lifecycle for the database
@@ -1665,6 +1677,30 @@ func (db *DatabaseContext) GetVersionPruningWindow(ctx context.Context, forceRef
16651677
return vpw
16661678
}
16671679

1680+
// updateCCVSettings performs a management query to determine the latest crossClusterVersioning and max cas settings.
1681+
func (db *DatabaseContext) updateCCVSettings(ctx context.Context) error {
1682+
cbStore, ok := base.AsCouchbaseBucketStore(db.Bucket)
1683+
if !ok {
1684+
db.CachedCCVEnabled.Store(false)
1685+
return nil
1686+
}
1687+
1688+
// Fetch from Couchbase Server
1689+
enabled, maxCAS, err := cbStore.GetCCVSettings(ctx)
1690+
if err != nil {
1691+
return fmt.Errorf("Unable to retrieve server's CCV Starting CAS: %w", err)
1692+
}
1693+
1694+
db.CachedCCVEnabled.Store(enabled)
1695+
if !enabled {
1696+
db.CachedCCVStartingCas.Clear()
1697+
}
1698+
for vbNo, cas := range maxCAS {
1699+
db.CachedCCVStartingCas.Store(vbNo, cas)
1700+
}
1701+
return nil
1702+
}
1703+
16681704
func (c *DatabaseCollection) updateAllPrincipalsSequences(ctx context.Context) error {
16691705
users, roles, err := c.allPrincipalIDs(ctx)
16701706
if err != nil {

db/hybrid_logical_vector.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ const (
5151
minPVEntriesRetained = 3 // minPVEntriesRetained defines the minimum number of PV entries that should be retained after compaction, to avoid removing all history for infrequently updated/replicated documents.
5252
)
5353

54+
const (
55+
unknownSourceID = "Unknown+Source" // used if the document was written before ECCV was enabled
56+
)
57+
5458
type HLVVersions map[string]uint64 // map of source ID to version uint64 version value
5559

5660
// sorted will iterate through the map returning entries in a stable sorted order. Used by testing to make it easier

0 commit comments

Comments
 (0)