Skip to content

Commit 1ac5d43

Browse files
authored
fix(ReplicationBudgetManager): enable ReplicationBudgetManager by default (#7448)
<!-- Describe what has changed in this PR --> **What changed?** Enable ReplicationBudgetManager by default, removing the dynamic config to enable it. <!-- Tell your future self why have you made these changes --> **Why?** It can be disabled directly in the manager and it's disabled by default. It also creates confusion and allows the replication cache to be enable by shard only if the configs are not set properly. This could cause OOM issues. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Ran simulation and checked logs locally <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** This enables the budget manager by default but the budget manager capacity is set to zero. It should be unless the capacity has been explicitly set to not 0. <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: fimanishi <[email protected]>
1 parent 1151669 commit 1ac5d43

File tree

7 files changed

+69
-31
lines changed

7 files changed

+69
-31
lines changed

common/cache/budget.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,11 @@ func (m *manager) metricsLoop() {
318318

319319
// updateMetrics emits current state metrics
320320
func (m *manager) updateMetrics() {
321+
// Skip metrics if both capacity limits are disabled (set to 0)
322+
if m.maxBytes() == 0 && m.maxCount() == 0 {
323+
return
324+
}
325+
321326
// Emit capacity metrics
322327
capacityBytes := m.CapacityBytes()
323328
// Only emit if not unlimited
@@ -344,6 +349,11 @@ func (m *manager) updateMetrics() {
344349

345350
// emitHardCapExceeded logs when hard capacity limit is exceeded and increments counter
346351
func (m *manager) emitHardCapExceeded(cacheID, budgetType string, requested, available uint64) {
352+
// Skip metrics if both capacity limits are disabled (set to 0)
353+
if m.maxBytes() == 0 && m.maxCount() == 0 {
354+
return
355+
}
356+
347357
if m.scope != nil {
348358
m.scope.IncCounter(metrics.BudgetManagerHardCapExceeded)
349359
}
@@ -361,6 +371,11 @@ func (m *manager) emitHardCapExceeded(cacheID, budgetType string, requested, ava
361371

362372
// emitSoftCapExceeded logs when soft cap is exceeded and increments counter
363373
func (m *manager) emitSoftCapExceeded(cacheID, budgetType string, requested, available uint64) {
374+
// Skip metrics if both capacity limits are disabled (set to 0)
375+
if m.maxBytes() == 0 && m.maxCount() == 0 {
376+
return
377+
}
378+
364379
if m.scope != nil {
365380
m.scope.IncCounter(metrics.BudgetManagerSoftCapExceeded)
366381
}
@@ -378,6 +393,11 @@ func (m *manager) emitSoftCapExceeded(cacheID, budgetType string, requested, ava
378393

379394
// emitCapacityExceeded emits appropriate metrics based on the error type
380395
func (m *manager) emitCapacityExceeded(cacheID string, err error, requestedBytes uint64, requestedCount int64, capResult CapEnforcementResult) {
396+
// Skip metrics if both capacity limits are disabled (set to 0)
397+
if m.maxBytes() == 0 && m.maxCount() == 0 {
398+
return
399+
}
400+
381401
switch err {
382402
case ErrBytesBudgetExceeded:
383403
m.emitHardCapExceeded(cacheID, budgetTypeBytes, requestedBytes, capResult.AvailableBytes)

common/cache/budget_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import (
1111
"time"
1212

1313
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/mock"
1415

1516
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
1617
"github.com/uber/cadence/common/log/testlogger"
18+
metricsmocks "github.com/uber/cadence/common/metrics/mocks"
1719
)
1820

1921
func TestBudgetManager_ReserveForCache(t *testing.T) {
@@ -2230,3 +2232,40 @@ func BenchmarkBudgetManager_HighContention(b *testing.B) {
22302232
}
22312233
})
22322234
}
2235+
2236+
func TestBudgetManager_DisabledManager_NoMetricsEmitted(t *testing.T) {
2237+
// Create a mock scope that will track all metric calls
2238+
mockScope := &metricsmocks.Scope{}
2239+
// Mock the Tagged call which returns itself
2240+
mockScope.On("Tagged", mock.Anything).Return(mockScope)
2241+
2242+
// Create manager with BOTH capacities disabled (set to 0)
2243+
mgr := NewBudgetManager(
2244+
"test-disabled-manager",
2245+
dynamicproperties.GetIntPropertyFn(0), // maxBytes = 0 (disabled)
2246+
dynamicproperties.GetIntPropertyFn(0), // maxCount = 0 (disabled)
2247+
AdmissionOptimistic,
2248+
0,
2249+
mockScope,
2250+
testlogger.New(t),
2251+
dynamicproperties.GetFloatPropertyFn(0.8),
2252+
)
2253+
defer mgr.Stop()
2254+
2255+
// Give time for any initial metrics to be emitted
2256+
time.Sleep(100 * time.Millisecond)
2257+
2258+
// Try to reserve capacity (should fail but shouldn't emit metrics)
2259+
err := mgr.(*manager).ReserveForCache("cache1", 100, 1)
2260+
assert.Error(t, err)
2261+
2262+
// Try to release capacity (should not emit metrics)
2263+
mgr.(*manager).ReleaseForCache("cache1", 100, 1)
2264+
2265+
// Wait a bit to ensure no async metrics are emitted
2266+
time.Sleep(100 * time.Millisecond)
2267+
2268+
// Verify NO metrics were emitted (no calls to UpdateGauge or IncCounter)
2269+
mockScope.AssertNotCalled(t, "UpdateGauge")
2270+
mockScope.AssertNotCalled(t, "IncCounter")
2271+
}

common/dynamicconfig/dynamicproperties/constants.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1828,12 +1828,6 @@ const (
18281828
// Default value: true
18291829
// Allowed filters: DomainID, WorkflowID
18301830
EnableReplicationTaskGeneration
1831-
// EnableReplicationBudgetManager enables the replication budget manager for cache capacity control
1832-
// KeyName: history.enableReplicationBudgetManager
1833-
// Value type: Bool
1834-
// Default value: false
1835-
// Allowed filters: N/A
1836-
EnableReplicationBudgetManager
18371831
// UseNewInitialFailoverVersion is a switch to issue a failover version based on the minFailoverVersion
18381832
// rather than the default initialFailoverVersion. USed as a per-domain migration switch
18391833
// KeyName: history.useNewInitialFailoverVersion
@@ -4442,11 +4436,6 @@ var BoolKeys = map[BoolKey]DynamicBool{
44424436
Description: "EnableReplicationTaskGeneration is the flag to control replication generation",
44434437
DefaultValue: true,
44444438
},
4445-
EnableReplicationBudgetManager: {
4446-
KeyName: "history.enableReplicationBudgetManager",
4447-
Description: "EnableReplicationBudgetManager enables the replication budget manager for cache capacity control",
4448-
DefaultValue: false,
4449-
},
44504439
UseNewInitialFailoverVersion: {
44514440
KeyName: "history.useNewInitialFailoverVersion",
44524441
Description: "use the minInitialFailover version",

config/dynamicconfig/replication_simulation_budget_manager.yml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@ frontend.failoverCoolDown:
1717
history.ReplicationTaskProcessorStartWait: # default is 5s. repl task processor sleeps this much before processing received messages.
1818
- value: 10ms
1919

20-
# Budget Manager Configuration - Enable the manager
21-
history.enableReplicationBudgetManager:
22-
- value: true
23-
constraints: {}
24-
2520
# Set budget limits for testing
2621
# Max size in bytes - set to a moderate value to test capacity enforcement
2722
history.replicationBudgetManagerMaxSizeBytes:

service/history/config/config.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ type Config struct {
191191
ReplicationBudgetManagerMaxSizeBytes dynamicproperties.IntPropertyFn
192192
ReplicationBudgetManagerMaxSizeCount dynamicproperties.IntPropertyFn
193193
ReplicationBudgetManagerSoftCapThreshold dynamicproperties.FloatPropertyFn
194-
EnableReplicationBudgetManager dynamicproperties.BoolPropertyFn
195194

196195
// System Limits
197196
MaximumBufferedEventsBatch dynamicproperties.IntPropertyFn
@@ -491,7 +490,6 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
491490
ReplicationBudgetManagerMaxSizeBytes: dc.GetIntProperty(dynamicproperties.ReplicationBudgetManagerMaxSizeBytes),
492491
ReplicationBudgetManagerMaxSizeCount: dc.GetIntProperty(dynamicproperties.ReplicationBudgetManagerMaxSizeCount),
493492
ReplicationBudgetManagerSoftCapThreshold: dc.GetFloat64Property(dynamicproperties.ReplicationBudgetManagerSoftCapThreshold),
494-
EnableReplicationBudgetManager: dc.GetBoolProperty(dynamicproperties.EnableReplicationBudgetManager),
495493

496494
MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicproperties.MaximumBufferedEventsBatch),
497495
MaximumSignalsPerExecution: dc.GetIntPropertyFilteredByDomain(dynamicproperties.MaximumSignalsPerExecution),

service/history/config/config_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,6 @@ func TestNewConfig(t *testing.T) {
170170
"ReplicationBudgetManagerMaxSizeBytes": {dynamicproperties.ReplicationBudgetManagerMaxSizeBytes, 0},
171171
"ReplicationBudgetManagerMaxSizeCount": {dynamicproperties.ReplicationBudgetManagerMaxSizeCount, 0},
172172
"ReplicationBudgetManagerSoftCapThreshold": {dynamicproperties.ReplicationBudgetManagerSoftCapThreshold, 1.0},
173-
"EnableReplicationBudgetManager": {dynamicproperties.EnableReplicationBudgetManager, false},
174173
"ExecutionMgrNumConns": {dynamicproperties.ExecutionMgrNumConns, 57},
175174
"HistoryMgrNumConns": {dynamicproperties.HistoryMgrNumConns, 58},
176175
"MaximumBufferedEventsBatch": {dynamicproperties.MaximumBufferedEventsBatch, 59},

service/history/handler/handler.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -137,18 +137,16 @@ func (h *handlerImpl) Start() {
137137
h.config,
138138
)
139139

140-
if h.config.EnableReplicationBudgetManager() {
141-
h.replicationBudgetManager = cache.NewBudgetManager(
142-
"replication-budget-manager",
143-
h.config.ReplicationBudgetManagerMaxSizeBytes,
144-
h.config.ReplicationBudgetManagerMaxSizeCount,
145-
cache.AdmissionOptimistic,
146-
0,
147-
h.GetMetricsClient().Scope(metrics.ReplicatorCacheManagerScope),
148-
h.GetLogger(),
149-
h.config.ReplicationBudgetManagerSoftCapThreshold,
150-
)
151-
}
140+
h.replicationBudgetManager = cache.NewBudgetManager(
141+
"replication-budget-manager",
142+
h.config.ReplicationBudgetManagerMaxSizeBytes,
143+
h.config.ReplicationBudgetManagerMaxSizeCount,
144+
cache.AdmissionOptimistic,
145+
0,
146+
h.GetMetricsClient().Scope(metrics.ReplicatorCacheManagerScope),
147+
h.GetLogger(),
148+
h.config.ReplicationBudgetManagerSoftCapThreshold,
149+
)
152150

153151
h.controller = shard.NewShardController(
154152
h.Resource,

0 commit comments

Comments
 (0)