Skip to content

Commit a4b986f

Browse files
authored
More reliable describe workflow api (#693)
* use describe workflow api * address nits * remove extra comment * change it to upper case * fix typo
1 parent 00fdd4e commit a4b986f

File tree

13 files changed

+307
-89
lines changed

13 files changed

+307
-89
lines changed
Lines changed: 117 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
import merge from 'lodash/merge';
12
import { NextResponse, type NextRequest } from 'next/server';
23

34
import decodeUrlParams from '@/utils/decode-url-params';
45
import * as grpcClient from '@/utils/grpc/grpc-client';
56
import { getHTTPStatusCode, GRPCError } from '@/utils/grpc/grpc-error';
67
import logger, { type RouteHandlerErrorPayload } from '@/utils/logger';
78

8-
import { type RequestParams } from './describe-workflow.types';
9+
import {
10+
type DescribeUnArchivedWorkflowResponse,
11+
type DescribeArchivedWorkflowResponse,
12+
type RequestParams,
13+
} from './describe-workflow.types';
914

1015
export default async function describeWorkflow(
1116
_: NextRequest,
@@ -14,7 +19,7 @@ export default async function describeWorkflow(
1419
const decodedParams = decodeUrlParams(requestParams.params);
1520

1621
try {
17-
const res = await grpcClient.clusterMethods[
22+
const describeWorkflowResponse = await grpcClient.clusterMethods[
1823
decodedParams.cluster
1924
].describeWorkflow({
2025
domain: decodedParams.domain,
@@ -24,20 +29,118 @@ export default async function describeWorkflow(
2429
},
2530
});
2631

32+
const res: DescribeUnArchivedWorkflowResponse = merge(
33+
{},
34+
describeWorkflowResponse,
35+
{
36+
workflowExecutionInfo: { closeEvent: null, isArchived: false as const },
37+
}
38+
);
39+
if (
40+
res.workflowExecutionInfo &&
41+
res.workflowExecutionInfo.closeStatus &&
42+
res.workflowExecutionInfo.closeStatus !==
43+
'WORKFLOW_EXECUTION_CLOSE_STATUS_INVALID'
44+
) {
45+
const closeEventResponse = await grpcClient.clusterMethods[
46+
decodedParams.cluster
47+
].getHistory({
48+
domain: decodedParams.domain,
49+
workflowExecution: {
50+
workflowId: decodedParams.workflowId,
51+
runId: decodedParams.runId,
52+
},
53+
historyEventFilterType: 'EVENT_FILTER_TYPE_CLOSE_EVENT',
54+
});
55+
if (closeEventResponse.history?.events?.[0])
56+
res.workflowExecutionInfo.closeEvent =
57+
closeEventResponse.history?.events?.[0];
58+
}
59+
2760
return NextResponse.json(res);
2861
} catch (e) {
29-
logger.error<RouteHandlerErrorPayload>(
30-
{ requestParams: decodedParams, cause: e },
31-
'Error fetching workflow info'
32-
);
62+
// DescribeWorkflow depends on a temp datasource, so sometimes data is not available
63+
// to make it more reliable we depend on history to construct similar response in case data is not available
64+
try {
65+
if (e instanceof GRPCError && e.httpStatusCode !== 404) {
66+
throw e;
67+
}
68+
const archivedHistoryResponse = await grpcClient.clusterMethods[
69+
decodedParams.cluster
70+
].getHistory({
71+
domain: decodedParams.domain,
72+
workflowExecution: {
73+
workflowId: decodedParams.workflowId,
74+
runId: decodedParams.runId,
75+
},
76+
pageSize: 1,
77+
});
78+
const archivedHistoryEvents =
79+
archivedHistoryResponse.history?.events || [];
3380

34-
return NextResponse.json(
35-
{
36-
message:
37-
e instanceof GRPCError ? e.message : 'Error fetching workflow info',
38-
cause: e,
39-
},
40-
{ status: getHTTPStatusCode(e) }
41-
);
81+
if (!archivedHistoryEvents[0]?.workflowExecutionStartedEventAttributes) {
82+
throw e;
83+
}
84+
85+
const {
86+
eventTime: startTime,
87+
workflowExecutionStartedEventAttributes: {
88+
taskList,
89+
executionStartToCloseTimeout,
90+
taskStartToCloseTimeout,
91+
workflowType: type,
92+
},
93+
} = archivedHistoryEvents[0];
94+
95+
const res: DescribeArchivedWorkflowResponse = {
96+
executionConfiguration: {
97+
taskList,
98+
executionStartToCloseTimeout,
99+
taskStartToCloseTimeout,
100+
},
101+
workflowExecutionInfo: {
102+
workflowExecution: {
103+
runId: decodedParams.runId,
104+
workflowId: decodedParams.workflowId,
105+
},
106+
isArchived: true,
107+
startTime,
108+
type,
109+
closeTime: null,
110+
closeStatus: null,
111+
closeEvent: null,
112+
historyLength: null,
113+
parentExecutionInfo: null,
114+
executionTime: null,
115+
memo: null,
116+
searchAttributes: null,
117+
autoResetPoints: null,
118+
taskList: '',
119+
isCron: null,
120+
updateTime: null,
121+
partitionConfig: null,
122+
},
123+
pendingActivities: [],
124+
pendingChildren: [],
125+
pendingDecision: null,
126+
};
127+
return NextResponse.json(res);
128+
} catch (e) {
129+
logger.error<RouteHandlerErrorPayload>(
130+
{ requestParams: decodedParams, cause: e },
131+
'Error fetching workflow execution info'
132+
);
133+
134+
return NextResponse.json(
135+
{
136+
message:
137+
e instanceof GRPCError
138+
? e.message
139+
: 'Error fetching workflow execution info',
140+
cause: e,
141+
},
142+
{ status: getHTTPStatusCode(e) }
143+
);
144+
}
42145
}
43146
}

src/route-handlers/describe-workflow/describe-workflow.types.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { type DescribeWorkflowExecutionResponse } from '@/__generated__/proto-ts/uber/cadence/api/v1/DescribeWorkflowExecutionResponse';
2+
import { type HistoryEvent } from '@/__generated__/proto-ts/uber/cadence/api/v1/HistoryEvent';
13
import { type WorkflowExecutionInfo } from '@/__generated__/proto-ts/uber/cadence/api/v1/WorkflowExecutionInfo';
24

35
export type RouteParams = {
@@ -11,4 +13,35 @@ export type RequestParams = {
1113
params: RouteParams;
1214
};
1315

14-
export type DescribeWorkflowResponse = WorkflowExecutionInfo;
16+
export type DescribeWorkflowResponse =
17+
| DescribeUnArchivedWorkflowResponse
18+
| DescribeArchivedWorkflowResponse;
19+
20+
export type DescribeUnArchivedWorkflowResponse = Omit<
21+
DescribeWorkflowExecutionResponse,
22+
'workflowExecutionInfo'
23+
> & {
24+
workflowExecutionInfo:
25+
| (WorkflowExecutionInfo & {
26+
closeEvent: HistoryEvent | null; // TODO @assem.hafez enhance type to make it close events instead of the generic history event
27+
isArchived: false;
28+
})
29+
| null;
30+
};
31+
32+
export type DescribeArchivedWorkflowResponse = Omit<
33+
DescribeWorkflowExecutionResponse,
34+
'workflowExecutionInfo'
35+
> & {
36+
workflowExecutionInfo: Omit<
37+
WorkflowExecutionInfo,
38+
'closeStatus' | 'historyLength' | 'partitionConfig' | 'isCron'
39+
> & {
40+
isArchived: true;
41+
closeEvent: null;
42+
isCron: null;
43+
closeStatus: null;
44+
historyLength: null;
45+
partitionConfig: null;
46+
};
47+
};

src/utils/grpc/grpc-client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { type DescribeClusterRequest__Input } from '@/__generated__/proto-ts/uber/cadence/admin/v1/DescribeClusterRequest';
22
import { type DescribeClusterResponse } from '@/__generated__/proto-ts/uber/cadence/admin/v1/DescribeClusterResponse';
33
import { type DescribeWorkflowExecutionRequest__Input } from '@/__generated__/proto-ts/uber/cadence/admin/v1/DescribeWorkflowExecutionRequest';
4-
import { type DescribeWorkflowExecutionResponse } from '@/__generated__/proto-ts/uber/cadence/admin/v1/DescribeWorkflowExecutionResponse';
54
import { type DescribeDomainRequest__Input } from '@/__generated__/proto-ts/uber/cadence/api/v1/DescribeDomainRequest';
65
import { type DescribeDomainResponse } from '@/__generated__/proto-ts/uber/cadence/api/v1/DescribeDomainResponse';
76
import { type DescribeTaskListRequest__Input } from '@/__generated__/proto-ts/uber/cadence/api/v1/DescribeTaskListRequest';
87
import { type DescribeTaskListResponse } from '@/__generated__/proto-ts/uber/cadence/api/v1/DescribeTaskListResponse';
8+
import { type DescribeWorkflowExecutionResponse } from '@/__generated__/proto-ts/uber/cadence/api/v1/DescribeWorkflowExecutionResponse';
99
import { type GetWorkflowExecutionHistoryRequest__Input } from '@/__generated__/proto-ts/uber/cadence/api/v1/GetWorkflowExecutionHistoryRequest';
1010
import { type GetWorkflowExecutionHistoryResponse } from '@/__generated__/proto-ts/uber/cadence/api/v1/GetWorkflowExecutionHistoryResponse';
1111
import { type ListArchivedWorkflowExecutionsRequest__Input } from '@/__generated__/proto-ts/uber/cadence/api/v1/ListArchivedWorkflowExecutionsRequest';
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
const WORKFLOW_PAGE_STATUS_REFRESH_INTERVAL = 10000;
2+
3+
export default WORKFLOW_PAGE_STATUS_REFRESH_INTERVAL;

src/views/workflow-page/helpers/__tests__/get-workflow-status-tag-props.test.ts

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
// Import the function to be tested
2+
import { type HistoryEvent } from '@/__generated__/proto-ts/uber/cadence/api/v1/HistoryEvent';
3+
import { continueAsNewWorkflowExecutionEvent } from '@/views/workflow-history/__fixtures__/workflow-history-single-events';
4+
25
import getWorkflowIsCompleted from '../get-workflow-is-completed';
36
import getWorkflowStatusTagProps from '../get-workflow-status-tag-props';
47

@@ -20,7 +23,9 @@ describe('getWorkflowStatusTagProps', () => {
2023

2124
it('should return INVALID (running) status if workflow is not completed', () => {
2225
mockedGetWorkflowIsCompleted.mockReturnValue(false);
23-
const lastEvent = { attributes: 'someRunningEventAttributes' };
26+
const lastEvent: Pick<HistoryEvent, 'attributes'> = {
27+
attributes: 'workflowExecutionStartedEventAttributes',
28+
};
2429

2530
expect(getWorkflowStatusTagProps(lastEvent)).toEqual({
2631
status: 'WORKFLOW_EXECUTION_CLOSE_STATUS_INVALID',
@@ -29,7 +34,9 @@ describe('getWorkflowStatusTagProps', () => {
2934

3035
it('should return FAILED status if workflow failed', () => {
3136
mockedGetWorkflowIsCompleted.mockReturnValue(true);
32-
const lastEvent = { attributes: 'workflowExecutionFailedEventAttributes' };
37+
const lastEvent: Pick<HistoryEvent, 'attributes'> = {
38+
attributes: 'workflowExecutionFailedEventAttributes',
39+
};
3340

3441
expect(getWorkflowStatusTagProps(lastEvent)).toEqual({
3542
status: 'WORKFLOW_EXECUTION_CLOSE_STATUS_FAILED',
@@ -38,7 +45,7 @@ describe('getWorkflowStatusTagProps', () => {
3845

3946
it('should return CANCELED status if workflow canceled or cancel requested', () => {
4047
mockedGetWorkflowIsCompleted.mockReturnValue(true);
41-
const lastEvent = {
48+
const lastEvent: Pick<HistoryEvent, 'attributes'> = {
4249
attributes: 'workflowExecutionCanceledEventAttributes',
4350
};
4451

@@ -55,7 +62,7 @@ describe('getWorkflowStatusTagProps', () => {
5562

5663
it('should return COMPLETED status if workflow completed', () => {
5764
mockedGetWorkflowIsCompleted.mockReturnValue(true);
58-
const lastEvent = {
65+
const lastEvent: Pick<HistoryEvent, 'attributes'> = {
5966
attributes: 'workflowExecutionCompletedEventAttributes',
6067
};
6168

@@ -66,7 +73,7 @@ describe('getWorkflowStatusTagProps', () => {
6673

6774
it('should return TERMINATED status if workflow terminated', () => {
6875
mockedGetWorkflowIsCompleted.mockReturnValue(true);
69-
const lastEvent = {
76+
const lastEvent: Pick<HistoryEvent, 'attributes'> = {
7077
attributes: 'workflowExecutionTerminatedEventAttributes',
7178
};
7279

@@ -77,12 +84,11 @@ describe('getWorkflowStatusTagProps', () => {
7784

7885
it('should return CONTINUED_AS_NEW status and link if workflow continued as new', () => {
7986
mockedGetWorkflowIsCompleted.mockReturnValue(true);
80-
const lastEvent = {
81-
attributes: 'workflowExecutionContinuedAsNewEventAttributes',
82-
workflowExecutionContinuedAsNewEventAttributes: {
83-
newExecutionRunId: 'newRunId',
84-
},
85-
};
87+
const lastEvent = continueAsNewWorkflowExecutionEvent;
88+
const newRunId =
89+
continueAsNewWorkflowExecutionEvent
90+
.workflowExecutionContinuedAsNewEventAttributes.newExecutionRunId;
91+
8692
const workflowInfo = {
8793
cluster: 'testCluster',
8894
workflowId: 'testWorkflowId',
@@ -91,18 +97,13 @@ describe('getWorkflowStatusTagProps', () => {
9197

9298
expect(getWorkflowStatusTagProps(lastEvent, workflowInfo)).toEqual({
9399
status: 'WORKFLOW_EXECUTION_CLOSE_STATUS_CONTINUED_AS_NEW',
94-
link: '/domains/testDomain/testCluster/workflows/testWorkflowId/newRunId',
100+
link: `/domains/testDomain/testCluster/workflows/testWorkflowId/${newRunId}`,
95101
});
96102
});
97103

98104
it('should return CONTINUED_AS_NEW status with undefined link if workflowInfo is incomplete', () => {
99105
mockedGetWorkflowIsCompleted.mockReturnValue(true);
100-
const lastEvent = {
101-
attributes: 'workflowExecutionContinuedAsNewEventAttributes',
102-
workflowExecutionContinuedAsNewEventAttributes: {
103-
newExecutionRunId: 'newRunId',
104-
},
105-
};
106+
const lastEvent = continueAsNewWorkflowExecutionEvent;
106107

107108
expect(getWorkflowStatusTagProps(lastEvent)).toEqual({
108109
status: 'WORKFLOW_EXECUTION_CLOSE_STATUS_CONTINUED_AS_NEW',
@@ -112,7 +113,7 @@ describe('getWorkflowStatusTagProps', () => {
112113

113114
it('should return TIMED_OUT status if workflow timed out', () => {
114115
mockedGetWorkflowIsCompleted.mockReturnValue(true);
115-
const lastEvent = {
116+
const lastEvent: Pick<HistoryEvent, 'attributes'> = {
116117
attributes: 'workflowExecutionTimedOutEventAttributes',
117118
};
118119

@@ -123,7 +124,10 @@ describe('getWorkflowStatusTagProps', () => {
123124

124125
it('should return INVALID status for unrecognized attributes', () => {
125126
mockedGetWorkflowIsCompleted.mockReturnValue(true);
126-
const lastEvent = { attributes: 'someUnknownEventAttributes' };
127+
const lastEvent: Pick<HistoryEvent, 'attributes'> = {
128+
// @ts-expect-error testing invalid attributes
129+
attributes: 'someUnknownEventAttributes',
130+
};
127131

128132
expect(getWorkflowStatusTagProps(lastEvent)).toEqual({
129133
status: 'WORKFLOW_EXECUTION_CLOSE_STATUS_INVALID',

src/views/workflow-page/helpers/get-workflow-status-tag-props.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,24 @@
1919
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
2020
// THE SOFTWARE.
2121

22+
import { type HistoryEvent } from '@/__generated__/proto-ts/uber/cadence/api/v1/HistoryEvent';
2223
import type { Props as WorkflowStatusTagProps } from '@/views/shared/workflow-status-tag/workflow-status-tag-icon/workflow-status-tag-icon.types';
2324

2425
import getWorkflowIsCompleted from './get-workflow-is-completed';
2526

2627
//TODO: @assem.hafez add type form response to lastEvent
2728
const getWorkflowStatusTagProps = (
28-
lastEvent: any,
29+
lastEvent?: Pick<
30+
HistoryEvent,
31+
'attributes' | 'workflowExecutionContinuedAsNewEventAttributes'
32+
> | null,
2933
workflowInfo?: { cluster: string; workflowId: string; domain: string }
3034
): Pick<WorkflowStatusTagProps, 'status' | 'link'> => {
31-
const isCompleted = getWorkflowIsCompleted(lastEvent?.attributes);
35+
if (!lastEvent || !lastEvent.attributes)
36+
return { status: 'WORKFLOW_EXECUTION_CLOSE_STATUS_INVALID' };
3237

33-
if (!lastEvent || !isCompleted)
38+
const isCompleted = getWorkflowIsCompleted(lastEvent.attributes);
39+
if (!isCompleted)
3440
return { status: 'WORKFLOW_EXECUTION_CLOSE_STATUS_INVALID' };
3541

3642
switch (lastEvent?.attributes) {

0 commit comments

Comments
 (0)