Skip to content

Commit 9284a2a

Browse files
authored
feat(domain multi-tenancy): Update persistence layer to use the new fields in transfer task and timer task (#7729)
<!-- 1-2 line summary of WHAT changed technically: - Always link the relevant projects GitHub issue, unless it is a minor bugfix - Good: "Modified FailoverDomain mapper to allow null ActiveClusterName #320" - Bad: "added nil check" --> **What changed?** Update persistence layer to store the new fields of transfer task and timer task into the underlying database <!-- Your goal is to provide all the required context for a future maintainer to understand the reasons for making this change (see https://cbea.ms/git-commit/#why-not-how). How did this work previously (and what was wrong with it)? What has changed, and why did you solve it this way? - Good: "Active-active domains have independent cluster attributes per region. Previously, modifying cluster attributes required spedifying the default ActiveClusterName which updates the global domain default. This prevents operators from updating regional configurations without affecting the primary cluster designation. This change allows attribute updates to be independent of active cluster selection." - Bad: "Improves domain handling" --> **Why?** To support Domain multi-tenancy, we want all history tasks to be tagged by their task lists. #7724 <!-- Include specific test commands and setup. Please include the exact commands such that another maintainer or contributor can reproduce the test steps taken. - e.g Unit test commands with exact invocation `go test -v ./common/types/mapper/proto -run TestFailoverDomainRequest` - For integration tests include setup steps and test commands Example: "Started local server with `./cadence start`, then ran `make test_e2e`" - For local simulation testing include setup steps for the server and how you ran the tests - Good: Full commands that reviewers can copy-paste to verify - Bad: "Tested locally" or "Added tests" --> **How did you test it?** unit tests ``` cd common/persistence && go test ./... ``` <!-- If there are risks that the release engineer should know about document them here. For example: - Has an API/IDL been modified? Is it backwards/forwards compatible? If not, what are the repecussions? - Has a schema change been introduced? Is it possible to roll back? - Has a feature flag been re-used for a new purpose? - Is there a potential performance concern? Is the change modifying core task processing logic? - If truly N/A, you can mark it as such --> **Potential risks** N/A <!-- If this PR completes a user facing feature or changes functionality add release notes here. Your release notes should allow a user and the release engineer to understand the changes with little context. Always ensure that the description contains a link to the relevant GitHub issue. --> **Release notes** <!-- Consider whether this change requires documentation updates in the Cadence-Docs repo - If yes: mention what needs updating (or link to docs PR in cadence-docs repo) - If in doubt, add a note about potential doc needs - Only mark N/A if you're certain no docs are affected --> **Documentation Changes** --- ## Reviewer Validation **PR Description Quality** (check these before reviewing code): - [ ] **"What changed"** provides a clear 1-2 line summary - [ ] Project Issue is linked - [ ] **"Why"** explains the full motivation with sufficient context - [ ] **Testing is documented:** - [ ] Unit test commands are included (with exact `go test` invocation) - [ ] Integration test setup/commands included (if integration tests were run) - [ ] Canary testing details included (if canary was mentioned) - [ ] **Potential risks** section is thoughtfully filled out (or legitimately N/A) - [ ] **Release notes** included if this completes a user-facing feature - [ ] **Documentation** needs are addressed (or noted if uncertain)
1 parent 19e3901 commit 9284a2a

File tree

13 files changed

+520
-172
lines changed

13 files changed

+520
-172
lines changed

common/persistence/data_manager_interfaces.go

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ type (
487487
ScheduleID int64
488488
Version int64
489489
RecordVisibility bool
490+
OriginalTaskList string
490491
}
491492

492493
// CrossClusterTaskInfo describes a cross-cluster task
@@ -524,6 +525,7 @@ type (
524525
EventID int64
525526
ScheduleAttempt int64
526527
Version int64
528+
TaskList string
527529
}
528530

529531
// TaskListInfo describes a state of a task list implementation.
@@ -1851,6 +1853,16 @@ func (t *TransferTaskInfo) GetDomainID() string {
18511853
return t.DomainID
18521854
}
18531855

1856+
// GetTaskList returns the task list for transfer task
1857+
func (t *TransferTaskInfo) GetTaskList() string {
1858+
return t.TaskList
1859+
}
1860+
1861+
// GetOriginalTaskList returns the original task list for transfer task
1862+
func (t *TransferTaskInfo) GetOriginalTaskList() string {
1863+
return t.OriginalTaskList
1864+
}
1865+
18541866
// String returns a string representation for transfer task
18551867
func (t *TransferTaskInfo) String() string {
18561868
return fmt.Sprintf("%#v", t)
@@ -1883,39 +1895,50 @@ func (t *TransferTaskInfo) ToTask() (Task, error) {
18831895
TargetDomainID: t.TargetDomainID,
18841896
TaskList: t.TaskList,
18851897
ScheduleID: t.ScheduleID,
1898+
OriginalTaskList: t.OriginalTaskList,
18861899
}, nil
18871900
case TransferTaskTypeCloseExecution:
18881901
return &CloseExecutionTask{
18891902
WorkflowIdentifier: workflowIdentifier,
18901903
TaskData: taskData,
1904+
TaskList: t.TaskList,
18911905
}, nil
18921906
case TransferTaskTypeRecordWorkflowStarted:
18931907
return &RecordWorkflowStartedTask{
18941908
WorkflowIdentifier: workflowIdentifier,
18951909
TaskData: taskData,
1910+
TaskList: t.TaskList,
18961911
}, nil
18971912
case TransferTaskTypeResetWorkflow:
18981913
return &ResetWorkflowTask{
18991914
WorkflowIdentifier: workflowIdentifier,
19001915
TaskData: taskData,
1916+
TaskList: t.TaskList,
19011917
}, nil
19021918
case TransferTaskTypeRecordWorkflowClosed:
19031919
return &RecordWorkflowClosedTask{
19041920
WorkflowIdentifier: workflowIdentifier,
19051921
TaskData: taskData,
1922+
TaskList: t.TaskList,
19061923
}, nil
19071924
case TransferTaskTypeRecordChildExecutionCompleted:
1925+
targetRunID := t.TargetRunID
1926+
if t.TargetRunID == TransferTaskTransferTargetRunID {
1927+
targetRunID = ""
1928+
}
19081929
return &RecordChildExecutionCompletedTask{
19091930
WorkflowIdentifier: workflowIdentifier,
19101931
TaskData: taskData,
19111932
TargetDomainID: t.TargetDomainID,
19121933
TargetWorkflowID: t.TargetWorkflowID,
1913-
TargetRunID: t.TargetRunID,
1934+
TargetRunID: targetRunID,
1935+
TaskList: t.TaskList,
19141936
}, nil
19151937
case TransferTaskTypeUpsertWorkflowSearchAttributes:
19161938
return &UpsertWorkflowSearchAttributesTask{
19171939
WorkflowIdentifier: workflowIdentifier,
19181940
TaskData: taskData,
1941+
TaskList: t.TaskList,
19191942
}, nil
19201943
case TransferTaskTypeStartChildExecution:
19211944
return &StartChildExecutionTask{
@@ -1924,26 +1947,37 @@ func (t *TransferTaskInfo) ToTask() (Task, error) {
19241947
TargetDomainID: t.TargetDomainID,
19251948
TargetWorkflowID: t.TargetWorkflowID,
19261949
InitiatedID: t.ScheduleID,
1950+
TaskList: t.TaskList,
19271951
}, nil
19281952
case TransferTaskTypeCancelExecution:
1953+
targetRunID := t.TargetRunID
1954+
if t.TargetRunID == TransferTaskTransferTargetRunID {
1955+
targetRunID = ""
1956+
}
19291957
return &CancelExecutionTask{
19301958
WorkflowIdentifier: workflowIdentifier,
19311959
TaskData: taskData,
19321960
TargetDomainID: t.TargetDomainID,
19331961
TargetWorkflowID: t.TargetWorkflowID,
1934-
TargetRunID: t.TargetRunID,
1962+
TargetRunID: targetRunID,
19351963
InitiatedID: t.ScheduleID,
19361964
TargetChildWorkflowOnly: t.TargetChildWorkflowOnly,
1965+
TaskList: t.TaskList,
19371966
}, nil
19381967
case TransferTaskTypeSignalExecution:
1968+
targetRunID := t.TargetRunID
1969+
if t.TargetRunID == TransferTaskTransferTargetRunID {
1970+
targetRunID = ""
1971+
}
19391972
return &SignalExecutionTask{
19401973
WorkflowIdentifier: workflowIdentifier,
19411974
TaskData: taskData,
19421975
TargetDomainID: t.TargetDomainID,
19431976
TargetWorkflowID: t.TargetWorkflowID,
1944-
TargetRunID: t.TargetRunID,
1977+
TargetRunID: targetRunID,
19451978
InitiatedID: t.ScheduleID,
19461979
TargetChildWorkflowOnly: t.TargetChildWorkflowOnly,
1980+
TaskList: t.TaskList,
19471981
}, nil
19481982
default:
19491983
return nil, fmt.Errorf("unknown task type: %d", t.TaskType)
@@ -2027,6 +2061,11 @@ func (t *TimerTaskInfo) GetDomainID() string {
20272061
return t.DomainID
20282062
}
20292063

2064+
// GetTaskList returns the task list for timer task
2065+
func (t *TimerTaskInfo) GetTaskList() string {
2066+
return t.TaskList
2067+
}
2068+
20302069
// String returns a string representation for timer task
20312070
func (t *TimerTaskInfo) String() string {
20322071
return fmt.Sprintf(
@@ -2054,6 +2093,7 @@ func (t *TimerTaskInfo) ToTask() (Task, error) {
20542093
EventID: t.EventID,
20552094
ScheduleAttempt: t.ScheduleAttempt,
20562095
TimeoutType: t.TimeoutType,
2096+
TaskList: t.TaskList,
20572097
}, nil
20582098
case TaskTypeActivityTimeout:
20592099
return &ActivityTimeoutTask{
@@ -2062,35 +2102,41 @@ func (t *TimerTaskInfo) ToTask() (Task, error) {
20622102
TimeoutType: t.TimeoutType,
20632103
EventID: t.EventID,
20642104
Attempt: t.ScheduleAttempt,
2105+
TaskList: t.TaskList,
20652106
}, nil
20662107
case TaskTypeDeleteHistoryEvent:
20672108
return &DeleteHistoryEventTask{
20682109
WorkflowIdentifier: workflowIdentifier,
20692110
TaskData: taskData,
2111+
TaskList: t.TaskList,
20702112
}, nil
20712113
case TaskTypeWorkflowTimeout:
20722114
return &WorkflowTimeoutTask{
20732115
WorkflowIdentifier: workflowIdentifier,
20742116
TaskData: taskData,
2117+
TaskList: t.TaskList,
20752118
}, nil
20762119
case TaskTypeUserTimer:
20772120
return &UserTimerTask{
20782121
WorkflowIdentifier: workflowIdentifier,
20792122
TaskData: taskData,
20802123
EventID: t.EventID,
2124+
TaskList: t.TaskList,
20812125
}, nil
20822126
case TaskTypeActivityRetryTimer:
20832127
return &ActivityRetryTimerTask{
20842128
WorkflowIdentifier: workflowIdentifier,
20852129
TaskData: taskData,
20862130
EventID: t.EventID,
20872131
Attempt: t.ScheduleAttempt,
2132+
TaskList: t.TaskList,
20882133
}, nil
20892134
case TaskTypeWorkflowBackoffTimer:
20902135
return &WorkflowBackoffTimerTask{
20912136
WorkflowIdentifier: workflowIdentifier,
20922137
TaskData: taskData,
20932138
TimeoutType: t.TimeoutType,
2139+
TaskList: t.TaskList,
20942140
}, nil
20952141
default:
20962142
return nil, fmt.Errorf("unknown task type: %d", t.TaskType)

common/persistence/data_manager_interfaces_mock.go

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/nosql/nosql_execution_store_util.go

Lines changed: 6 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -219,57 +219,9 @@ func (d *nosqlExecutionStore) prepareTimerTasksForWorkflowTxn(domainID, workflow
219219
var tasks []*nosqlplugin.HistoryMigrationTask
220220

221221
for _, task := range timerTasks {
222-
var eventID int64
223-
var attempt int64
224-
225-
timeoutType := 0
226-
227-
switch t := task.(type) {
228-
case *persistence.DecisionTimeoutTask:
229-
eventID = t.EventID
230-
timeoutType = t.TimeoutType
231-
attempt = t.ScheduleAttempt
232-
233-
case *persistence.ActivityTimeoutTask:
234-
eventID = t.EventID
235-
timeoutType = t.TimeoutType
236-
attempt = t.Attempt
237-
238-
case *persistence.UserTimerTask:
239-
eventID = t.EventID
240-
241-
case *persistence.ActivityRetryTimerTask:
242-
eventID = t.EventID
243-
attempt = int64(t.Attempt)
244-
245-
case *persistence.WorkflowBackoffTimerTask:
246-
timeoutType = t.TimeoutType
247-
248-
case *persistence.WorkflowTimeoutTask:
249-
// noop
250-
251-
case *persistence.DeleteHistoryEventTask:
252-
// noop
253-
254-
default:
255-
return nil, &types.InternalServiceError{
256-
Message: fmt.Sprintf("Unknow timer type: %v", task.GetTaskType()),
257-
}
258-
}
259-
260-
nt := &nosqlplugin.TimerTask{
261-
TaskType: task.GetTaskType(),
262-
DomainID: domainID,
263-
WorkflowID: workflowID,
264-
RunID: runID,
265-
266-
VisibilityTimestamp: task.GetVisibilityTimestamp(),
267-
TaskID: task.GetTaskID(),
268-
269-
TimeoutType: timeoutType,
270-
EventID: eventID,
271-
ScheduleAttempt: attempt,
272-
Version: task.GetVersion(),
222+
nt, err := task.ToTimerTaskInfo()
223+
if err != nil {
224+
return nil, err
273225
}
274226
var blob *persistence.DataBlob
275227
if d.dc.EnableHistoryTaskDualWriteMode() {
@@ -387,85 +339,9 @@ func (d *nosqlExecutionStore) prepareTransferTasksForWorkflowTxn(domainID, workf
387339
var tasks []*nosqlplugin.HistoryMigrationTask
388340

389341
for _, task := range transferTasks {
390-
var taskList string
391-
var scheduleID int64
392-
targetDomainID := domainID
393-
targetDomainIDs := map[string]struct{}{}
394-
targetWorkflowID := persistence.TransferTaskTransferTargetWorkflowID
395-
targetRunID := persistence.TransferTaskTransferTargetRunID
396-
targetChildWorkflowOnly := false
397-
398-
switch task.GetTaskType() {
399-
case persistence.TransferTaskTypeActivityTask:
400-
targetDomainID = task.(*persistence.ActivityTask).TargetDomainID
401-
taskList = task.(*persistence.ActivityTask).TaskList
402-
scheduleID = task.(*persistence.ActivityTask).ScheduleID
403-
404-
case persistence.TransferTaskTypeDecisionTask:
405-
targetDomainID = task.(*persistence.DecisionTask).TargetDomainID
406-
taskList = task.(*persistence.DecisionTask).TaskList
407-
scheduleID = task.(*persistence.DecisionTask).ScheduleID
408-
409-
case persistence.TransferTaskTypeCancelExecution:
410-
targetDomainID = task.(*persistence.CancelExecutionTask).TargetDomainID
411-
targetWorkflowID = task.(*persistence.CancelExecutionTask).TargetWorkflowID
412-
targetRunID = task.(*persistence.CancelExecutionTask).TargetRunID
413-
if targetRunID == "" {
414-
targetRunID = persistence.TransferTaskTransferTargetRunID
415-
}
416-
targetChildWorkflowOnly = task.(*persistence.CancelExecutionTask).TargetChildWorkflowOnly
417-
scheduleID = task.(*persistence.CancelExecutionTask).InitiatedID
418-
419-
case persistence.TransferTaskTypeSignalExecution:
420-
targetDomainID = task.(*persistence.SignalExecutionTask).TargetDomainID
421-
targetWorkflowID = task.(*persistence.SignalExecutionTask).TargetWorkflowID
422-
targetRunID = task.(*persistence.SignalExecutionTask).TargetRunID
423-
if targetRunID == "" {
424-
targetRunID = persistence.TransferTaskTransferTargetRunID
425-
}
426-
targetChildWorkflowOnly = task.(*persistence.SignalExecutionTask).TargetChildWorkflowOnly
427-
scheduleID = task.(*persistence.SignalExecutionTask).InitiatedID
428-
429-
case persistence.TransferTaskTypeStartChildExecution:
430-
targetDomainID = task.(*persistence.StartChildExecutionTask).TargetDomainID
431-
targetWorkflowID = task.(*persistence.StartChildExecutionTask).TargetWorkflowID
432-
scheduleID = task.(*persistence.StartChildExecutionTask).InitiatedID
433-
434-
case persistence.TransferTaskTypeRecordChildExecutionCompleted:
435-
targetDomainID = task.(*persistence.RecordChildExecutionCompletedTask).TargetDomainID
436-
targetWorkflowID = task.(*persistence.RecordChildExecutionCompletedTask).TargetWorkflowID
437-
targetRunID = task.(*persistence.RecordChildExecutionCompletedTask).TargetRunID
438-
if targetRunID == "" {
439-
targetRunID = persistence.TransferTaskTransferTargetRunID
440-
}
441-
442-
case persistence.TransferTaskTypeCloseExecution,
443-
persistence.TransferTaskTypeRecordWorkflowStarted,
444-
persistence.TransferTaskTypeResetWorkflow,
445-
persistence.TransferTaskTypeUpsertWorkflowSearchAttributes,
446-
persistence.TransferTaskTypeRecordWorkflowClosed:
447-
// No explicit property needs to be set
448-
449-
default:
450-
return nil, &types.InternalServiceError{
451-
Message: fmt.Sprintf("Unknown transfer type: %v", task.GetTaskType()),
452-
}
453-
}
454-
t := &nosqlplugin.TransferTask{
455-
TaskType: task.GetTaskType(),
456-
DomainID: domainID,
457-
WorkflowID: workflowID,
458-
RunID: runID,
459-
VisibilityTimestamp: task.GetVisibilityTimestamp(),
460-
TaskID: task.GetTaskID(),
461-
TargetDomainID: targetDomainID,
462-
TargetDomainIDs: targetDomainIDs,
463-
TargetWorkflowID: targetWorkflowID,
464-
TargetRunID: targetRunID,
465-
TargetChildWorkflowOnly: targetChildWorkflowOnly,
466-
TaskList: taskList,
467-
ScheduleID: scheduleID,
468-
Version: task.GetVersion(),
342+
t, err := task.ToTransferTaskInfo()
343+
if err != nil {
344+
return nil, err
469345
}
470346
var blob *persistence.DataBlob
471347
if d.dc.EnableHistoryTaskDualWriteMode() {

0 commit comments

Comments
 (0)