Skip to content

Commit 23eede1

Browse files
authored
Set up integration test for pending task alert of history queue v2 (#7196)
1 parent bebe6c3 commit 23eede1

17 files changed

+194
-27
lines changed

.github/workflows/ci-checks.yml

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,37 @@ jobs:
133133
name: go-cassandra-running-history-queue-v2-integration-coverage
134134
path: .build/coverage/*.out
135135

136+
golang-integration-test-with-cassandra-running-history-queue-v2-alert:
137+
name: Golang integration test with running history queue v2 with alert
138+
runs-on: ubuntu-latest
139+
continue-on-error: true
140+
141+
steps:
142+
- name: Checkout
143+
uses: actions/checkout@v4
144+
with:
145+
submodules: true
146+
147+
- name: Setup Go environment
148+
uses: actions/setup-go@v5
149+
with:
150+
go-version: '1.23.4'
151+
152+
- name: Run integration profile for cassandra running history queue v2 with alert
153+
uses: nick-fields/retry@v3
154+
with:
155+
max_attempts: 2
156+
timeout_minutes: 30
157+
command: |
158+
docker compose -f docker/github_actions/docker-compose.yml run integration-test-cassandra-queue-v2-alert bash -c "make .just-build && make cover_integration_profile"
159+
160+
- name: Upload coverage artifacts
161+
if: always()
162+
uses: actions/upload-artifact@v4
163+
with:
164+
name: go-cassandra-running-history-queue-v2-alert-integration-coverage
165+
path: .build/coverage/*.out
166+
136167

137168
golang-integration-test-with-cassandra-and-elasticsearch-v7:
138169
name: Golang integration test with cassandra and elasticsearch v7
@@ -465,4 +496,4 @@ jobs:
465496
max_attempts: 2
466497
timeout_minutes: 30
467498
command: |
468-
docker compose -f docker/github_actions/docker-compose.yml run integration-test-with-etcd bash -c "make .just-build && make integration_tests_etcd"
499+
docker compose -f docker/github_actions/docker-compose.yml run integration-test-with-etcd bash -c "make .just-build && make integration_tests_etcd"

common/persistence/persistence-tests/persistenceTestBase.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ func NewTestBaseWithNoSQL(t *testing.T, options *TestBaseOptions) *TestBase {
156156
EnableShardIDMetrics: dynamicproperties.GetBoolPropertyFn(true),
157157
EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true),
158158
ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
159-
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
160159
SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)),
160+
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(true),
161161
}
162162
params := TestBaseParams{
163163
DefaultTestCluster: testCluster,
@@ -188,8 +188,8 @@ func NewTestBaseWithSQL(t *testing.T, options *TestBaseOptions) *TestBase {
188188
EnableShardIDMetrics: dynamicproperties.GetBoolPropertyFn(true),
189189
EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true),
190190
ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
191-
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
192191
SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)),
192+
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(true),
193193
}
194194
params := TestBaseParams{
195195
DefaultTestCluster: testCluster,

docker/github_actions/docker-compose-local.yml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,37 @@ services:
130130
aliases:
131131
- integration-test
132132

133+
integration-test-cassandra-queue-v2-alert:
134+
build:
135+
context: ../../
136+
dockerfile: ./docker/github_actions/Dockerfile${DOCKERFILE_SUFFIX}
137+
command: make cover_integration_profile
138+
environment:
139+
- "CASSANDRA_HOST=cassandra"
140+
- "CASSANDRA=1"
141+
- "CASSANDRA_SEEDS=cassandra"
142+
- "ES_SEEDS=elasticsearch"
143+
- "KAFKA_SEEDS=kafka"
144+
- "TEST_TAG=esintegration"
145+
- "ENABLE_QUEUE_V2=true"
146+
- "ENABLE_QUEUE_V2_ALERT=true"
147+
- "V=1"
148+
depends_on:
149+
cassandra:
150+
condition: service_healthy
151+
elasticsearch:
152+
condition: service_started
153+
kafka:
154+
condition: service_started
155+
volumes:
156+
- ../../:/cadence
157+
- /cadence/.build/ # ensure we don't mount the build directory
158+
- /cadence/.bin/ # ensure we don't mount the bin directory
159+
networks:
160+
services-network:
161+
aliases:
162+
- integration-test
163+
133164
integration-test-mysql:
134165
build:
135166
context: ../../

docker/github_actions/docker-compose.yml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,32 @@ services:
159159
aliases:
160160
- integration-test
161161

162+
integration-test-cassandra-queue-v2-alert:
163+
build:
164+
context: ../../
165+
dockerfile: ./docker/github_actions/Dockerfile${DOCKERFILE_SUFFIX}
166+
environment:
167+
- "CASSANDRA=1"
168+
- "CASSANDRA_SEEDS=cassandra"
169+
- "ES_SEEDS=elasticsearch"
170+
- "KAFKA_SEEDS=kafka"
171+
- "TEST_TAG=esintegration"
172+
- "ENABLE_QUEUE_V2=true"
173+
- "ENABLE_QUEUE_V2_ALERT=true"
174+
depends_on:
175+
cassandra:
176+
condition: service_healthy
177+
elasticsearch:
178+
condition: service_started
179+
kafka:
180+
condition: service_started
181+
volumes:
182+
- ../../:/cadence
183+
networks:
184+
services-network:
185+
aliases:
186+
- integration-test
187+
162188
integration-test-mysql:
163189
build:
164190
context: ../../

host/async_wf_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ func (s *AsyncWFIntegrationSuite) SetupSuite() {
9999
EnableShardIDMetrics: dynamicproperties.GetBoolPropertyFn(true),
100100
EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true),
101101
ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
102-
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
103102
SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)),
103+
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(true),
104104
}
105105
params := pt.TestBaseParams{
106106
DefaultTestCluster: s.DefaultTestCluster,

host/integration_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,12 @@ func TestIntegrationSuite(t *testing.T) {
5050
flag.Parse()
5151

5252
configPath := "testdata/integration_test_cluster.yaml"
53+
// TODO: remove this logic once we deprecate history queue v1
5354
if os.Getenv("ENABLE_QUEUE_V2") == "true" {
5455
configPath = "testdata/integration_queuev2_cluster.yaml"
56+
if os.Getenv("ENABLE_QUEUE_V2_ALERT") == "true" {
57+
configPath = "testdata/integration_queuev2_with_alert_cluster.yaml"
58+
}
5559
}
5660
clusterConfig, err := GetTestClusterConfig(configPath)
5761
if err != nil {

host/integrationbase.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ func (s *IntegrationBase) setupSuite() {
116116
EnableShardIDMetrics: dynamicproperties.GetBoolPropertyFn(true),
117117
EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true),
118118
ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
119-
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
120119
SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)),
120+
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(true),
121121
}
122122
params := pt.TestBaseParams{
123123
DefaultTestCluster: s.DefaultTestCluster,

host/ndc/integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ func (s *NDCIntegrationTestSuite) SetupSuite() {
102102
EnableCassandraAllConsistencyLevelDelete: dynamicproperties.GetBoolPropertyFn(true),
103103
EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true),
104104
ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
105-
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
106105
SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)),
106+
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(true),
107107
}
108108
params := pt.TestBaseParams{
109109
DefaultTestCluster: s.defaultTestCluster,

host/pinot_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ func (s *PinotIntegrationSuite) SetupSuite() {
114114
EnableShardIDMetrics: dynamicproperties.GetBoolPropertyFn(true),
115115
EnableHistoryTaskDualWriteMode: dynamicproperties.GetBoolPropertyFn(true),
116116
ReadNoSQLHistoryTaskFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
117-
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(false),
118117
SerializationEncoding: dynamicproperties.GetStringPropertyFn(string(constants.EncodingTypeThriftRW)),
118+
ReadNoSQLShardFromDataBlob: dynamicproperties.GetBoolPropertyFn(true),
119119
}
120120
params := pt.TestBaseParams{
121121
DefaultTestCluster: s.DefaultTestCluster,
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
frontend.warmupDuration:
2+
- value: "1s"
3+
constraints: {}
4+
history.enableTimerQueueV2:
5+
- value: true
6+
constraints: {}
7+
history.enableTransferQueueV2:
8+
- value: true
9+
constraints: {}
10+
history.shardUpdateMinInterval:
11+
- value: 3s
12+
constraints: {}
13+
history.timerProcessorUpdateAckInterval:
14+
- value: 5s
15+
constraints: {}
16+
history.timerProcessorUpdateAckIntervalJitterCoefficient:
17+
- value: 0
18+
constraints: {}
19+
history.transferProcessorUpdateAckInterval:
20+
- value: 5s
21+
constraints: {}
22+
history.transferProcessorUpdateAckIntervalJitterCoefficient:
23+
- value: 0
24+
constraints: {}
25+
history.queueProcessorPollBackoffInterval:
26+
- value: 5s
27+
constraints: {}
28+
history.virtualSliceForceAppendInterval:
29+
- value: 100ms
30+
constraints: {}
31+
# Only enable one level of split, which has been verified in simulation
32+
history.queueMaxVirtualQueueCount:
33+
- value: 2
34+
constraints: {}
35+
# Enable task rate limiter so that the number of pending tasks increases
36+
history.taskSchedulerEnableRateLimiter:
37+
- value: true
38+
constraints: {}
39+
history.taskSchedulerEnableRateLimiterShadowMode:
40+
- value: false
41+
constraints: {}
42+
history.taskSchedulerGlobalDomainRPS:
43+
- value: 30
44+
constraints: {}
45+
# Enable the pending task count alerts (transfer and timer queues) for the integration test
46+
history.enableTransferQueueV2PendingTaskCountAlert:
47+
- value: true
48+
constraints: {}
49+
history.enableTimerQueueV2PendingTaskCountAlert:
50+
- value: true
51+
constraints: {}
52+
# Set a low number so that load from the tests triggers a queue pause
53+
history.queueMaxPendingTaskCount:
54+
- value: 20
55+
constraints: {}
56+
# Set a low number so that load from the tests triggers a queue split
57+
history.queueCriticalPendingTaskCount:
58+
- value: 18
59+
constraints: {}

0 commit comments

Comments
 (0)