Skip to content

Commit 07524f3

Browse files
committed
feat(domain multi-tenancy): Store normalized task list name in history tasks
1 parent 9284a2a commit 07524f3

File tree

5 files changed

+100
-16
lines changed

5 files changed

+100
-16
lines changed

common/persistence/tasks.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,13 @@ type Task interface {
4040
GetDomainID() string
4141
GetWorkflowID() string
4242
GetRunID() string
43+
// GetTaskList returns the name of the task list the task is currently
44+
// associated with. This may differ from the original task list if the
45+
// task is a sticky decision task.
4346
GetTaskList() string
47+
// GetOriginalTaskList returns the task list on which the task was initially
48+
// scheduled. It is used to enforce rate limits and ensure fair scheduling
49+
// across task lists.
4450
GetOriginalTaskList() string
4551
GetVersion() int64
4652
SetVersion(version int64)

service/history/execution/mutable_state_task_generator.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowStartTasks(
145145
VisibilityTimestamp: workflowTimeoutTimestamp,
146146
Version: startVersion,
147147
},
148+
TaskList: getNormalizedTaskListName(executionInfo.TaskList, executionInfo.TaskListKind),
148149
})
149150

150151
return nil
@@ -156,6 +157,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks(
156157
) error {
157158

158159
executionInfo := r.mutableState.GetExecutionInfo()
160+
taskList := getNormalizedTaskListName(executionInfo.TaskList, executionInfo.TaskListKind)
159161
r.mutableState.AddTransferTasks(&persistence.CloseExecutionTask{
160162
WorkflowIdentifier: persistence.WorkflowIdentifier{
161163
DomainID: executionInfo.DomainID,
@@ -166,6 +168,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks(
166168
// TaskID and VisibilityTimestamp are set by shard context
167169
Version: closeEvent.Version,
168170
},
171+
TaskList: taskList,
169172
})
170173

171174
retentionInDays := defaultWorkflowRetentionInDays
@@ -202,6 +205,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks(
202205
VisibilityTimestamp: closeTimestamp.Add(retentionDuration),
203206
Version: closeEvent.Version,
204207
},
208+
TaskList: taskList,
205209
})
206210

207211
return nil
@@ -250,6 +254,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateDelayedDecisionTasks(
250254
Version: startVersion,
251255
},
252256
TimeoutType: firstDecisionDelayType,
257+
TaskList: getNormalizedTaskListName(executionInfo.TaskList, executionInfo.TaskListKind),
253258
})
254259

255260
return nil
@@ -272,6 +277,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateRecordWorkflowStartedTasks(
272277
// TaskID and VisibilityTimestamp are set by shard context
273278
Version: startVersion,
274279
},
280+
TaskList: getNormalizedTaskListName(executionInfo.TaskList, executionInfo.TaskListKind),
275281
})
276282

277283
return nil
@@ -291,6 +297,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateDecisionScheduleTasks(
291297
}
292298
}
293299

300+
originalTaskList := getNormalizedTaskListName(executionInfo.TaskList, executionInfo.TaskListKind)
294301
r.mutableState.AddTransferTasks(&persistence.DecisionTask{
295302
WorkflowIdentifier: persistence.WorkflowIdentifier{
296303
DomainID: executionInfo.DomainID,
@@ -301,9 +308,10 @@ func (r *mutableStateTaskGeneratorImpl) GenerateDecisionScheduleTasks(
301308
// TaskID and VisibilityTimestamp are set by shard context
302309
Version: decision.Version,
303310
},
304-
TargetDomainID: executionInfo.DomainID,
305-
TaskList: decision.TaskList,
306-
ScheduleID: decision.ScheduleID,
311+
TargetDomainID: executionInfo.DomainID,
312+
TaskList: decision.TaskList,
313+
ScheduleID: decision.ScheduleID,
314+
OriginalTaskList: originalTaskList,
307315
})
308316

309317
if scheduleToStartTimeout := r.mutableState.GetDecisionScheduleToStartTimeout(); scheduleToStartTimeout != 0 {
@@ -322,6 +330,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateDecisionScheduleTasks(
322330
TimeoutType: int(TimerTypeScheduleToStart),
323331
EventID: decision.ScheduleID,
324332
ScheduleAttempt: decision.Attempt,
333+
TaskList: originalTaskList,
325334
})
326335
}
327336

@@ -369,6 +378,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateDecisionStartTasks(
369378
TimeoutType: int(TimerTypeStartToClose),
370379
EventID: decision.ScheduleID,
371380
ScheduleAttempt: decision.Attempt,
381+
TaskList: getNormalizedTaskListName(executionInfo.TaskList, executionInfo.TaskListKind),
372382
})
373383

374384
return nil
@@ -445,8 +455,9 @@ func (r *mutableStateTaskGeneratorImpl) GenerateActivityRetryTasks(
445455
Version: ai.Version,
446456
VisibilityTimestamp: ai.ScheduledTime,
447457
},
448-
EventID: ai.ScheduleID,
449-
Attempt: int64(ai.Attempt),
458+
EventID: ai.ScheduleID,
459+
Attempt: int64(ai.Attempt),
460+
TaskList: getNormalizedTaskListName(ai.TaskList, ai.TaskListKind),
450461
})
451462
return nil
452463
}
@@ -490,6 +501,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateChildWorkflowTasks(
490501
TargetDomainID: targetDomainID,
491502
TargetWorkflowID: childWorkflowInfo.StartedWorkflowID,
492503
InitiatedID: childWorkflowInfo.InitiatedID,
504+
TaskList: getNormalizedTaskListName(executionInfo.TaskList, executionInfo.TaskListKind),
493505
}
494506

495507
r.mutableState.AddTransferTasks(startChildExecutionTask)
@@ -536,6 +548,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateRequestCancelExternalTasks(
536548
TargetRunID: targetRunID,
537549
TargetChildWorkflowOnly: targetChildOnly,
538550
InitiatedID: scheduleID,
551+
TaskList: getNormalizedTaskListName(executionInfo.TaskList, executionInfo.TaskListKind),
539552
}
540553

541554
r.mutableState.AddTransferTasks(cancelExecutionTask)
@@ -583,6 +596,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateSignalExternalTasks(
583596
TargetRunID: targetRunID,
584597
TargetChildWorkflowOnly: targetChildOnly,
585598
InitiatedID: scheduleID,
599+
TaskList: getNormalizedTaskListName(executionInfo.TaskList, executionInfo.TaskListKind),
586600
}
587601

588602
r.mutableState.AddTransferTasks(signalExecutionTask)
@@ -605,6 +619,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowSearchAttrTasks() error
605619
// TaskID and VisibilityTimestamp are set by shard context
606620
Version: currentVersion, // task processing does not check this version
607621
},
622+
TaskList: getNormalizedTaskListName(executionInfo.TaskList, executionInfo.TaskListKind),
608623
})
609624

610625
return nil
@@ -625,6 +640,7 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowResetTasks() error {
625640
// TaskID and VisibilityTimestamp are set by shard context
626641
Version: currentVersion,
627642
},
643+
TaskList: getNormalizedTaskListName(executionInfo.TaskList, executionInfo.TaskListKind),
628644
})
629645

630646
return nil
@@ -681,6 +697,13 @@ func (r *mutableStateTaskGeneratorImpl) validateChildWorkflowParameters(msbDomai
681697
return nil
682698
}
683699

700+
func getNormalizedTaskListName(name string, kind types.TaskListKind) string {
701+
if kind == types.TaskListKindEphemeral {
702+
return "__ephemeral__"
703+
}
704+
return name
705+
}
706+
684707
func getNextDecisionTimeout(attempt int64, defaultStartToCloseTimeout time.Duration) time.Duration {
685708
if attempt <= 1 {
686709
return defaultStartToCloseTimeout

0 commit comments

Comments
 (0)