Skip to content

Commit ef889e1

Browse files
Shaddollins-tril
authored andcommitted
Set up simulation test for history queue v2's pending task alert (cadence-workflow#7180)
1 parent fbf79f2 commit ef889e1

15 files changed

+180
-69
lines changed

common/dynamicconfig/dynamicproperties/constants.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2614,6 +2614,7 @@ const (
26142614
// Default value: 5s (5*time.Second)
26152615
// Allowed filters: N/A
26162616
QueueProcessorPollBackoffInterval
2617+
VirtualSliceForceAppendInterval
26172618
// TimerProcessorUpdateAckInterval is update interval for timer processor
26182619
// KeyName: history.timerProcessorUpdateAckInterval
26192620
// Value type: Duration
@@ -5129,6 +5130,11 @@ var DurationKeys = map[DurationKey]DynamicDuration{
51295130
Description: "QueueProcessorPollBackoffInterval is the backoff duration when queue processor is throttled",
51305131
DefaultValue: time.Second * 5,
51315132
},
5133+
VirtualSliceForceAppendInterval: {
5134+
KeyName: "history.virtualSliceForceAppendInterval",
5135+
Description: "VirtualSliceForceAppendInterval is the duration forcing a new virtual slice to be appended to the root virtual queue instead of being merged. It has 2 benefits: First, virtual slices won't grow infinitely, task loading for that slice can complete and its scope can be shrinked. Second, when we need to unload a virtual slice to free memory, we won't unload too many tasks.",
5136+
DefaultValue: time.Minute * 5,
5137+
},
51325138
TimerProcessorUpdateAckInterval: {
51335139
KeyName: "history.timerProcessorUpdateAckInterval",
51345140
Description: "TimerProcessorUpdateAckInterval is update interval for timer processor",

docker/github_actions/docker-compose-local-history-simulation.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ services:
4949
- -e
5050
- -c
5151
- >
52-
go test -timeout 180s
52+
go test -timeout 300s
5353
-run ^TestHistorySimulation.*$
5454
-count 1
5555
-v

service/history/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ type Config struct {
119119
EnableTransferQueueV2PendingTaskCountAlert dynamicproperties.BoolPropertyFnWithShardIDFilter
120120
QueueCriticalPendingTaskCount dynamicproperties.IntPropertyFn
121121
QueueMaxVirtualQueueCount dynamicproperties.IntPropertyFn
122+
VirtualSliceForceAppendInterval dynamicproperties.DurationPropertyFn
122123

123124
// QueueProcessor settings
124125
QueueProcessorEnableSplit dynamicproperties.BoolPropertyFn
@@ -417,6 +418,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
417418
EnableTransferQueueV2PendingTaskCountAlert: dc.GetBoolPropertyFilteredByShardID(dynamicproperties.EnableTransferQueueV2PendingTaskCountAlert),
418419
QueueCriticalPendingTaskCount: dc.GetIntProperty(dynamicproperties.QueueCriticalPendingTaskCount),
419420
QueueMaxVirtualQueueCount: dc.GetIntProperty(dynamicproperties.QueueMaxVirtualQueueCount),
421+
VirtualSliceForceAppendInterval: dc.GetDurationProperty(dynamicproperties.VirtualSliceForceAppendInterval),
420422

421423
QueueProcessorEnableSplit: dc.GetBoolProperty(dynamicproperties.QueueProcessorEnableSplit),
422424
QueueProcessorSplitMaxLevel: dc.GetIntProperty(dynamicproperties.QueueProcessorSplitMaxLevel),

service/history/config/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ func TestNewConfig(t *testing.T) {
275275
"EnableTransferQueueV2PendingTaskCountAlert": {dynamicproperties.EnableTransferQueueV2PendingTaskCountAlert, true},
276276
"QueueCriticalPendingTaskCount": {dynamicproperties.QueueCriticalPendingTaskCount, 100},
277277
"QueueMaxVirtualQueueCount": {dynamicproperties.QueueMaxVirtualQueueCount, 101},
278+
"VirtualSliceForceAppendInterval": {dynamicproperties.VirtualSliceForceAppendInterval, time.Second},
278279
}
279280
client := dynamicconfig.NewInMemoryClient()
280281
for fieldName, expected := range fields {

service/history/queuev2/queue_base.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type (
6060
MaxPendingTasksCount dynamicproperties.IntPropertyFn
6161
PollBackoffInterval dynamicproperties.DurationPropertyFn
6262
PollBackoffIntervalJitterCoefficient dynamicproperties.FloatPropertyFn
63+
VirtualSliceForceAppendInterval dynamicproperties.DurationPropertyFn
6364
// monitor & mitigator options
6465
CriticalPendingTaskCount dynamicproperties.IntPropertyFn
6566
EnablePendingTaskCountAlert func() bool
@@ -162,22 +163,25 @@ func newQueueBase(
162163
timeSource,
163164
quotas.NewDynamicRateLimiter(options.MaxPollRPS.AsFloat64()),
164165
monitor,
165-
&VirtualQueueOptions{
166-
PageSize: options.PageSize,
167-
MaxPendingTasksCount: options.MaxPendingTasksCount,
168-
PollBackoffInterval: options.PollBackoffInterval,
169-
PollBackoffIntervalJitterCoefficient: options.PollBackoffIntervalJitterCoefficient,
170-
},
171-
&VirtualQueueOptions{
172-
PageSize: options.PageSize,
173-
// non-root queues should not trigger task unloading
174-
// otherwise those virtual queues will keep loading, hit pending task count limit, unload, throttle, load, etc...
175-
// use a limit lower than the critical pending task count instead
176-
MaxPendingTasksCount: func(opts ...dynamicproperties.FilterOption) int {
177-
return int(float64(options.CriticalPendingTaskCount(opts...)) * nonRootQueueMaxPendingTaskCoefficient)
166+
&VirtualQueueManagerOptions{
167+
RootQueueOptions: &VirtualQueueOptions{
168+
PageSize: options.PageSize,
169+
MaxPendingTasksCount: options.MaxPendingTasksCount,
170+
PollBackoffInterval: options.PollBackoffInterval,
171+
PollBackoffIntervalJitterCoefficient: options.PollBackoffIntervalJitterCoefficient,
172+
},
173+
NonRootQueueOptions: &VirtualQueueOptions{
174+
PageSize: options.PageSize,
175+
// non-root queues should not trigger task unloading
176+
// otherwise those virtual queues will keep loading, hit pending task count limit, unload, throttle, load, etc...
177+
// use a limit lower than the critical pending task count instead
178+
MaxPendingTasksCount: func(opts ...dynamicproperties.FilterOption) int {
179+
return int(float64(options.CriticalPendingTaskCount(opts...)) * nonRootQueueMaxPendingTaskCoefficient)
180+
},
181+
PollBackoffInterval: options.PollBackoffInterval,
182+
PollBackoffIntervalJitterCoefficient: options.PollBackoffIntervalJitterCoefficient,
178183
},
179-
PollBackoffInterval: options.PollBackoffInterval,
180-
PollBackoffIntervalJitterCoefficient: options.PollBackoffIntervalJitterCoefficient,
184+
VirtualSliceForceAppendInterval: options.VirtualSliceForceAppendInterval,
181185
},
182186
queueState.VirtualQueueStates,
183187
)

service/history/queuev2/queue_immediate_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func TestImmediateQueue_LifeCycle(t *testing.T) {
7878
MaxPollRPS: dynamicproperties.GetIntPropertyFn(100),
7979
MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100),
8080
CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90),
81+
VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10),
8182
EnablePendingTaskCountAlert: func() bool { return true },
8283
MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2),
8384
}

service/history/queuev2/queue_scheduled_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func TestScheduledQueue_LifeCycle(t *testing.T) {
7878
MaxPollRPS: dynamicproperties.GetIntPropertyFn(100),
7979
MaxPendingTasksCount: dynamicproperties.GetIntPropertyFn(100),
8080
PollBackoffIntervalJitterCoefficient: dynamicproperties.GetFloatPropertyFn(0.0),
81+
VirtualSliceForceAppendInterval: dynamicproperties.GetDurationPropertyFn(time.Second * 10),
8182
CriticalPendingTaskCount: dynamicproperties.GetIntPropertyFn(90),
8283
EnablePendingTaskCountAlert: func() bool { return true },
8384
MaxVirtualQueueCount: dynamicproperties.GetIntPropertyFn(2),

service/history/queuev2/timer_queue_factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ func (f *timerQueueFactory) createQueuev2(
151151
MaxPendingTasksCount: config.QueueMaxPendingTaskCount,
152152
PollBackoffInterval: config.QueueProcessorPollBackoffInterval,
153153
PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
154+
VirtualSliceForceAppendInterval: config.VirtualSliceForceAppendInterval,
154155
MaxStartJitterInterval: dynamicproperties.GetDurationPropertyFn(0),
155156
RedispatchInterval: config.ActiveTaskRedispatchInterval,
156157
CriticalPendingTaskCount: config.QueueCriticalPendingTaskCount,

service/history/queuev2/transfer_queue_factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ func (f *transferQueueFactory) createQueuev2(
158158
MaxPendingTasksCount: config.QueueMaxPendingTaskCount,
159159
PollBackoffInterval: config.QueueProcessorPollBackoffInterval,
160160
PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
161+
VirtualSliceForceAppendInterval: config.VirtualSliceForceAppendInterval,
161162
EnableValidator: config.TransferProcessorEnableValidator,
162163
ValidationInterval: config.TransferProcessorValidationInterval,
163164
MaxStartJitterInterval: dynamicproperties.GetDurationPropertyFn(0),

service/history/queuev2/virtual_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() {
348348
if q.pauseController.IsPaused() {
349349
// emit a metric indicating that the virtual queue is paused
350350
q.metricsScope.UpdateGauge(metrics.VirtualQueuePausedGauge, 1.0)
351-
q.logger.Debug("virtual queue is paused")
351+
q.logger.Debug("virtual queue is paused", tag.PendingTaskCount(pendingTaskCount), tag.MaxTaskCount(maxTaskCount))
352352
return
353353
}
354354

0 commit comments

Comments
 (0)