Skip to content

Commit defc1ad

Browse files
committed
Added a count method to the workflow client, a higher-level user-friendlier option to using the corresponding GRPC method directly.
1 parent 147ec50 commit defc1ad

File tree

4 files changed

+100
-4
lines changed

4 files changed

+100
-4
lines changed

packages/client/src/helpers.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,15 @@ import { Replace } from '@temporalio/common/lib/type-helpers';
1010
import { optionalTsToDate, requiredTsToDate } from '@temporalio/common/lib/time';
1111
import { decodeMapFromPayloads } from '@temporalio/common/lib/internal-non-workflow/codec-helpers';
1212
import { temporal, google } from '@temporalio/proto';
13-
import { RawWorkflowExecutionInfo, WorkflowExecutionInfo, WorkflowExecutionStatusName } from './types';
13+
import {
14+
CountWorkflowExecution,
15+
CountWorkflowExecutionsAggregationGroup,
16+
RawCountWorkflowExecutions,
17+
RawCountWorkflowExecutionsAggregationGroup,
18+
RawWorkflowExecutionInfo,
19+
WorkflowExecutionInfo,
20+
WorkflowExecutionStatusName,
21+
} from './types';
1422

1523
function workflowStatusCodeToName(code: temporal.api.enums.v1.WorkflowExecutionStatus): WorkflowExecutionStatusName {
1624
return workflowStatusCodeToNameInternal(code) ?? 'UNKNOWN';
@@ -81,6 +89,24 @@ export async function executionInfoFromRaw<T>(
8189
};
8290
}
8391

92+
export function countWorkflowExecutionFromRaw(raw: RawCountWorkflowExecutions): CountWorkflowExecution {
93+
return {
94+
// Note: lossy conversion of Long to number
95+
count: raw.count!.toNumber(),
96+
groups: raw.groups!.map((group) => executionCountAggregationGroupFromRaw(group)),
97+
};
98+
}
99+
100+
export function executionCountAggregationGroupFromRaw(
101+
raw: RawCountWorkflowExecutionsAggregationGroup
102+
): CountWorkflowExecutionsAggregationGroup {
103+
return {
104+
// Note: lossy conversion of Long to number
105+
count: raw.count!.toNumber(),
106+
group_values: raw.groupValues!.map((group_value) => searchAttributePayloadConverter.fromPayload(group_value)),
107+
};
108+
}
109+
84110
type ErrorDetailsName = `temporal.api.errordetails.v1.${keyof typeof temporal.api.errordetails.v1}`;
85111

86112
/**

packages/client/src/types.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type * as grpc from '@grpc/grpc-js';
2-
import type { SearchAttributes } from '@temporalio/common';
2+
import type { SearchAttributes, SearchAttributeValue } from '@temporalio/common';
33
import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow';
44
import * as proto from '@temporalio/proto';
55
import { Replace } from '@temporalio/common/lib/type-helpers';
@@ -14,6 +14,9 @@ export type GetWorkflowExecutionHistoryRequest =
1414
export type DescribeWorkflowExecutionResponse =
1515
proto.temporal.api.workflowservice.v1.IDescribeWorkflowExecutionResponse;
1616
export type RawWorkflowExecutionInfo = proto.temporal.api.workflow.v1.IWorkflowExecutionInfo;
17+
export type RawCountWorkflowExecutions = proto.temporal.api.workflowservice.v1.ICountWorkflowExecutionsResponse;
18+
export type RawCountWorkflowExecutionsAggregationGroup =
19+
proto.temporal.api.workflowservice.v1.CountWorkflowExecutionsResponse.IAggregationGroup;
1720
export type TerminateWorkflowExecutionResponse =
1821
proto.temporal.api.workflowservice.v1.ITerminateWorkflowExecutionResponse;
1922
export type RequestCancelWorkflowExecutionResponse =
@@ -52,6 +55,16 @@ export interface WorkflowExecutionInfo {
5255
raw: RawWorkflowExecutionInfo;
5356
}
5457

58+
export interface CountWorkflowExecution {
59+
count: number;
60+
groups: CountWorkflowExecutionsAggregationGroup[];
61+
}
62+
63+
export interface CountWorkflowExecutionsAggregationGroup {
64+
count: number;
65+
group_values: SearchAttributeValue[];
66+
}
67+
5568
export type WorkflowExecutionDescription = Replace<
5669
WorkflowExecutionInfo,
5770
{

packages/client/src/workflow-client.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import {
5858
WorkflowStartUpdateOutput,
5959
} from './interceptors';
6060
import {
61+
CountWorkflowExecution,
6162
DescribeWorkflowExecutionResponse,
6263
encodeQueryRejectCondition,
6364
GetWorkflowExecutionHistoryRequest,
@@ -77,7 +78,7 @@ import {
7778
WorkflowStartOptions,
7879
WorkflowUpdateOptions,
7980
} from './workflow-options';
80-
import { executionInfoFromRaw, rethrowKnownErrorTypes } from './helpers';
81+
import { countWorkflowExecutionFromRaw, executionInfoFromRaw, rethrowKnownErrorTypes } from './helpers';
8182
import {
8283
BaseClient,
8384
BaseClientOptions,
@@ -1308,6 +1309,28 @@ export class WorkflowClient extends BaseClient {
13081309
};
13091310
}
13101311

1312+
/**
1313+
* Return workflow execution count by given `query`.
1314+
*
1315+
* ⚠️ To use advanced query functionality, as of the 1.18 server release, you must use Elasticsearch based visibility.
1316+
*
1317+
* More info on the concept of "visibility" and the query syntax on the Temporal documentation site:
1318+
* https://docs.temporal.io/visibility
1319+
*/
1320+
public async count(query?: string): Promise<CountWorkflowExecution> {
1321+
let response: temporal.api.workflowservice.v1.CountWorkflowExecutionsResponse;
1322+
try {
1323+
response = await this.workflowService.countWorkflowExecutions({
1324+
namespace: this.options.namespace,
1325+
query,
1326+
});
1327+
} catch (e) {
1328+
this.rethrowGrpcError(e, 'Failed to count workflows', undefined);
1329+
}
1330+
1331+
return countWorkflowExecutionFromRaw(response);
1332+
}
1333+
13111334
protected getOrMakeInterceptors(workflowId: string, runId?: string): WorkflowClientInterceptor[] {
13121335
if (typeof this.options.interceptors === 'object' && 'calls' in this.options.interceptors) {
13131336
// eslint-disable-next-line deprecation/deprecation

packages/test/src/test-integration-workflows.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { randomUUID } from 'crypto';
22
import { ExecutionContext } from 'ava';
33
import { firstValueFrom, Subject } from 'rxjs';
4-
import { WorkflowFailedError } from '@temporalio/client';
4+
import { CountWorkflowExecution, WorkflowFailedError } from '@temporalio/client';
55
import * as activity from '@temporalio/activity';
66
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
77
import { TestWorkflowEnvironment } from '@temporalio/testing';
@@ -1264,3 +1264,37 @@ export const interceptors: workflow.WorkflowInterceptorsFactory = () => {
12641264
}
12651265
return {};
12661266
};
1267+
1268+
export async function completableWorkflow(completes: boolean): Promise<void> {
1269+
await workflow.condition(() => completes);
1270+
}
1271+
1272+
test('Count workflow executions', async (t) => {
1273+
const { taskQueue, createWorker, executeWorkflow, startWorkflow } = helpers(t);
1274+
const worker = await createWorker();
1275+
const client = t.context.env.client;
1276+
1277+
await worker.runUntil(async () => {
1278+
// Run 3 workflows that complete.
1279+
for (let i = 0; i < 3; i++) {
1280+
await executeWorkflow(completableWorkflow, { args: [true] });
1281+
}
1282+
});
1283+
1284+
// Run 2 workflows that don't complete
1285+
// (use startWorkflow to avoid waiting for workflows to complete, which they never will)
1286+
for (let i = 0; i < 2; i++) {
1287+
await startWorkflow(completableWorkflow, { args: [false] });
1288+
}
1289+
1290+
const actual = await client.workflow.count(`TaskQueue = '${taskQueue}' GROUP BY ExecutionStatus`);
1291+
const expected: CountWorkflowExecution = {
1292+
count: 5,
1293+
groups: [
1294+
{ count: 2, group_values: [['Runningggg']] },
1295+
{ count: 3, group_values: [['Completedddd']] },
1296+
],
1297+
};
1298+
1299+
t.deepEqual(expected, actual);
1300+
});

0 commit comments

Comments
 (0)