Skip to content

Commit 33f755a

Browse files
authored
Task manager - task cleanup on passive side using task completer (#6514)
* Task manager - task cleanup on passive side using task completer
1 parent 9d268a0 commit 33f755a

File tree

16 files changed

+945
-5
lines changed

16 files changed

+945
-5
lines changed

common/dynamicconfig/constants.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1648,6 +1648,12 @@ const (
16481648
// Value type: Bool
16491649
// Default value: false
16501650
MatchingEnableTasklistGuardAgainstOwnershipShardLoss
1651+
// MatchingEnableStandbyTaskCompletion is to enable completion of tasks in the domain's passive side
1652+
// KeyName: matching.enableStandbyTaskCompletion
1653+
// Value type: Bool
1654+
// Default value: false
1655+
// Allowed filters: DomainName,TaskListName,TaskType
1656+
MatchingEnableStandbyTaskCompletion
16511657

16521658
MatchingEnableGetNumberOfPartitionsFromCache
16531659
MatchingEnableAdaptiveScaler
@@ -4026,6 +4032,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
40264032
Description: "MatchingEnableGetNumberOfPartitionsFromCache is to enable getting number of partitions from cache instead of dynamic config",
40274033
DefaultValue: false,
40284034
},
4035+
MatchingEnableStandbyTaskCompletion: {
4036+
KeyName: "matching.enableStandbyTaskCompletion",
4037+
Filters: []Filter{DomainName, TaskListName, TaskType},
4038+
Description: "MatchingEnableStandbyTaskCompletion is to enable completion of tasks in the domain's passive side",
4039+
DefaultValue: false,
4040+
},
40294041
MatchingEnableAdaptiveScaler: {
40304042
KeyName: "matching.enableAdaptiveScaler",
40314043
Filters: []Filter{DomainName, TaskListName, TaskType},

common/metrics/defs.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2634,6 +2634,9 @@ const (
26342634
EstimatedAddTaskQPSGauge
26352635
TaskListPartitionUpscaleThresholdGauge
26362636
TaskListPartitionDownscaleThresholdGauge
2637+
StandbyClusterTasksCompletedCounterPerTaskList
2638+
StandbyClusterTasksNotStartedCounterPerTaskList
2639+
StandbyClusterTasksCompletionFailurePerTaskList
26372640

26382641
NumMatchingMetrics
26392642
)
@@ -3326,6 +3329,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
33263329
EstimatedAddTaskQPSGauge: {metricName: "estimated_add_task_qps_per_tl", metricType: Gauge},
33273330
TaskListPartitionUpscaleThresholdGauge: {metricName: "tasklist_partition_upscale_threshold", metricType: Gauge},
33283331
TaskListPartitionDownscaleThresholdGauge: {metricName: "tasklist_partition_downscale_threshold", metricType: Gauge},
3332+
StandbyClusterTasksCompletedCounterPerTaskList: {metricName: "standby_cluster_tasks_completed_per_tl", metricType: Counter},
3333+
StandbyClusterTasksNotStartedCounterPerTaskList: {metricName: "standby_cluster_tasks_not_started_per_tl", metricType: Counter},
3334+
StandbyClusterTasksCompletionFailurePerTaskList: {metricName: "standby_cluster_tasks_completion_failure_per_tl", metricType: Counter},
33293335
},
33303336
Worker: {
33313337
ReplicatorMessages: {metricName: "replicator_messages"},

common/util.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ const (
8080
replicationServiceBusyMaxInterval = 10 * time.Second
8181
replicationServiceBusyExpirationInterval = 5 * time.Minute
8282

83+
taskCompleterInitialInterval = 1 * time.Second
84+
taskCompleterMaxInterval = 10 * time.Second
85+
taskCompleterExpirationInterval = 5 * time.Minute
86+
8387
contextExpireThreshold = 10 * time.Millisecond
8488

8589
// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
@@ -203,6 +207,15 @@ func CreateReplicationServiceBusyRetryPolicy() backoff.RetryPolicy {
203207
return policy
204208
}
205209

210+
// CreateTaskCompleterRetryPolicy creates a retry policy to handle tasks not being started.
// It returns an exponential retry policy whose initial interval is taskCompleterInitialInterval,
// with the maximum interval set to taskCompleterMaxInterval and the expiration interval set to
// taskCompleterExpirationInterval.
211+
func CreateTaskCompleterRetryPolicy() backoff.RetryPolicy {
212+
policy := backoff.NewExponentialRetryPolicy(taskCompleterInitialInterval)
213+
policy.SetMaximumInterval(taskCompleterMaxInterval)
214+
policy.SetExpirationInterval(taskCompleterExpirationInterval)
215+
216+
return policy
217+
}
218+
206219
// IsValidIDLength checks if id is valid according to its length
207220
func IsValidIDLength(
208221
id string,

config/dynamicconfig/development.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,15 @@ frontend.enableClientVersionCheck:
44
system.minRetentionDays:
55
- value: 0
66
constraints: {}
7+
matching.enableStandbyTaskCompletion:
8+
- value: true
9+
constraints: {}
10+
history.standbyClusterDelay:
11+
- value: 30s
12+
constraints: {}
13+
history.standbyTaskMissingEventsResendDelay:
14+
- value: 30s
15+
constraints: {}
716
history.EnableConsistentQueryByDomain:
817
- value: true
918
constraints: {}

service/history/constants/test_constants.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,22 @@ var (
134134
},
135135
TestVersion,
136136
)
137+
138+
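// TestGlobalStandbyDomainEntry is the global standby domain cache entry for test: its active cluster is cluster.TestAlternativeClusterName, so the domain is standby from the point of view of cluster.TestCurrentClusterName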
139+
TestGlobalStandbyDomainEntry = cache.NewGlobalDomainCacheEntryForTest(
140+
&persistence.DomainInfo{ID: TestDomainID, Name: TestDomainName},
141+
&persistence.DomainConfig{
142+
Retention: 1,
143+
VisibilityArchivalStatus: types.ArchivalStatusEnabled,
144+
VisibilityArchivalURI: "test:///visibility/archival",
145+
},
146+
&persistence.DomainReplicationConfig{
147+
ActiveClusterName: cluster.TestAlternativeClusterName,
148+
Clusters: []*persistence.ClusterReplicationConfig{
149+
{ClusterName: cluster.TestCurrentClusterName},
150+
{ClusterName: cluster.TestAlternativeClusterName},
151+
},
152+
},
153+
TestVersion,
154+
)
137155
)

service/matching/config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type (
6060
PartitionDownscaleSustainedDuration dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
6161
AdaptiveScalerUpdateInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
6262
EnableAdaptiveScaler dynamicconfig.BoolPropertyFnWithTaskListInfoFilters
63+
EnableStandbyTaskCompletion dynamicconfig.BoolPropertyFnWithTaskListInfoFilters
6364

6465
// Time to hold a poll request before returning an empty response if there are no tasks
6566
LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
@@ -138,6 +139,8 @@ type (
138139
TaskDispatchRPSTTL time.Duration
139140
// task gc configuration
140141
MaxTimeBetweenTaskDeletes time.Duration
142+
// standby task completion configuration
143+
EnableStandbyTaskCompletion func() bool
141144
}
142145
)
143146

@@ -189,5 +192,6 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string, getIsolationGroups
189192
TaskDispatchRPSTTL: time.Minute,
190193
MaxTimeBetweenTaskDeletes: time.Second,
191194
AllIsolationGroups: getIsolationGroups,
195+
EnableStandbyTaskCompletion: dc.GetBoolPropertyFilteredByTaskListInfo(dynamicconfig.MatchingEnableStandbyTaskCompletion),
192196
}
193197
}

service/matching/config/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func TestNewConfig(t *testing.T) {
8787
"PartitionDownscaleSustainedDuration": {dynamicconfig.MatchingPartitionDownscaleSustainedDuration, time.Duration(33)},
8888
"AdaptiveScalerUpdateInterval": {dynamicconfig.MatchingAdaptiveScalerUpdateInterval, time.Duration(34)},
8989
"EnableAdaptiveScaler": {dynamicconfig.MatchingEnableAdaptiveScaler, true},
90+
"EnableStandbyTaskCompletion": {dynamicconfig.MatchingEnableStandbyTaskCompletion, false},
9091
}
9192
client := dynamicconfig.NewInMemoryClient()
9293
for fieldName, expected := range fields {

service/matching/handler/engine.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t
245245
e.config,
246246
e.timeSource,
247247
e.timeSource.Now(),
248+
e.historyService,
248249
)
249250
if err != nil {
250251
e.taskListsLock.Unlock()

service/matching/handler/engine_integration_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ func (s *matchingEngineSuite) TestOnlyUnloadMatchingInstance() {
225225
&tlKind,
226226
s.matchingEngine.config,
227227
s.matchingEngine.timeSource,
228-
s.matchingEngine.timeSource.Now())
228+
s.matchingEngine.timeSource.Now(),
229+
s.matchingEngine.historyService)
229230
s.Require().NoError(err)
230231

231232
// try to unload a different tlm instance with the same taskListID

service/matching/tasklist/interfaces.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interfaces_mock.go github.com/uber/cadence/service/matching/tasklist Manager
2424
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interfaces_mock.go github.com/uber/cadence/service/matching/tasklist TaskMatcher
2525
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interfaces_mock.go github.com/uber/cadence/service/matching/tasklist Forwarder
26+
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interfaces_mock.go github.com/uber/cadence/service/matching/tasklist TaskCompleter
2627

2728
package tasklist
2829

@@ -83,4 +84,8 @@ type (
8384
AddReqTokenC() <-chan *ForwarderReqToken
8485
PollReqTokenC(isolationGroup string) <-chan *ForwarderReqToken
8586
}
87+
88+
// TaskCompleter exposes a single operation, CompleteTaskIfStarted, which takes a context and an
// InternalTask and returns an error.
// NOTE(review): judging by the method name, the task is completed only once it has been started;
// presumably this is used for task cleanup on a domain's passive (standby) side — confirm against
// the implementation.
TaskCompleter interface {
89+
CompleteTaskIfStarted(ctx context.Context, task *InternalTask) error
90+
}
8691
)

0 commit comments

Comments
 (0)