Skip to content

Commit d496966

Browse files
committed
use supplied data converter
1 parent 031e295 commit d496966

File tree

3 files changed

+39
-16
lines changed

3 files changed

+39
-16
lines changed

packages/client/src/schedule-helpers.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ import { Headers } from '@temporalio/common/lib/interceptors';
1515
import {
1616
decodeArrayFromPayloads,
1717
decodeMapFromPayloads,
18+
decodeOptionalSinglePayload,
1819
encodeMapToPayloads,
20+
encodeOptionalToPayload,
1921
encodeToPayloads,
2022
} from '@temporalio/common/lib/internal-non-workflow';
2123
import { temporal } from '@temporalio/proto';
@@ -245,7 +247,6 @@ export async function encodeScheduleAction(
245247
action: CompiledScheduleAction,
246248
headers: Headers
247249
): Promise<temporal.api.schedule.v1.IScheduleAction> {
248-
const jsonConverter = new JsonPayloadConverter();
249250
return {
250251
startWorkflow: {
251252
workflowId: action.workflowId,
@@ -270,8 +271,8 @@ export async function encodeScheduleAction(
270271
: undefined,
271272
header: { fields: headers },
272273
userMetadata: {
273-
summary: jsonConverter.toPayload(action.staticSummary),
274-
details: jsonConverter.toPayload(action.staticDetails),
274+
summary: await encodeOptionalToPayload(dataConverter, action?.staticSummary),
275+
details: await encodeOptionalToPayload(dataConverter, action?.staticDetails),
275276
},
276277
},
277278
};
@@ -322,7 +323,6 @@ export async function decodeScheduleAction(
322323
pb: temporal.api.schedule.v1.IScheduleAction
323324
): Promise<ScheduleDescriptionAction> {
324325
if (pb.startWorkflow) {
325-
const jsonConverter = new JsonPayloadConverter();
326326
const userMetadata = pb.startWorkflow?.userMetadata;
327327
return {
328328
type: 'startWorkflow',
@@ -340,8 +340,8 @@ export async function decodeScheduleAction(
340340
workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout),
341341
workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout),
342342
workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout),
343-
staticSummary: userMetadata?.summary ? jsonConverter.fromPayload(userMetadata.summary) : undefined,
344-
staticDetails: userMetadata?.details ? jsonConverter.fromPayload(userMetadata.details) : undefined,
343+
staticSummary: await decodeOptionalSinglePayload(dataConverter, userMetadata?.summary) ?? undefined,
344+
staticDetails: await decodeOptionalSinglePayload(dataConverter, userMetadata?.details) ?? undefined,
345345
};
346346
}
347347
throw new TypeError('Unsupported schedule action');

packages/client/src/workflow-client.ts

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import {
2222
decodeRetryState,
2323
encodeWorkflowIdConflictPolicy,
2424
WorkflowIdConflictPolicy,
25-
JsonPayloadConverter,
2625
} from '@temporalio/common';
2726
import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes';
2827
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
@@ -32,7 +31,9 @@ import {
3231
decodeArrayFromPayloads,
3332
decodeFromPayloadsAtIndex,
3433
decodeOptionalFailureToOptionalError,
34+
decodeOptionalSinglePayload,
3535
encodeMapToPayloads,
36+
encodeOptionalToPayload,
3637
encodeToPayloads,
3738
filterNullAndUndefined,
3839
} from '@temporalio/common/lib/internal-non-workflow';
@@ -1197,7 +1198,6 @@ export class WorkflowClient extends BaseClient {
11971198
protected async _signalWithStartWorkflowHandler(input: WorkflowSignalWithStartInput): Promise<string> {
11981199
const { identity } = this.options;
11991200
const { options, workflowType, signalName, signalArgs, headers } = input;
1200-
const jsonConverter = new JsonPayloadConverter();
12011201
const req: temporal.api.workflowservice.v1.ISignalWithStartWorkflowExecutionRequest = {
12021202
namespace: this.options.namespace,
12031203
identity,
@@ -1228,8 +1228,8 @@ export class WorkflowClient extends BaseClient {
12281228
cronSchedule: options.cronSchedule,
12291229
header: { fields: headers },
12301230
userMetadata: {
1231-
summary: jsonConverter.toPayload(options?.staticSummary),
1232-
details: jsonConverter.toPayload(options?.staticDetails),
1231+
summary: await encodeOptionalToPayload(this.dataConverter, options?.staticSummary),
1232+
details: await encodeOptionalToPayload(this.dataConverter, options?.staticDetails),
12331233
},
12341234
};
12351235
try {
@@ -1271,7 +1271,6 @@ export class WorkflowClient extends BaseClient {
12711271
protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise<StartWorkflowExecutionRequest> {
12721272
const { options: opts, workflowType, headers } = input;
12731273
const { identity, namespace } = this.options;
1274-
const jsonConverter = new JsonPayloadConverter();
12751274
return {
12761275
namespace,
12771276
identity,
@@ -1300,8 +1299,8 @@ export class WorkflowClient extends BaseClient {
13001299
cronSchedule: opts.cronSchedule,
13011300
header: { fields: headers },
13021301
userMetadata: {
1303-
summary: jsonConverter.toPayload(opts?.staticSummary),
1304-
details: jsonConverter.toPayload(opts?.staticDetails),
1302+
summary: await encodeOptionalToPayload(this.dataConverter, opts?.staticSummary),
1303+
details: await encodeOptionalToPayload(this.dataConverter, opts?.staticDetails),
13051304
},
13061305
};
13071306
}
@@ -1436,12 +1435,11 @@ export class WorkflowClient extends BaseClient {
14361435
workflowExecution: { workflowId, runId },
14371436
});
14381437
const info = await executionInfoFromRaw(raw.workflowExecutionInfo ?? {}, this.client.dataConverter, raw);
1439-
const jsonConverter = new JsonPayloadConverter();
14401438
const userMetadata = raw.executionConfig?.userMetadata;
14411439
return {
14421440
...info,
1443-
staticDetails: userMetadata?.details ? jsonConverter.fromPayload(userMetadata.details) : undefined,
1444-
staticSummary: userMetadata?.summary ? jsonConverter.fromPayload(userMetadata.summary) : undefined,
1441+
staticDetails: await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.details) ?? undefined,
1442+
staticSummary: await decodeOptionalSinglePayload(this.client.dataConverter, userMetadata?.summary) ?? undefined,
14451443
raw,
14461444
};
14471445
},

packages/common/src/internal-non-workflow/codec-helpers.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,17 @@ export async function decodeOptionalSingle(
7272
return await decodeSingle(codecs, payload);
7373
}
7474

75+
/** Run {@link PayloadCodec.decode} and convert from a single Payload */
76+
export async function decodeOptionalSinglePayload<T>(
77+
dataConverter: LoadedDataConverter,
78+
payload?: Payload | null | undefined
79+
): Promise<T | null | undefined> {
80+
const { payloadCodecs, payloadConverter } = dataConverter;
81+
const decoded = await decodeOptionalSingle(payloadCodecs, payload);
82+
if (decoded == null) return decoded;
83+
return payloadConverter.fromPayload(decoded);
84+
}
85+
7586
/**
7687
* Run {@link PayloadConverter.toPayload} on value, and then encode it.
7788
*/
@@ -80,6 +91,20 @@ export async function encodeToPayload(converter: LoadedDataConverter, value: unk
8091
return await encodeSingle(payloadCodecs, payloadConverter.toPayload(value));
8192
}
8293

94+
/**
95+
* Run {@link PayloadConverter.toPayload} on an optional value, and then encode it.
96+
*/
97+
export async function encodeOptionalToPayload(
98+
converter: LoadedDataConverter,
99+
value: unknown
100+
): Promise<Payload | null | undefined> {
101+
if (value == null) return value;
102+
103+
const { payloadConverter, payloadCodecs } = converter;
104+
const payload = payloadConverter.toPayload(value);
105+
return await encodeSingle(payloadCodecs, payload);
106+
}
107+
83108
/**
84109
* Decode `payloads` and then return {@link arrayFromPayloads}`.
85110
*/

0 commit comments

Comments
 (0)