Skip to content

Commit 3b3a6d5

Browse files
committed
init impl for special behaviour for temporal prefixes. Default signal test needs to be fixed, need to add behaviour reserving prefixes from workflows, and waiting for default update to be merged to add behaviour preventing default update handler to be called with reserved names
1 parent caae0a4 commit 3b3a6d5

File tree

9 files changed

+183
-9
lines changed

9 files changed

+183
-9
lines changed

packages/common/src/reserved.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
export const TEMPORAL_RESERVED_PREFIX = '__temporal_';
2+
export const STACK_TRACE_RESERVED_PREFIX = '__stack_trace';
3+
export const ENHANCED_STACK_TRACE_RESERVED_PREFIX = '__enhanced_stack_trace';
4+
5+
export const reservedPrefixes = [
6+
TEMPORAL_RESERVED_PREFIX,
7+
STACK_TRACE_RESERVED_PREFIX,
8+
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
9+
];
10+
11+
export function throwIfReservedName(type: string, name: string): void {
12+
const prefix = isReservedName(name);
13+
if (prefix) {
14+
throw Error(`Cannot register ${type} name: '${name}', with reserved prefix: '${prefix}'`);
15+
}
16+
}
17+
18+
export function isReservedName(name: string): string | undefined {
19+
for (const prefix of reservedPrefixes) {
20+
if (name.startsWith(prefix)) {
21+
return prefix;
22+
}
23+
}
24+
}

packages/test/src/test-integration-split-two.ts

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,16 @@ import { searchAttributePayloadConverter } from '@temporalio/common/lib/converte
1414
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
1515
import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow';
1616

17-
import { condition, defineQuery, defineSignal, setDefaultQueryHandler, setHandler, sleep } from '@temporalio/workflow';
17+
import {
18+
condition,
19+
defineQuery,
20+
defineSignal,
21+
defineUpdate,
22+
setDefaultQueryHandler,
23+
setHandler,
24+
sleep,
25+
} from '@temporalio/workflow';
26+
import { reservedPrefixes } from '@temporalio/common/lib/reserved';
1827
import { configurableHelpers, createTestWorkflowBundle } from './helpers-integration';
1928
import * as activities from './activities';
2029
import * as workflows from './workflows';
@@ -763,3 +772,90 @@ test.serial('default query handler is not used if requested query exists', confi
763772
t.deepEqual(result, { name: definedQuery.name, args });
764773
});
765774
});
775+
776+
test('Cannot register activities using reserved prefixes', configMacro, async (t, config) => {
777+
const { createWorkerWithDefaults } = config;
778+
779+
for (const prefix of reservedPrefixes) {
780+
const activityName = prefix + '_test';
781+
await t.throwsAsync(
782+
createWorkerWithDefaults(t, {
783+
activities: { [activityName]: () => {} },
784+
}),
785+
{
786+
instanceOf: Error,
787+
message: `Cannot register activity name: '${activityName}', with reserved prefix: '${prefix}'`,
788+
}
789+
);
790+
}
791+
});
792+
793+
test('Cannot register task queues using reserved prefixes', configMacro, async (t, config) => {
794+
const { createWorkerWithDefaults } = config;
795+
796+
for (const prefix of reservedPrefixes) {
797+
const taskQueue = prefix + '_test';
798+
799+
await t.throwsAsync(
800+
createWorkerWithDefaults(t, {
801+
taskQueue,
802+
}),
803+
{
804+
instanceOf: Error,
805+
message: `Cannot register task queue name: '${taskQueue}', with reserved prefix: '${prefix}'`,
806+
}
807+
);
808+
}
809+
});
810+
811+
interface HandlerError {
812+
name: string;
813+
message: string;
814+
}
815+
816+
export async function workflowBadPrefixHandler(prefix: string): Promise<HandlerError[]> {
817+
// Re-package errors, default payload converter has trouble converting native errors (no 'data' field).
818+
const expectedErrors: HandlerError[] = [];
819+
try {
820+
setHandler(defineSignal(prefix + '_signal'), () => {});
821+
} catch (e) {
822+
if (e instanceof Error) {
823+
expectedErrors.push({ name: e.name, message: e.message });
824+
}
825+
}
826+
try {
827+
setHandler(defineUpdate(prefix + '_update'), () => {});
828+
} catch (e) {
829+
if (e instanceof Error) {
830+
expectedErrors.push({ name: e.name, message: e.message });
831+
}
832+
}
833+
try {
834+
setHandler(defineQuery(prefix + '_query'), () => {});
835+
} catch (e) {
836+
if (e instanceof Error) {
837+
expectedErrors.push({ name: e.name, message: e.message });
838+
}
839+
}
840+
return expectedErrors;
841+
}
842+
843+
test('Workflow failure if define signals/updates/queries with reserved prefixes', configMacro, async (t, config) => {
844+
const { env, createWorkerWithDefaults } = config;
845+
const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
846+
const worker = await createWorkerWithDefaults(t);
847+
await worker.runUntil(async () => {
848+
const prefix = reservedPrefixes[0];
849+
// for (const prefix of reservedPrefixes) {
850+
const result = await executeWorkflow(workflowBadPrefixHandler, {
851+
args: [prefix],
852+
});
853+
console.log('result', result);
854+
t.deepEqual(result, [
855+
{ name: 'Error', message: `Cannot register signal name: '${prefix}_signal', with reserved prefix: '${prefix}'` },
856+
{ name: 'Error', message: `Cannot register update name: '${prefix}_update', with reserved prefix: '${prefix}'` },
857+
{ name: 'Error', message: `Cannot register query name: '${prefix}_query', with reserved prefix: '${prefix}'` },
858+
]);
859+
// }
860+
});
861+
});

packages/test/src/test-workflows.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ import { VMWorkflow, VMWorkflowCreator } from '@temporalio/worker/lib/workflow/v
2121
import { SdkFlag, SdkFlags } from '@temporalio/workflow/lib/flags';
2222
import { ReusableVMWorkflow, ReusableVMWorkflowCreator } from '@temporalio/worker/lib/workflow/reusable-vm';
2323
import { parseWorkflowCode } from '@temporalio/worker/lib/worker';
24+
import {
25+
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
26+
reservedPrefixes,
27+
STACK_TRACE_RESERVED_PREFIX,
28+
TEMPORAL_RESERVED_PREFIX,
29+
} from '@temporalio/common/lib/reserved';
2430
import * as activityFunctions from './activities';
2531
import { cleanStackTrace, REUSE_V8_CONTEXT, u8 } from './helpers';
2632
import { ProcessedSignal } from './workflows';

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,4 @@ export * from './workflow-cancellation-scenarios';
9292
export * from './upsert-and-read-memo';
9393
export * from './updates-ordering';
9494
export * from './wait-on-signal-then-activity';
95+
export * from './workflow-with-default-handlers';
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import {
2+
condition,
3+
defineSignal,
4+
setDefaultQueryHandler,
5+
setDefaultSignalHandler,
6+
setHandler,
7+
} from '@temporalio/workflow';
8+
9+
export async function workflowWithDefaultHandlers(): Promise<void> {
10+
const complete = true;
11+
setDefaultQueryHandler(() => {});
12+
setDefaultSignalHandler(() => {});
13+
setHandler(defineSignal('completeSignal'), () => {});
14+
15+
await condition(() => complete);
16+
}

packages/worker/src/worker-options.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import { loadDataConverter } from '@temporalio/common/lib/internal-non-workflow'
1414
import { LoggerSinks } from '@temporalio/workflow';
1515
import { Context } from '@temporalio/activity';
1616
import { native } from '@temporalio/core-bridge';
17+
import { checkExtends } from '@temporalio/common/lib/type-helpers';
18+
import { WorkerOptions as NativeWorkerOptions, WorkerTuner as NativeWorkerTuner } from '@temporalio/core-bridge';
19+
import { throwIfReservedName } from '@temporalio/common/lib/reserved';
1720
import { ActivityInboundLogInterceptor } from './activity-log-interceptor';
1821
import { NativeConnection } from './connection';
1922
import { CompiledWorkerInterceptors, WorkerInterceptors } from './interceptors';
@@ -953,6 +956,9 @@ export function compileWorkerOptions(
953956
}
954957

955958
const activities = new Map(Object.entries(opts.activities ?? {}).filter(([_, v]) => typeof v === 'function'));
959+
for (const activityName of activities.keys()) {
960+
throwIfReservedName('activity', activityName);
961+
}
956962
const tuner = asNativeTuner(opts.tuner, logger);
957963

958964
return {

packages/worker/src/worker.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import { workflowLogAttributes } from '@temporalio/workflow/lib/logs';
5757
import { native } from '@temporalio/core-bridge';
5858
import { coresdk, temporal } from '@temporalio/proto';
5959
import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow';
60+
import { throwIfReservedName } from '@temporalio/common/lib/reserved';
6061
import { Activity, CancelReason, activityLogAttributes } from './activity';
6162
import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection';
6263
import { ActivityExecuteInput } from './interceptors';
@@ -469,6 +470,8 @@ export class Worker {
469470
public static async create(options: WorkerOptions): Promise<Worker> {
470471
const runtime = Runtime.instance();
471472
const logger = LoggerWithComposedMetadata.compose(runtime.logger, {
473+
throwIfReservedName('task queue', options.taskQueue);
474+
const logger = withMetadata(Runtime.instance().logger, {
472475
sdkComponent: SdkComponent.worker,
473476
taskQueue: options.taskQueue ?? 'default',
474477
});

packages/workflow/src/internals.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ import {
3131
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
3232
import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow';
3333
import type { coresdk, temporal } from '@temporalio/proto';
34+
import {
35+
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
36+
STACK_TRACE_RESERVED_PREFIX,
37+
isReservedName,
38+
throwIfReservedName,
39+
} from '@temporalio/common/lib/reserved';
3440
import { alea, RNG } from './alea';
3541
import { RootCancellationScope } from './cancellation-scope';
3642
import { UpdateScope } from './update-scope';
@@ -260,7 +266,7 @@ export class Activator implements ActivationHandler {
260266
*/
261267
public readonly queryHandlers = new Map<string, WorkflowQueryAnnotatedType>([
262268
[
263-
'__stack_trace',
269+
STACK_TRACE_RESERVED_PREFIX,
264270
{
265271
handler: () => {
266272
return this.getStackTraces()
@@ -271,7 +277,7 @@ export class Activator implements ActivationHandler {
271277
},
272278
],
273279
[
274-
'__enhanced_stack_trace',
280+
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
275281
{
276282
handler: (): EnhancedStackTrace => {
277283
const { sourceMap } = this;
@@ -679,17 +685,28 @@ export class Activator implements ActivationHandler {
679685
throw new TypeError('Missing query activation attributes');
680686
}
681687

688+
const queryInput = {
689+
queryName: queryType,
690+
args: arrayFromPayloads(this.payloadConverter, activation.arguments),
691+
queryId,
692+
headers: headers ?? {},
693+
};
694+
695+
// Skip interceptors if this is an internal query.
696+
if (isReservedName(queryType)) {
697+
this.queryWorkflowNextHandler(queryInput).then(
698+
(result) => this.completeQuery(queryId, result),
699+
(reason) => this.failQuery(queryId, reason)
700+
);
701+
return;
702+
}
703+
682704
const execute = composeInterceptors(
683705
this.interceptors.inbound,
684706
'handleQuery',
685707
this.queryWorkflowNextHandler.bind(this)
686708
);
687-
execute({
688-
queryName: queryType,
689-
args: arrayFromPayloads(this.payloadConverter, activation.arguments),
690-
queryId,
691-
headers: headers ?? {},
692-
}).then(
709+
execute(queryInput).then(
693710
(result) => this.completeQuery(queryId, result),
694711
(reason) => this.failQuery(queryId, reason)
695712
);
@@ -847,6 +864,8 @@ export class Activator implements ActivationHandler {
847864
if (fn) {
848865
return await fn(...args);
849866
} else if (this.defaultSignalHandler) {
867+
// Do not call default signal handler with reserved signal name.
868+
throwIfReservedName('signal', signalName);
850869
return await this.defaultSignalHandler(signalName, ...args);
851870
} else {
852871
throw new IllegalStateError(`No registered signal handler for signal: ${signalName}`);

packages/workflow/src/workflow.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import { versioningIntentToProto } from '@temporalio/common/lib/versioning-inten
3333
import { Duration, msOptionalToTs, msToNumber, msToTs, requiredTsToMs } from '@temporalio/common/lib/time';
3434
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
3535
import { temporal } from '@temporalio/proto';
36+
import { throwIfReservedName } from '@temporalio/common/lib/reserved';
3637
import { CancellationScope, registerSleepImplementation } from './cancellation-scope';
3738
import { UpdateScope } from './update-scope';
3839
import {
@@ -1272,6 +1273,8 @@ export function setHandler<
12721273
options?: QueryHandlerOptions | SignalHandlerOptions | UpdateHandlerOptions<Args>
12731274
): void {
12741275
const activator = assertInWorkflowContext('Workflow.setHandler(...) may only be used from a Workflow Execution.');
1276+
// Cannot register handler for reserved names
1277+
throwIfReservedName(def.type, def.name);
12751278
const description = options?.description;
12761279
if (def.type === 'update') {
12771280
if (typeof handler === 'function') {

0 commit comments

Comments
 (0)