Skip to content

Commit fefbd63

Browse files
authored
CBG-4880: Use versionPruningWindowHrs for HLV Compaction instead of metadata purge interval (#7777)
1 parent f43c0ff commit fefbd63

File tree

6 files changed

+133
-51
lines changed

6 files changed

+133
-51
lines changed

base/bucket.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type WrappingDatastore interface {
5555
type CouchbaseBucketStore interface {
5656
GetName() string
5757
MgmtEps() ([]string, error)
58+
VersionPruningWindow(ctx context.Context) (time.Duration, error)
5859
MetadataPurgeInterval(ctx context.Context) (time.Duration, error)
5960
MaxTTL(context.Context) (int, error)
6061
HttpClient(context.Context) *http.Client

base/bucket_gocb_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2632,3 +2632,22 @@ func TestWriteUpdateWithXattrsDocumentTombstone(t *testing.T) {
26322632
require.JSONEq(t, string(xattrModifiedBody), string(xattrs[xattr1Key]))
26332633
require.NotContains(t, xattrs, xattr2Key)
26342634
}
2635+
2636+
func TestVersionPruningWindow(t *testing.T) {
2637+
if UnitTestUrlIsWalrus() {
2638+
t.Skip("This test only works against Couchbase Server")
2639+
}
2640+
2641+
ctx := TestCtx(t)
2642+
bucket := GetTestBucket(t)
2643+
defer bucket.Close(ctx)
2644+
2645+
cbStore, ok := AsCouchbaseBucketStore(bucket)
2646+
require.True(t, ok)
2647+
2648+
vpw, err := cbStore.VersionPruningWindow(ctx)
2649+
require.NoError(t, err)
2650+
2651+
// it's assumed that anywhere this test is running has a default pruning window (30 days) - but this at least ensures we can retrieve it
2652+
assert.Equal(t, time.Hour*24*30, vpw)
2653+
}

base/collection.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -468,12 +468,35 @@ func (b *GocbV2Bucket) QueryEpsCount() (int, error) {
468468
return len(agent.N1qlEps()), nil
469469
}
470470

471-
// Gets the metadata purge interval for the bucket. First checks for a bucket-specific value. If not
472-
// found, retrieves the cluster-wide value.
471+
// MetadataPurgeInterval gets the metadata purge interval for the bucket. Checks for a bucket-specific value before the cluster value.
473472
func (b *GocbV2Bucket) MetadataPurgeInterval(ctx context.Context) (time.Duration, error) {
474473
return getMetadataPurgeInterval(ctx, b)
475474
}
476475

476+
// VersionPruningWindow gets the version pruning window for the bucket.
477+
func (b *GocbV2Bucket) VersionPruningWindow(ctx context.Context) (time.Duration, error) {
478+
uri := fmt.Sprintf("/pools/default/buckets/%s", b.GetName())
479+
respBytes, statusCode, err := b.MgmtRequest(ctx, http.MethodGet, uri, "application/json", nil)
480+
if err != nil {
481+
return 0, err
482+
}
483+
484+
if statusCode == http.StatusForbidden {
485+
return 0, RedactErrorf("403 Forbidden attempting to access %s. Bucket user must have Bucket Full Access and Bucket Admin roles to retrieve version pruning window.", UD(uri))
486+
} else if statusCode != http.StatusOK {
487+
return 0, fmt.Errorf("failed with status code %d", statusCode)
488+
}
489+
490+
var response struct {
491+
VersionPruningWindowHrs int64 `json:"versionPruningWindowHrs,omitempty"`
492+
}
493+
if err := JSONUnmarshal(respBytes, &response); err != nil {
494+
return 0, err
495+
}
496+
497+
return time.Duration(response.VersionPruningWindowHrs) * time.Hour, nil
498+
}
499+
477500
func (b *GocbV2Bucket) MaxTTL(ctx context.Context) (int, error) {
478501
return getMaxTTL(ctx, b)
479502
}

db/crud.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,8 +1033,8 @@ func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document
10331033
}
10341034
// clean up PV only if we have more than a handful of source IDs - reduce Compaction and false-conflict risk where we don't need it
10351035
if len(d.HLV.PreviousVersions) > minPVEntriesBeforeCompaction {
1036-
mpi := db.dbCtx.GetMetadataPurgeInterval(ctx, false)
1037-
d.HLV.Compact(ctx, d.ID, mpi)
1036+
vpw := db.dbCtx.GetVersionPruningWindow(ctx, false)
1037+
d.HLV.Compact(ctx, d.ID, vpw)
10381038
}
10391039
d.SyncData.SetCV(d.HLV)
10401040
return d, nil

db/database.go

Lines changed: 83 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ const (
6969
// used as default metadata purge interval when the server’s purge
7070
// interval (either bucket specific or cluster wide) is not available.
7171
DefaultPurgeInterval = 30 * 24 * time.Hour
72+
DefaultVersionPruningWindow = DefaultPurgeInterval
7273
DefaultSGReplicateEnabled = true
7374
DefaultSGReplicateWebsocketPingInterval = time.Minute * 5
7475
DefaultCompactInterval = 24 * time.Hour
@@ -159,57 +160,59 @@ type DatabaseContext struct {
159160
BroadcastSlowMode atomic.Bool // bool to indicate if a slower ticker value should be used to notify changes feeds of changes
160161
DatabaseStartupError *DatabaseError // Error that occurred during database online processes startup
161162
CachedPurgeInterval atomic.Pointer[time.Duration] // If set, the cached value of the purge interval to avoid repeated lookups
163+
CachedVersionPruningWindow atomic.Pointer[time.Duration] // If set, the cached value of the version pruning window to avoid repeated lookups
162164
}
163165

164166
type Scope struct {
165167
Collections map[string]*DatabaseCollection
166168
}
167169

168170
type DatabaseContextOptions struct {
169-
CacheOptions *CacheOptions
170-
RevisionCacheOptions *RevisionCacheOptions
171-
OldRevExpirySeconds uint32
172-
AdminInterface *string
173-
UnsupportedOptions *UnsupportedOptions
174-
OIDCOptions *auth.OIDCOptions
175-
LocalJWTConfig auth.LocalJWTConfig
176-
ImportOptions ImportOptions
177-
EnableXattr bool // Use xattr for _sync
178-
LocalDocExpirySecs uint32 // The _local doc expiry time in seconds
179-
SecureCookieOverride bool // Pass-through DBConfig.SecureCookieOverride
180-
SessionCookieName string // Pass-through DbConfig.SessionCookieName
181-
SessionCookieHttpOnly bool // Pass-through DbConfig.SessionCookieHTTPOnly
182-
UserFunctions *UserFunctions // JS/N1QL functions clients can call
183-
AllowConflicts *bool // False forbids creating conflicts
184-
SendWWWAuthenticateHeader *bool // False disables setting of 'WWW-Authenticate' header
185-
DisablePasswordAuthentication bool // True enforces OIDC/guest only
186-
UseViews bool // Force use of views
187-
DeltaSyncOptions DeltaSyncOptions // Delta Sync Options
188-
CompactInterval uint32 // Interval in seconds between compaction is automatically ran - 0 means don't run
189-
SGReplicateOptions SGReplicateOptions
190-
SlowQueryWarningThreshold time.Duration
191-
QueryPaginationLimit int // Limit used for pagination of queries. If not set defaults to DefaultQueryPaginationLimit
192-
UserXattrKey string // Key of user xattr that will be accessible from the Sync Function. If empty the feature will be disabled.
193-
ClientPartitionWindow time.Duration
194-
BcryptCost int
195-
GroupID string
196-
JavascriptTimeout time.Duration // Max time the JS functions run for (ie. sync fn, import filter)
197-
UseLegacySyncDocsIndex bool
198-
Scopes ScopesOptions
199-
MetadataStore base.DataStore // If set, use this location/connection for SG metadata storage - if not set, metadata is stored using the same location/connection as the bucket used for data storage.
200-
MetadataID string // MetadataID used for metadata storage
201-
BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds
202-
ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds
203-
ConfigPrincipals *ConfigPrincipals
204-
TestPurgeIntervalOverride *time.Duration // If set, use this value for db.GetMetadataPurgeInterval - test seam to force specific purge interval for tests
205-
LoggingConfig *base.DbLogConfig // Per-database log configuration
206-
MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication
207-
MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication
208-
NumIndexReplicas uint // Number of replicas for GSI indexes
209-
NumIndexPartitions *uint32 // Number of partitions for GSI indexes, if not set will default to 1
210-
ImportVersion uint64 // Version included in import DCP checkpoints, incremented when collections added to db
211-
DisablePublicAllDocs bool // Disable public access to the _all_docs endpoint for this database
212-
StoreLegacyRevTreeData *bool // Whether to store additional data for legacy rev tree support in delta sync and replication backup revs
171+
CacheOptions *CacheOptions
172+
RevisionCacheOptions *RevisionCacheOptions
173+
OldRevExpirySeconds uint32
174+
AdminInterface *string
175+
UnsupportedOptions *UnsupportedOptions
176+
OIDCOptions *auth.OIDCOptions
177+
LocalJWTConfig auth.LocalJWTConfig
178+
ImportOptions ImportOptions
179+
EnableXattr bool // Use xattr for _sync
180+
LocalDocExpirySecs uint32 // The _local doc expiry time in seconds
181+
SecureCookieOverride bool // Pass-through DBConfig.SecureCookieOverride
182+
SessionCookieName string // Pass-through DbConfig.SessionCookieName
183+
SessionCookieHttpOnly bool // Pass-through DbConfig.SessionCookieHTTPOnly
184+
UserFunctions *UserFunctions // JS/N1QL functions clients can call
185+
AllowConflicts *bool // False forbids creating conflicts
186+
SendWWWAuthenticateHeader *bool // False disables setting of 'WWW-Authenticate' header
187+
DisablePasswordAuthentication bool // True enforces OIDC/guest only
188+
UseViews bool // Force use of views
189+
DeltaSyncOptions DeltaSyncOptions // Delta Sync Options
190+
CompactInterval uint32 // Interval in seconds between compaction is automatically ran - 0 means don't run
191+
SGReplicateOptions SGReplicateOptions
192+
SlowQueryWarningThreshold time.Duration
193+
QueryPaginationLimit int // Limit used for pagination of queries. If not set defaults to DefaultQueryPaginationLimit
194+
UserXattrKey string // Key of user xattr that will be accessible from the Sync Function. If empty the feature will be disabled.
195+
ClientPartitionWindow time.Duration
196+
BcryptCost int
197+
GroupID string
198+
JavascriptTimeout time.Duration // Max time the JS functions run for (ie. sync fn, import filter)
199+
UseLegacySyncDocsIndex bool
200+
Scopes ScopesOptions
201+
MetadataStore base.DataStore // If set, use this location/connection for SG metadata storage - if not set, metadata is stored using the same location/connection as the bucket used for data storage.
202+
MetadataID string // MetadataID used for metadata storage
203+
BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds
204+
ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds
205+
ConfigPrincipals *ConfigPrincipals
206+
TestPurgeIntervalOverride *time.Duration // If set, use this value for db.GetMetadataPurgeInterval - test seam to force specific purge interval for tests
207+
TestVersionPruningWindowOverride *time.Duration // If set, use this value for db.GetVersionPruningWindow - test seam to force specific pruning window for tests
208+
LoggingConfig *base.DbLogConfig // Per-database log configuration
209+
MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication
210+
MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication
211+
NumIndexReplicas uint // Number of replicas for GSI indexes
212+
NumIndexPartitions *uint32 // Number of partitions for GSI indexes, if not set will default to 1
213+
ImportVersion uint64 // Version included in import DCP checkpoints, incremented when collections added to db
214+
DisablePublicAllDocs bool // Disable public access to the _all_docs endpoint for this database
215+
StoreLegacyRevTreeData *bool // Whether to store additional data for legacy rev tree support in delta sync and replication backup revs
213216
}
214217

215218
type ConfigPrincipals struct {
@@ -1626,6 +1629,42 @@ func (db *DatabaseContext) GetMetadataPurgeInterval(ctx context.Context, forceRe
16261629
return mpi
16271630
}
16281631

1632+
// GetVersionPruningWindow returns the current value for the XDCR Version Pruning Window for the backing bucket.
1633+
// if forceRefresh is set, we'll always fetch a new value from the bucket, even if we had one cached.
1634+
func (db *DatabaseContext) GetVersionPruningWindow(ctx context.Context, forceRefresh bool) time.Duration {
1635+
// test override
1636+
if db.Options.TestVersionPruningWindowOverride != nil {
1637+
return *db.Options.TestVersionPruningWindowOverride
1638+
}
1639+
1640+
// fetch cached value if available
1641+
if !forceRefresh {
1642+
vpw := db.CachedVersionPruningWindow.Load()
1643+
if vpw != nil {
1644+
return *vpw
1645+
}
1646+
}
1647+
1648+
// fetch from server
1649+
cbStore, ok := base.AsCouchbaseBucketStore(db.Bucket)
1650+
if !ok {
1651+
return DefaultVersionPruningWindow
1652+
}
1653+
1654+
serverVersionPruningWindow, err := cbStore.VersionPruningWindow(ctx)
1655+
if err != nil {
1656+
base.WarnfCtx(ctx, "Unable to retrieve server's version pruning window - using default %.2f days. %s", DefaultVersionPruningWindow.Hours()/24, err)
1657+
}
1658+
1659+
vpw := DefaultVersionPruningWindow
1660+
if serverVersionPruningWindow > 0 {
1661+
vpw = serverVersionPruningWindow
1662+
}
1663+
1664+
db.CachedVersionPruningWindow.Store(&vpw)
1665+
return vpw
1666+
}
1667+
16291668
func (c *DatabaseCollection) updateAllPrincipalsSequences(ctx context.Context) error {
16301669
users, roles, err := c.allPrincipalIDs(ctx)
16311670
if err != nil {

rest/replicatortest/replicator_conflict_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,9 +1039,9 @@ func TestActiveReplicatorHLVConflictNoCommonMVPV(t *testing.T) {
10391039
ctx1 := rt1.Context()
10401040
ctx2 := rt2.Context()
10411041

1042-
// disable purge interval so we can avoid HLV compaction for the artificially low HLV values in this test
1043-
rt1.GetDatabase().Options.TestPurgeIntervalOverride = base.Ptr(time.Duration(0))
1044-
rt2.GetDatabase().Options.TestPurgeIntervalOverride = base.Ptr(time.Duration(0))
1042+
// disable pruning window so we can avoid HLV compaction for the artificially low HLV values in this test
1043+
rt1.GetDatabase().Options.TestVersionPruningWindowOverride = base.Ptr(time.Duration(0))
1044+
rt2.GetDatabase().Options.TestVersionPruningWindowOverride = base.Ptr(time.Duration(0))
10451045

10461046
docID := "doc1_"
10471047
version := rt2.PutDoc(docID, `{"source":"rt2","channels":["alice"]}`)

0 commit comments

Comments
 (0)