Skip to content

Commit 10e75b6

Browse files
authored
Return ActiveClusterSelectionPolicy on DescribeWorkflow requests (#7181)
1 parent 4196948 commit 10e75b6

File tree

3 files changed

+1277
-346
lines changed

3 files changed

+1277
-346
lines changed

service/history/engine/engineimpl/describe_workflow_execution.go

Lines changed: 196 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
"github.com/uber/cadence/common"
29+
"github.com/uber/cadence/common/cache"
2930
"github.com/uber/cadence/common/constants"
3031
"github.com/uber/cadence/common/persistence"
3132
"github.com/uber/cadence/common/types"
@@ -37,8 +38,7 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
3738
ctx context.Context,
3839
request *types.HistoryDescribeWorkflowExecutionRequest,
3940
) (retResp *types.DescribeWorkflowExecutionResponse, retError error) {
40-
41-
if err := common.ValidateDomainUUID(request.DomainUUID); err != nil {
41+
if err := validateDescribeWorkflowExecutionRequest(request); err != nil {
4242
return nil, err
4343
}
4444

@@ -55,41 +55,37 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
5555
if err1 != nil {
5656
return nil, err1
5757
}
58+
5859
// If history is corrupted, return an error to the end user
5960
if corrupted, err := e.checkForHistoryCorruptions(ctx, mutableState); err != nil {
6061
return nil, err
6162
} else if corrupted {
6263
return nil, &types.EntityNotExistsError{Message: "Workflow execution corrupted."}
6364
}
6465

66+
domainCache := e.shard.GetDomainCache()
67+
result, err := createDescribeWorkflowExecutionResponse(ctx, mutableState, domainCache)
68+
if err != nil {
69+
return nil, err
70+
}
71+
72+
return result, nil
73+
}
74+
75+
// validateDescribeWorkflowExecutionRequest validates the input request
76+
func validateDescribeWorkflowExecutionRequest(request *types.HistoryDescribeWorkflowExecutionRequest) error {
77+
return common.ValidateDomainUUID(request.DomainUUID)
78+
}
79+
80+
func createDescribeWorkflowExecutionResponse(ctx context.Context, mutableState execution.MutableState, domainCache cache.DomainCache) (*types.DescribeWorkflowExecutionResponse, error) {
6581
executionInfo := mutableState.GetExecutionInfo()
82+
executionConfiguration, err := mapWorkflowExecutionConfiguration(executionInfo)
83+
if err != nil {
84+
return nil, err
85+
}
6686

6787
result := &types.DescribeWorkflowExecutionResponse{
68-
ExecutionConfiguration: &types.WorkflowExecutionConfiguration{
69-
TaskList: &types.TaskList{Name: executionInfo.TaskList},
70-
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionInfo.WorkflowTimeout),
71-
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(executionInfo.DecisionStartToCloseTimeout),
72-
},
73-
WorkflowExecutionInfo: &types.WorkflowExecutionInfo{
74-
Execution: &types.WorkflowExecution{
75-
WorkflowID: executionInfo.WorkflowID,
76-
RunID: executionInfo.RunID,
77-
},
78-
TaskList: &types.TaskList{
79-
Name: executionInfo.TaskList,
80-
Kind: executionInfo.TaskListKind.Ptr(),
81-
},
82-
Type: &types.WorkflowType{Name: executionInfo.WorkflowTypeName},
83-
StartTime: common.Int64Ptr(executionInfo.StartTimestamp.UnixNano()),
84-
HistoryLength: mutableState.GetNextEventID() - constants.FirstEventID,
85-
AutoResetPoints: executionInfo.AutoResetPoints,
86-
Memo: &types.Memo{Fields: executionInfo.CopyMemo()},
87-
IsCron: len(executionInfo.CronSchedule) > 0,
88-
UpdateTime: common.Int64Ptr(executionInfo.LastUpdatedTimestamp.UnixNano()),
89-
SearchAttributes: &types.SearchAttributes{IndexedFields: executionInfo.CopySearchAttributes()},
90-
PartitionConfig: executionInfo.CopyPartitionConfig(),
91-
CronOverlapPolicy: &executionInfo.CronOverlapPolicy,
92-
},
88+
ExecutionConfiguration: executionConfiguration,
9389
}
9490

9591
// TODO: we need to consider adding execution time to mutable state
@@ -99,125 +95,197 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
9995
if err != nil {
10096
return nil, err
10197
}
98+
99+
var completionEvent *types.HistoryEvent
100+
if executionInfo.State == persistence.WorkflowStateCompleted {
101+
completionEvent, err = mutableState.GetCompletionEvent(ctx)
102+
if err != nil {
103+
return nil, err
104+
}
105+
}
106+
107+
historyLength := mutableState.GetNextEventID() - constants.FirstEventID
108+
workflowExecutionInfo, err := mapWorkflowExecutionInfo(executionInfo, startEvent, domainCache, historyLength, completionEvent)
109+
if err != nil {
110+
return nil, err
111+
}
112+
result.WorkflowExecutionInfo = workflowExecutionInfo
113+
114+
pendingActivityInfos := mutableState.GetPendingActivityInfos()
115+
for _, ai := range pendingActivityInfos {
116+
scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, ai.ScheduleID)
117+
if err != nil {
118+
return nil, err
119+
}
120+
121+
p := mapPendingActivityInfo(ai, scheduledEvent)
122+
result.PendingActivities = append(result.PendingActivities, p)
123+
}
124+
125+
childExecutions := mutableState.GetPendingChildExecutionInfos()
126+
domainEntry := mutableState.GetDomainEntry()
127+
for _, childExecution := range childExecutions {
128+
pendingChild, err := mapPendingChildExecutionInfo(childExecution, domainEntry, domainCache)
129+
if err != nil {
130+
return nil, err
131+
}
132+
result.PendingChildren = append(result.PendingChildren, pendingChild)
133+
}
134+
135+
if di, ok := mutableState.GetPendingDecision(); ok {
136+
result.PendingDecision = mapPendingDecisionInfo(di)
137+
}
138+
139+
return result, nil
140+
}
141+
142+
func mapWorkflowExecutionConfiguration(executionInfo *persistence.WorkflowExecutionInfo) (*types.WorkflowExecutionConfiguration, error) {
143+
return &types.WorkflowExecutionConfiguration{
144+
TaskList: mapDecisionInfoToTaskList(executionInfo),
145+
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionInfo.WorkflowTimeout),
146+
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(executionInfo.DecisionStartToCloseTimeout),
147+
}, nil
148+
}
149+
150+
func mapWorkflowExecutionInfo(executionInfo *persistence.WorkflowExecutionInfo, startEvent *types.HistoryEvent, domainCache cache.DomainCache, historyLength int64, completionEvent *types.HistoryEvent) (*types.WorkflowExecutionInfo, error) {
151+
result := &types.WorkflowExecutionInfo{
152+
Execution: &types.WorkflowExecution{
153+
WorkflowID: executionInfo.WorkflowID,
154+
RunID: executionInfo.RunID,
155+
},
156+
TaskList: mapDecisionInfoToTaskList(executionInfo),
157+
Type: &types.WorkflowType{Name: executionInfo.WorkflowTypeName},
158+
StartTime: common.Int64Ptr(executionInfo.StartTimestamp.UnixNano()),
159+
HistoryLength: historyLength,
160+
AutoResetPoints: executionInfo.AutoResetPoints,
161+
Memo: &types.Memo{Fields: executionInfo.CopyMemo()},
162+
IsCron: len(executionInfo.CronSchedule) > 0,
163+
UpdateTime: common.Int64Ptr(executionInfo.LastUpdatedTimestamp.UnixNano()),
164+
SearchAttributes: &types.SearchAttributes{IndexedFields: executionInfo.CopySearchAttributes()},
165+
PartitionConfig: executionInfo.CopyPartitionConfig(),
166+
CronOverlapPolicy: &executionInfo.CronOverlapPolicy,
167+
ActiveClusterSelectionPolicy: executionInfo.ActiveClusterSelectionPolicy,
168+
}
169+
102170
backoffDuration := time.Duration(startEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstDecisionTaskBackoffSeconds()) * time.Second
103-
result.WorkflowExecutionInfo.ExecutionTime = common.Int64Ptr(result.WorkflowExecutionInfo.GetStartTime() + backoffDuration.Nanoseconds())
171+
result.ExecutionTime = common.Int64Ptr(result.GetStartTime() + backoffDuration.Nanoseconds())
104172

105173
if executionInfo.ParentRunID != "" {
106-
result.WorkflowExecutionInfo.ParentExecution = &types.WorkflowExecution{
174+
result.ParentExecution = &types.WorkflowExecution{
107175
WorkflowID: executionInfo.ParentWorkflowID,
108176
RunID: executionInfo.ParentRunID,
109177
}
110-
result.WorkflowExecutionInfo.ParentDomainID = common.StringPtr(executionInfo.ParentDomainID)
111-
result.WorkflowExecutionInfo.ParentInitiatedID = common.Int64Ptr(executionInfo.InitiatedID)
112-
parentDomain, err := e.shard.GetDomainCache().GetDomainName(executionInfo.ParentDomainID)
178+
result.ParentDomainID = common.StringPtr(executionInfo.ParentDomainID)
179+
result.ParentInitiatedID = common.Int64Ptr(executionInfo.InitiatedID)
180+
parentDomain, err := domainCache.GetDomainName(executionInfo.ParentDomainID)
113181
if err != nil {
114182
return nil, err
115183
}
116-
result.WorkflowExecutionInfo.ParentDomain = common.StringPtr(parentDomain)
184+
result.ParentDomain = common.StringPtr(parentDomain)
117185
}
186+
118187
if executionInfo.State == persistence.WorkflowStateCompleted {
119-
// for closed workflow
120-
result.WorkflowExecutionInfo.CloseStatus = persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
121-
completionEvent, err := mutableState.GetCompletionEvent(ctx)
122-
if err != nil {
123-
return nil, err
124-
}
125-
result.WorkflowExecutionInfo.CloseTime = common.Int64Ptr(completionEvent.GetTimestamp())
126-
}
127-
128-
if len(mutableState.GetPendingActivityInfos()) > 0 {
129-
for _, ai := range mutableState.GetPendingActivityInfos() {
130-
p := &types.PendingActivityInfo{
131-
ActivityID: ai.ActivityID,
132-
ScheduleID: ai.ScheduleID,
133-
}
134-
state := types.PendingActivityStateScheduled
135-
if ai.CancelRequested {
136-
state = types.PendingActivityStateCancelRequested
137-
} else if ai.StartedID != constants.EmptyEventID {
138-
state = types.PendingActivityStateStarted
139-
}
140-
p.State = &state
141-
lastHeartbeatUnixNano := ai.LastHeartBeatUpdatedTime.UnixNano()
142-
if lastHeartbeatUnixNano > 0 {
143-
p.LastHeartbeatTimestamp = common.Int64Ptr(lastHeartbeatUnixNano)
144-
p.HeartbeatDetails = ai.Details
145-
}
146-
// TODO: move to mutable state instead of loading it from event
147-
scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, ai.ScheduleID)
148-
if err != nil {
149-
return nil, err
150-
}
151-
p.ActivityType = scheduledEvent.ActivityTaskScheduledEventAttributes.ActivityType
152-
if state == types.PendingActivityStateScheduled {
153-
p.ScheduledTimestamp = common.Int64Ptr(ai.ScheduledTime.UnixNano())
154-
} else {
155-
p.LastStartedTimestamp = common.Int64Ptr(ai.StartedTime.UnixNano())
156-
}
157-
if ai.HasRetryPolicy {
158-
p.Attempt = ai.Attempt
159-
if !ai.ExpirationTime.IsZero() {
160-
p.ExpirationTimestamp = common.Int64Ptr(ai.ExpirationTime.UnixNano())
161-
}
162-
if ai.MaximumAttempts != 0 {
163-
p.MaximumAttempts = ai.MaximumAttempts
164-
}
165-
if ai.LastFailureReason != "" {
166-
p.LastFailureReason = common.StringPtr(ai.LastFailureReason)
167-
p.LastFailureDetails = ai.LastFailureDetails
168-
}
169-
if ai.LastWorkerIdentity != "" {
170-
p.LastWorkerIdentity = ai.LastWorkerIdentity
171-
}
172-
if ai.StartedIdentity != "" {
173-
p.StartedWorkerIdentity = ai.StartedIdentity
174-
}
175-
}
176-
result.PendingActivities = append(result.PendingActivities, p)
188+
result.CloseStatus = persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
189+
if completionEvent != nil {
190+
result.CloseTime = common.Int64Ptr(completionEvent.GetTimestamp())
177191
}
178192
}
179193

180-
if len(mutableState.GetPendingChildExecutionInfos()) > 0 {
181-
for _, ch := range mutableState.GetPendingChildExecutionInfos() {
182-
childDomainName, err := execution.GetChildExecutionDomainName(
183-
ch,
184-
e.shard.GetDomainCache(),
185-
mutableState.GetDomainEntry(),
186-
)
187-
if err != nil {
188-
if !common.IsEntityNotExistsError(err) {
189-
return nil, err
190-
}
191-
// child domain already deleted, instead of failing the request,
192-
// return domainID instead since this field is only for information purpose
193-
childDomainName = ch.DomainID
194-
}
195-
p := &types.PendingChildExecutionInfo{
196-
Domain: childDomainName,
197-
WorkflowID: ch.StartedWorkflowID,
198-
RunID: ch.StartedRunID,
199-
WorkflowTypeName: ch.WorkflowTypeName,
200-
InitiatedID: ch.InitiatedID,
201-
ParentClosePolicy: &ch.ParentClosePolicy,
202-
}
203-
result.PendingChildren = append(result.PendingChildren, p)
204-
}
194+
return result, nil
195+
}
196+
197+
func mapPendingActivityInfo(ai *persistence.ActivityInfo, activityScheduledEvent *types.HistoryEvent) *types.PendingActivityInfo {
198+
p := &types.PendingActivityInfo{
199+
ActivityID: ai.ActivityID,
200+
ScheduleID: ai.ScheduleID,
205201
}
206202

207-
if di, ok := mutableState.GetPendingDecision(); ok {
208-
pendingDecision := &types.PendingDecisionInfo{
209-
State: types.PendingDecisionStateScheduled.Ptr(),
210-
ScheduledTimestamp: common.Int64Ptr(di.ScheduledTimestamp),
211-
Attempt: di.Attempt,
212-
OriginalScheduledTimestamp: common.Int64Ptr(di.OriginalScheduledTimestamp),
213-
ScheduleID: di.ScheduleID,
203+
state := types.PendingActivityStateScheduled
204+
if ai.CancelRequested {
205+
state = types.PendingActivityStateCancelRequested
206+
} else if ai.StartedID != constants.EmptyEventID {
207+
state = types.PendingActivityStateStarted
208+
}
209+
p.State = &state
210+
211+
lastHeartbeatUnixNano := ai.LastHeartBeatUpdatedTime.UnixNano()
212+
if lastHeartbeatUnixNano > 0 {
213+
p.LastHeartbeatTimestamp = common.Int64Ptr(lastHeartbeatUnixNano)
214+
p.HeartbeatDetails = ai.Details
215+
}
216+
217+
p.ActivityType = activityScheduledEvent.ActivityTaskScheduledEventAttributes.ActivityType
218+
if state == types.PendingActivityStateScheduled {
219+
p.ScheduledTimestamp = common.Int64Ptr(ai.ScheduledTime.UnixNano())
220+
} else {
221+
p.LastStartedTimestamp = common.Int64Ptr(ai.StartedTime.UnixNano())
222+
}
223+
224+
if ai.HasRetryPolicy {
225+
p.Attempt = ai.Attempt
226+
if !ai.ExpirationTime.IsZero() {
227+
p.ExpirationTimestamp = common.Int64Ptr(ai.ExpirationTime.UnixNano())
228+
}
229+
if ai.MaximumAttempts != 0 {
230+
p.MaximumAttempts = ai.MaximumAttempts
231+
}
232+
if ai.LastFailureReason != "" {
233+
p.LastFailureReason = common.StringPtr(ai.LastFailureReason)
234+
p.LastFailureDetails = ai.LastFailureDetails
214235
}
215-
if di.StartedID != constants.EmptyEventID {
216-
pendingDecision.State = types.PendingDecisionStateStarted.Ptr()
217-
pendingDecision.StartedTimestamp = common.Int64Ptr(di.StartedTimestamp)
236+
if ai.LastWorkerIdentity != "" {
237+
p.LastWorkerIdentity = ai.LastWorkerIdentity
238+
}
239+
if ai.StartedIdentity != "" {
240+
p.StartedWorkerIdentity = ai.StartedIdentity
218241
}
219-
result.PendingDecision = pendingDecision
220242
}
221243

222-
return result, nil
244+
return p
245+
}
246+
247+
func mapPendingChildExecutionInfo(childExecution *persistence.ChildExecutionInfo, domainEntry *cache.DomainCacheEntry, domainCache cache.DomainCache) (*types.PendingChildExecutionInfo, error) {
248+
childDomainName, err := execution.GetChildExecutionDomainName(
249+
childExecution,
250+
domainCache,
251+
domainEntry,
252+
)
253+
if err != nil {
254+
if !common.IsEntityNotExistsError(err) {
255+
return nil, err
256+
}
257+
// child domain already deleted, instead of failing the request,
258+
// return domainID instead since this field is only for information purpose
259+
childDomainName = childExecution.DomainID
260+
}
261+
return &types.PendingChildExecutionInfo{
262+
Domain: childDomainName,
263+
WorkflowID: childExecution.StartedWorkflowID,
264+
RunID: childExecution.StartedRunID,
265+
WorkflowTypeName: childExecution.WorkflowTypeName,
266+
InitiatedID: childExecution.InitiatedID,
267+
ParentClosePolicy: &childExecution.ParentClosePolicy,
268+
}, nil
269+
}
270+
271+
func mapPendingDecisionInfo(di *execution.DecisionInfo) *types.PendingDecisionInfo {
272+
pendingDecision := &types.PendingDecisionInfo{
273+
State: types.PendingDecisionStateScheduled.Ptr(),
274+
ScheduledTimestamp: common.Int64Ptr(di.ScheduledTimestamp),
275+
Attempt: di.Attempt,
276+
OriginalScheduledTimestamp: common.Int64Ptr(di.OriginalScheduledTimestamp),
277+
ScheduleID: di.ScheduleID,
278+
}
279+
if di.StartedID != constants.EmptyEventID {
280+
pendingDecision.State = types.PendingDecisionStateStarted.Ptr()
281+
pendingDecision.StartedTimestamp = common.Int64Ptr(di.StartedTimestamp)
282+
}
283+
return pendingDecision
284+
}
285+
286+
func mapDecisionInfoToTaskList(executionInfo *persistence.WorkflowExecutionInfo) *types.TaskList {
287+
return &types.TaskList{
288+
Name: executionInfo.TaskList,
289+
Kind: executionInfo.TaskListKind.Ptr(),
290+
}
223291
}

0 commit comments

Comments
 (0)