Skip to content

Commit 0d05950

Browse files
committed
improvements
1 parent 3b3a6d5 commit 0d05950

File tree

3 files changed

+110
-23
lines changed

3 files changed

+110
-23
lines changed

packages/common/src/reserved.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,21 @@ export const reservedPrefixes = [
88
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
99
];
1010

11+
export class ReservedPrefixError extends Error {
12+
constructor(type: string, name: string, prefix: string) {
13+
super(`Cannot use ${type} name: '${name}', with reserved prefix: '${prefix}'`);
14+
this.name = 'ReservedPrefixError';
15+
}
16+
}
17+
1118
export function throwIfReservedName(type: string, name: string): void {
12-
const prefix = isReservedName(name);
19+
const prefix = maybeGetReservedPrefix(name);
1320
if (prefix) {
14-
throw Error(`Cannot register ${type} name: '${name}', with reserved prefix: '${prefix}'`);
21+
throw new ReservedPrefixError(type, name, prefix);
1522
}
1623
}
1724

18-
export function isReservedName(name: string): string | undefined {
25+
export function maybeGetReservedPrefix(name: string): string | undefined {
1926
for (const prefix of reservedPrefixes) {
2027
if (name.startsWith(prefix)) {
2128
return prefix;

packages/test/src/test-integration-split-two.ts

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import asyncRetry from 'async-retry';
33
import { v4 as uuid4 } from 'uuid';
44
import * as iface from '@temporalio/proto';
5-
import { WorkflowContinuedAsNewError, WorkflowFailedError } from '@temporalio/client';
5+
import { WorkflowContinuedAsNewError, WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
66
import {
77
ApplicationFailure,
88
defaultPayloadConverter,
@@ -20,6 +20,7 @@ import {
2020
defineSignal,
2121
defineUpdate,
2222
setDefaultQueryHandler,
23+
setDefaultSignalHandler,
2324
setHandler,
2425
sleep,
2526
} from '@temporalio/workflow';
@@ -859,3 +860,71 @@ test('Workflow failure if define signals/updates/queries with reserved prefixes'
859860
// }
860861
});
861862
});
863+
864+
export async function workflowWithDefaultHandlers(): Promise<void> {
865+
let unblocked = false;
866+
setHandler(defineSignal('unblock'), () => {
867+
unblocked = true;
868+
});
869+
870+
setDefaultQueryHandler(() => {});
871+
setDefaultSignalHandler(() => {});
872+
setDefaultUpdateHandler({
873+
handler: () => {},
874+
});
875+
876+
await condition(() => unblocked);
877+
}
878+
879+
test('Default handlers fail WFT given reserved prefix', configMacro, async (t, config) => {
880+
const { env, createWorkerWithDefaults } = config;
881+
const { startWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
882+
const worker = await createWorkerWithDefaults(t);
883+
884+
const assertWftFailure = async (
885+
handle: WorkflowHandle,
886+
name: string,
887+
prefix: string,
888+
handlerType: 'query' | 'signal' | 'update'
889+
) => {
890+
await asyncRetry(
891+
async () => {
892+
const history = await handle.fetchHistory();
893+
const wftFailedEvent = history.events?.findLast((ev) => ev.workflowTaskFailedEventAttributes);
894+
if (wftFailedEvent === undefined) {
895+
throw new Error('No WFT failed event found');
896+
}
897+
const { failure } = wftFailedEvent.workflowTaskFailedEventAttributes ?? {};
898+
if (!failure) {
899+
return t.fail('Expected failure in workflowTaskFailedEventAttributes');
900+
}
901+
t.is(failure.message, `Cannot use ${handlerType} name: '${name}', with reserved prefix: '${prefix}'`);
902+
},
903+
{ minTimeout: 300, factor: 1, retries: 10 }
904+
);
905+
};
906+
907+
await worker.runUntil(async () => {
908+
for (const prefix of reservedPrefixes) {
909+
// Test Query
910+
let handle = await startWorkflow(workflowWithDefaultHandlers);
911+
const queryName = `${prefix}_query`;
912+
await t.throwsAsync(handle.query(queryName), undefined, `Query ${queryName} should fail`);
913+
await assertWftFailure(handle, queryName, prefix, 'query');
914+
await handle.terminate();
915+
// Test Signal
916+
handle = await startWorkflow(workflowWithDefaultHandlers);
917+
const signalName = `${prefix}_signal`;
918+
await handle.signal(signalName);
919+
await assertWftFailure(handle, signalName, prefix, 'signal');
920+
await handle.terminate();
921+
922+
// Test Update
923+
handle = await startWorkflow(workflowWithDefaultHandlers);
924+
const updateName = `${prefix}_update`;
925+
await t.throwsAsync(handle.executeUpdate(updateName), undefined, `Update ${updateName} should fail`);
926+
await assertWftFailure(handle, updateName, prefix, 'update');
927+
await handle.terminate();
928+
}
929+
});
930+
});

packages/workflow/src/internals.ts

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflo
3333
import type { coresdk, temporal } from '@temporalio/proto';
3434
import {
3535
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
36+
ReservedPrefixError,
3637
STACK_TRACE_RESERVED_PREFIX,
37-
isReservedName,
38+
maybeGetReservedPrefix,
3839
throwIfReservedName,
3940
} from '@temporalio/common/lib/reserved';
4041
import { alea, RNG } from './alea';
@@ -685,28 +686,27 @@ export class Activator implements ActivationHandler {
685686
throw new TypeError('Missing query activation attributes');
686687
}
687688

688-
const queryInput = {
689-
queryName: queryType,
690-
args: arrayFromPayloads(this.payloadConverter, activation.arguments),
691-
queryId,
692-
headers: headers ?? {},
693-
};
694-
695-
// Skip interceptors if this is an internal query.
696-
if (isReservedName(queryType)) {
697-
this.queryWorkflowNextHandler(queryInput).then(
698-
(result) => this.completeQuery(queryId, result),
699-
(reason) => this.failQuery(queryId, reason)
700-
);
701-
return;
689+
const reservedPrefix = maybeGetReservedPrefix(queryType)
690+
if (reservedPrefix) {
691+
// Must have (internal) query handler for reserved query.
692+
if (!this.queryHandlers.has(queryType)) {
693+
throw new ReservedPrefixError('query', queryType, reservedPrefix);
694+
}
702695
}
703696

697+
// Skip interceptors if it is an internal query
698+
let interceptors = reservedPrefix ? [] : this.interceptors.inbound
704699
const execute = composeInterceptors(
705-
this.interceptors.inbound,
700+
interceptors,
706701
'handleQuery',
707702
this.queryWorkflowNextHandler.bind(this)
708703
);
709-
execute(queryInput).then(
704+
execute({
705+
queryName: queryType,
706+
args: arrayFromPayloads(this.payloadConverter, activation.arguments),
707+
queryId,
708+
headers: headers ?? {},
709+
}).then(
710710
(result) => this.completeQuery(queryId, result),
711711
(reason) => this.failQuery(queryId, reason)
712712
);
@@ -737,6 +737,11 @@ export class Activator implements ActivationHandler {
737737

738738
// If we don't have an entry from either source, buffer and return
739739
if (entry === null) {
740+
const reservedPrefix = maybeGetReservedPrefix(name);
741+
if (reservedPrefix) {
742+
// Must have (internal) update handler for reserved update.
743+
throw new ReservedPrefixError('update', name, reservedPrefix);
744+
}
740745
this.bufferedUpdates.push(activation);
741746
return;
742747
}
@@ -864,8 +869,6 @@ export class Activator implements ActivationHandler {
864869
if (fn) {
865870
return await fn(...args);
866871
} else if (this.defaultSignalHandler) {
867-
// Do not call default signal handler with reserved signal name.
868-
throwIfReservedName('signal', signalName);
869872
return await this.defaultSignalHandler(signalName, ...args);
870873
} else {
871874
throw new IllegalStateError(`No registered signal handler for signal: ${signalName}`);
@@ -878,6 +881,14 @@ export class Activator implements ActivationHandler {
878881
throw new TypeError('Missing activation signalName');
879882
}
880883

884+
const reservedPrefix = maybeGetReservedPrefix(signalName);
885+
if (reservedPrefix) {
886+
if (!this.signalHandlers.has(signalName)) {
887+
// Must have (internal) signal handler for reserved signal.
888+
throw new ReservedPrefixError('signal', signalName, reservedPrefix);
889+
}
890+
}
891+
881892
if (!this.signalHandlers.has(signalName) && !this.defaultSignalHandler) {
882893
this.bufferedSignals.push(activation);
883894
return;

0 commit comments

Comments
 (0)