Skip to content

Commit e8befec

Browse files
committed
CBG-4435: Count scheduled compaction runs as idle KV/Query ops (#7310)
* Count scheduled compaction runs as idle KV/Query ops
* Relax assertions because there are other idle KV ops happening behind the test
* Remove test logging
1 parent 99699a6 commit e8befec

File tree

7 files changed

+129
-65
lines changed

7 files changed

+129
-65
lines changed

base/stats.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ const (
8787
StatAddedVersion3dot1dot4 = "3.1.4"
8888
StatAddedVersion3dot2dot0 = "3.2.0"
8989
StatAddedVersion3dot2dot1 = "3.2.1"
90+
StatAddedVersion3dot2dot2 = "3.2.2"
9091
StatAddedVersion3dot3dot0 = "3.3.0"
9192

9293
StatDeprecatedVersionNotDeprecated = ""
@@ -312,6 +313,10 @@ func (g *GlobalStat) initResourceUtilizationStats() error {
312313
if err != nil {
313314
return err
314315
}
316+
resUtil.NumIdleQueryOps, err = NewIntStat(SubsystemDatabaseKey, "num_idle_query_ops", StatUnitNoUnits, NumIdleQueryOpsDesc, StatAddedVersion3dot2dot2, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, nil, nil, prometheus.CounterValue, 0)
317+
if err != nil {
318+
return err
319+
}
315320

316321
resUtil.Uptime, err = NewDurStat(ResourceUtilizationSubsystem, "uptime", StatUnitNanoseconds, UptimeDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, nil, nil, prometheus.CounterValue, time.Now())
317322
if err != nil {
@@ -366,8 +371,9 @@ type ResourceUtilization struct {
366371
// The node CPU usage calculation based values from /proc of user + system since the last time this function was called.
367372
NodeCpuPercentUtil *SgwFloatStat `json:"node_cpu_percent_utilization"`
368373

369-
// The number of background kv operations.
370-
NumIdleKvOps *SgwIntStat `json:"idle_kv_ops"`
374+
// The number of background kv/query operations.
375+
NumIdleKvOps *SgwIntStat `json:"idle_kv_ops"`
376+
NumIdleQueryOps *SgwIntStat `json:"idle_query_ops"`
371377

372378
// The memory utilization (Resident Set Size) for the process, in bytes.
373379
ProcessMemoryResident *SgwIntStat `json:"process_memory_resident"`

base/stats_descriptions.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,8 @@ const (
314314

315315
SyncProcessComputeDesc = "The compute unit for syncing with clients measured through cpu time and memory used for sync"
316316

317-
NumIdleKvOpsDesc = "The total number of idle kv operations."
317+
NumIdleKvOpsDesc = "The total number of idle kv operations."
318+
NumIdleQueryOpsDesc = "The total number of idle query operations."
318319
)
319320

320321
// Delta Sync stats descriptions

db/background_mgr_tombstone_compaction.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ func (t *TombstoneCompactionManager) Run(ctx context.Context, options map[string
4545
database := options["database"].(*Database)
4646

4747
defer atomic.CompareAndSwapUint32(&database.CompactState, DBCompactRunning, DBCompactNotRunning)
48-
callback := func(docsPurged *int) {
48+
updateStatusCallback := func(docsPurged *int) {
4949
atomic.StoreInt64(&t.PurgedDocCount, int64(*docsPurged))
5050
}
5151

52-
_, err := database.Compact(ctx, true, callback, terminator)
52+
_, err := database.Compact(ctx, true, updateStatusCallback, terminator, false)
5353
if err != nil {
5454
return err
5555
}

db/database.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1440,15 +1440,19 @@ func (db *DatabaseContext) GetRoleIDs(ctx context.Context, useViews, includeDele
14401440
return roles, nil
14411441
}
14421442

1443-
// Trigger tombstone compaction from view and/or GSI indexes. Several Sync Gateway indexes server tombstones (deleted documents with an xattr).
1443+
// compactProgressCallbackFunc is the optional progress hook accepted by Compact. It is passed a pointer to
// Compact's running purged-document count; when non-nil, Compact calls it after each processed batch of
// tombstones and once more (deferred) when Compact returns.
type compactProgressCallbackFunc func(purgedDocCount *int)
1444+
1445+
// Compact runs tombstone compaction from view and/or GSI indexes - ensuring there's nothing left in the indexes for tombstoned documents that have been purged by the server.
1446+
//
1447+
// Several Sync Gateway indexes include server tombstones (deleted documents with an xattr).
14441448
// There currently isn't a mechanism for server to remove these docs from the index when the tombstone is purged by the server during
14451449
// metadata purge, because metadata purge doesn't trigger a DCP event.
14461450
// When compact is run, Sync Gateway initiates a normal delete operation for the document and xattr (a Sync Gateway purge). This triggers
14471451
// removal of the document from the index. In the event that the document has already been purged by server, we need to recreate and delete
14481452
// the document to accomplish the same result.
1449-
type compactCallbackFunc func(purgedDocCount *int)
1450-
1451-
func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, callback compactCallbackFunc, terminator *base.SafeTerminator) (int, error) {
1453+
//
1454+
// The `isScheduledBackgroundTask` parameter is used to indicate if the compaction is being run as part of a scheduled background task, or an ad-hoc user-initiated `/{db}/_compact` request.
1455+
func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, optionalProgressCallback compactProgressCallbackFunc, terminator *base.SafeTerminator, isScheduledBackgroundTask bool) (purgedDocCount int, err error) {
14521456
if !skipRunningStateCheck {
14531457
if !atomic.CompareAndSwapUint32(&db.CompactState, DBCompactNotRunning, DBCompactRunning) {
14541458
return 0, base.HTTPErrorf(http.StatusServiceUnavailable, "Compaction already running")
@@ -1469,12 +1473,13 @@ func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, cal
14691473
startTime := time.Now()
14701474
purgeOlderThan := startTime.Add(-purgeInterval)
14711475

1472-
purgedDocCount := 0
14731476
purgeErrorCount := 0
14741477
addErrorCount := 0
14751478
deleteErrorCount := 0
14761479

1477-
defer callback(&purgedDocCount)
1480+
if optionalProgressCallback != nil {
1481+
defer optionalProgressCallback(&purgedDocCount)
1482+
}
14781483

14791484
base.InfofCtx(ctx, base.KeyAll, "Starting compaction of purged tombstones for %s ...", base.MD(db.Name))
14801485

@@ -1493,6 +1498,9 @@ func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, cal
14931498
for {
14941499
purgedDocs := make([]string, 0)
14951500
results, err := collection.QueryTombstones(ctx, purgeOlderThan, QueryTombstoneBatch)
1501+
if isScheduledBackgroundTask {
1502+
base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleQueryOps.Add(1)
1503+
}
14961504
if err != nil {
14971505
return 0, err
14981506
}
@@ -1513,11 +1521,17 @@ func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, cal
15131521
base.DebugfCtx(ctx, base.KeyCRUD, "\tDeleting %q", tombstonesRow.Id)
15141522
// First, attempt to purge.
15151523
purgeErr := collection.Purge(ctx, tombstonesRow.Id, false)
1524+
if isScheduledBackgroundTask {
1525+
base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Add(1)
1526+
}
15161527
if purgeErr == nil {
15171528
purgedDocs = append(purgedDocs, tombstonesRow.Id)
15181529
} else if base.IsDocNotFoundError(purgeErr) {
15191530
// If key no longer exists, need to add and remove to trigger removal from view
15201531
_, addErr := collection.dataStore.Add(tombstonesRow.Id, 0, purgeBody)
1532+
if isScheduledBackgroundTask {
1533+
base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Add(1)
1534+
}
15211535
if addErr != nil {
15221536
addErrorCount++
15231537
base.InfofCtx(ctx, base.KeyAll, "Couldn't compact key %s (add): %v", base.UD(tombstonesRow.Id), addErr)
@@ -1528,7 +1542,11 @@ func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, cal
15281542
// so mark it to be removed from cache, even if the subsequent delete fails
15291543
purgedDocs = append(purgedDocs, tombstonesRow.Id)
15301544

1531-
if delErr := collection.dataStore.Delete(tombstonesRow.Id); delErr != nil {
1545+
delErr := collection.dataStore.Delete(tombstonesRow.Id)
1546+
if isScheduledBackgroundTask {
1547+
base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Add(1)
1548+
}
1549+
if delErr != nil {
15321550
deleteErrorCount++
15331551
base.InfofCtx(ctx, base.KeyAll, "Couldn't compact key %s (delete): %v", base.UD(tombstonesRow.Id), delErr)
15341552
}
@@ -1552,7 +1570,9 @@ func (db *Database) Compact(ctx context.Context, skipRunningStateCheck bool, cal
15521570
}
15531571
base.InfofCtx(ctx, base.KeyAll, "Compacted %v tombstones", count)
15541572

1555-
callback(&purgedDocCount)
1573+
if optionalProgressCallback != nil {
1574+
optionalProgressCallback(&purgedDocCount)
1575+
}
15561576

15571577
if resultCount < QueryTombstoneBatch {
15581578
break
@@ -2442,7 +2462,7 @@ func (db *DatabaseContext) StartOnlineProcesses(ctx context.Context) (returnedEr
24422462
bgtTerminator.Close()
24432463
}()
24442464
bgt, err := NewBackgroundTask(ctx, "Compact", func(ctx context.Context) error {
2445-
_, err := db.Compact(ctx, false, func(purgedDocCount *int) {}, bgtTerminator)
2465+
_, err := db.Compact(ctx, false, nil, bgtTerminator, true)
24462466
if err != nil {
24472467
base.WarnfCtx(ctx, "Error trying to compact tombstoned documents for %q with error: %v", db.Name, err)
24482468
}

docs/api/components/schemas.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ ExpVars:
138138
num_idle_kv_ops:
139139
type: integer
140140
description: "The total number of idle kv operations."
141+
num_idle_query_ops:
142+
type: integer
143+
description: "The total number of idle query operations."
141144
process_cpu_percent_utilization:
142145
type: number
143146
format: float

rest/adminapitest/admin_api_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4166,7 +4166,7 @@ func TestTombstoneCompactionPurgeInterval(t *testing.T) {
41664166

41674167
// Start compact to modify purge interval
41684168
database, _ := db.GetDatabase(dbc, nil)
4169-
_, err = database.Compact(ctx, false, func(purgedDocCount *int) {}, base.NewSafeTerminator())
4169+
_, err = database.Compact(ctx, false, nil, base.NewSafeTerminator(), false)
41704170
require.NoError(t, err)
41714171

41724172
assert.EqualValues(t, test.expectedPurgeIntervalAfterCompact, dbc.GetMetadataPurgeInterval(ctx))

rest/changestest/changes_api_test.go

Lines changed: 84 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3958,6 +3958,20 @@ func TestTombstoneCompaction(t *testing.T) {
39583958
t.Skip("If running with no xattrs compact acts as a no-op")
39593959
}
39603960

3961+
tests := []struct {
3962+
numDocs int
3963+
runAsScheduledBackgroundTask bool
3964+
}{
3965+
// Multiples of Batch Size
3966+
{numDocs: db.QueryTombstoneBatch},
3967+
{numDocs: db.QueryTombstoneBatch * 4},
3968+
// Smaller Than Batch Size
3969+
{numDocs: 2},
3970+
{numDocs: db.QueryTombstoneBatch / 4},
3971+
// Larger than Batch Size
3972+
{numDocs: db.QueryTombstoneBatch + 20},
3973+
}
3974+
39613975
var rt *rest.RestTester
39623976
numCollections := 1
39633977

@@ -3968,64 +3982,84 @@ func TestTombstoneCompaction(t *testing.T) {
39683982
rt = rest.NewRestTester(t, nil)
39693983
}
39703984
defer rt.Close()
3971-
zero := time.Duration(0)
3972-
rt.GetDatabase().Options.PurgeInterval = &zero
3973-
3974-
compactionTotal := 0
3975-
expectedBatches := 0
3985+
rt.GetDatabase().Options.PurgeInterval = base.Ptr(time.Duration(0))
39763986

3977-
TestCompact := func(numDocs int) {
3978-
3979-
count := 0
3987+
for _, test := range tests {
3988+
for _, runAsScheduledBackgroundTask := range []bool{false, true} {
3989+
t.Run(fmt.Sprintf("numDocs:%d asBackgroundTask:%v", test.numDocs, runAsScheduledBackgroundTask), func(t *testing.T) {
3990+
3991+
// seed with tombstones
3992+
for count := 0; count < test.numDocs; count++ {
3993+
for _, keyspace := range rt.GetKeyspaces() {
3994+
response := rt.SendAdminRequest("POST", fmt.Sprintf("/%s/", keyspace), `{"foo":"bar"}`)
3995+
assert.Equal(t, http.StatusOK, response.Code)
3996+
var body db.Body
3997+
err := base.JSONUnmarshal(response.Body.Bytes(), &body)
3998+
assert.NoError(t, err)
3999+
revID := body["rev"].(string)
4000+
docID := body["id"].(string)
4001+
4002+
response = rt.SendAdminRequest("DELETE", fmt.Sprintf("/%s/%s?rev=%s", keyspace, docID, revID), "")
4003+
assert.Equal(t, http.StatusOK, response.Code)
4004+
}
4005+
}
39804006

3981-
for count < numDocs {
3982-
count++
3983-
for _, keyspace := range rt.GetKeyspaces() {
3984-
response := rt.SendAdminRequest("POST", fmt.Sprintf("/%s/", keyspace), `{"foo":"bar"}`)
3985-
assert.Equal(t, 200, response.Code)
3986-
var body db.Body
3987-
err := base.JSONUnmarshal(response.Body.Bytes(), &body)
3988-
assert.NoError(t, err)
3989-
revId := body["rev"].(string)
3990-
docId := body["id"].(string)
4007+
expectedCompactions := test.numDocs * numCollections
4008+
expectedBatches := (test.numDocs/db.QueryTombstoneBatch + 1) * numCollections
39914009

3992-
response = rt.SendAdminRequest("DELETE", fmt.Sprintf("/%s/%s?rev=%s", keyspace, docId, revId), "")
3993-
assert.Equal(t, 200, response.Code)
3994-
}
3995-
}
3996-
resp := rt.SendAdminRequest("POST", "/{{.db}}/_compact", "")
3997-
rest.RequireStatus(t, resp, http.StatusOK)
4010+
numCompactionsBefore := int(rt.GetDatabase().DbStats.Database().NumTombstonesCompacted.Value())
4011+
var numBatchesBefore int
4012+
if base.TestsDisableGSI() {
4013+
numBatchesBefore = int(rt.GetDatabase().DbStats.Query(fmt.Sprintf(base.StatViewFormat, db.DesignDocSyncHousekeeping(), db.ViewTombstones)).QueryCount.Value())
4014+
} else {
4015+
numBatchesBefore = int(rt.GetDatabase().DbStats.Query(db.QueryTypeTombstones).QueryCount.Value())
4016+
}
39984017

3999-
err := rt.WaitForCondition(func() bool {
4000-
time.Sleep(1 * time.Second)
4001-
return rt.GetDatabase().TombstoneCompactionManager.GetRunState() == db.BackgroundProcessStateCompleted
4002-
})
4003-
assert.NoError(t, err)
4018+
numIdleKvOpsBefore := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Value())
4019+
numIdleQueryOpsBefore := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleQueryOps.Value())
4020+
4021+
if runAsScheduledBackgroundTask {
4022+
database, err := db.CreateDatabase(rt.GetDatabase())
4023+
require.NoError(t, err)
4024+
purgedCount, err := database.Compact(base.TestCtx(t), false, nil, base.NewSafeTerminator(), true)
4025+
require.NoError(t, err)
4026+
require.Equal(t, expectedCompactions, purgedCount)
4027+
4028+
numIdleKvOpsAfter := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Value())
4029+
numIdleQueryOpsAfter := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleQueryOps.Value())
4030+
4031+
// cannot do equal here because there are other idle kv ops unrelated to compaction
4032+
assert.GreaterOrEqual(t, numIdleKvOpsAfter-numIdleKvOpsBefore, expectedCompactions)
4033+
assert.Equal(t, numIdleQueryOpsAfter-numIdleQueryOpsBefore, expectedBatches)
4034+
} else {
4035+
resp := rt.SendAdminRequest("POST", "/{{.db}}/_compact", "")
4036+
rest.RequireStatus(t, resp, http.StatusOK)
4037+
err := rt.WaitForCondition(func() bool {
4038+
return rt.GetDatabase().TombstoneCompactionManager.GetRunState() == db.BackgroundProcessStateCompleted
4039+
})
4040+
assert.NoError(t, err)
4041+
4042+
numIdleKvOpsAfter := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleKvOps.Value())
4043+
numIdleQueryOpsAfter := int(base.SyncGatewayStats.GlobalStats.ResourceUtilizationStats().NumIdleQueryOps.Value())
4044+
4045+
// ad-hoc compactions don't invoke idle ops - but we do have other idle kv ops so can't ensure it stays zero
4046+
assert.GreaterOrEqual(t, numIdleKvOpsAfter-numIdleKvOpsBefore, 0)
4047+
assert.Equal(t, numIdleQueryOpsAfter-numIdleQueryOpsBefore, 0)
4048+
}
40044049

4005-
compactionTotal += (numDocs * numCollections)
4006-
require.Equal(t, compactionTotal, int(rt.GetDatabase().DbStats.Database().NumTombstonesCompacted.Value()))
4050+
actualCompactions := int(rt.GetDatabase().DbStats.Database().NumTombstonesCompacted.Value()) - numCompactionsBefore
4051+
require.Equal(t, expectedCompactions, actualCompactions)
40074052

4008-
var actualBatches int64
4009-
if base.TestsDisableGSI() {
4010-
actualBatches = rt.GetDatabase().DbStats.Query(fmt.Sprintf(base.StatViewFormat, db.DesignDocSyncHousekeeping(), db.ViewTombstones)).QueryCount.Value()
4011-
} else {
4012-
actualBatches = rt.GetDatabase().DbStats.Query(db.QueryTypeTombstones).QueryCount.Value()
4053+
var actualBatches int
4054+
if base.TestsDisableGSI() {
4055+
actualBatches = int(rt.GetDatabase().DbStats.Query(fmt.Sprintf(base.StatViewFormat, db.DesignDocSyncHousekeeping(), db.ViewTombstones)).QueryCount.Value()) - numBatchesBefore
4056+
} else {
4057+
actualBatches = int(rt.GetDatabase().DbStats.Query(db.QueryTypeTombstones).QueryCount.Value()) - numBatchesBefore
4058+
}
4059+
require.Equal(t, expectedBatches, actualBatches)
4060+
})
40134061
}
4014-
4015-
expectedBatches += (numDocs/db.QueryTombstoneBatch + 1) * numCollections
4016-
require.Equal(t, expectedBatches, int(actualBatches))
40174062
}
4018-
4019-
// Multiples of Batch Size
4020-
TestCompact(db.QueryTombstoneBatch)
4021-
TestCompact(db.QueryTombstoneBatch * 4)
4022-
4023-
// Smaller Than Batch Size
4024-
TestCompact(2)
4025-
TestCompact(db.QueryTombstoneBatch / 4)
4026-
4027-
// Larger than Batch Size
4028-
TestCompact(db.QueryTombstoneBatch + 20)
40294063
}
40304064

40314065
// TestOneShotGrantTiming simulates a one-shot changes feed returning before a previously issued grant has been

0 commit comments

Comments
 (0)