diff --git a/packages/client/src/async-completion-client.ts b/packages/client/src/async-completion-client.ts index 8a481bcc2..36cd894b0 100644 --- a/packages/client/src/async-completion-client.ts +++ b/packages/client/src/async-completion-client.ts @@ -42,6 +42,13 @@ export class ActivityCancelledError extends Error {} @SymbolBasedInstanceOfError('ActivityPausedError') export class ActivityPausedError extends Error {} +/** + * Thrown by {@link AsyncCompletionClient.heartbeat} when the reporting Activity + * has been reset. + */ +@SymbolBasedInstanceOfError('ActivityResetError') +export class ActivityResetError extends Error {} + /** * Options used to configure {@link AsyncCompletionClient} */ @@ -219,6 +226,7 @@ export class AsyncCompletionClient extends BaseClient { const payloads = await encodeToPayloads(this.dataConverter, details); let cancelRequested = false; let paused = false; + let reset = false; try { if (taskTokenOrFullActivityId instanceof Uint8Array) { const response = await this.workflowService.recordActivityTaskHeartbeat({ @@ -229,6 +237,7 @@ export class AsyncCompletionClient extends BaseClient { }); cancelRequested = !!response.cancelRequested; paused = !!response.activityPaused; + reset = !!response.activityReset; } else { const response = await this.workflowService.recordActivityTaskHeartbeatById({ identity: this.options.identity, @@ -238,14 +247,18 @@ export class AsyncCompletionClient extends BaseClient { }); cancelRequested = !!response.cancelRequested; paused = !!response.activityPaused; + reset = !!response.activityReset; } } catch (err) { this.handleError(err); } + // Note that it is possible for a heartbeat response to have multiple fields + // set as true (i.e. cancelled and pause). if (cancelRequested) { throw new ActivityCancelledError('cancelled'); - } - if (paused) { + } else if (reset) { + throw new ActivityResetError('reset'); + } else if (paused) { throw new ActivityPausedError('paused'); } } diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 9c8384288..ca6131f92 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -672,10 +672,8 @@ mod config { self.local_activity_task_slot_supplier .into_slot_supplier(&mut rbo), ); - tuner_holder.nexus_slot_options( - self.nexus_task_slot_supplier - .into_slot_supplier(&mut rbo) - ); + tuner_holder + .nexus_slot_options(self.nexus_task_slot_supplier.into_slot_supplier(&mut rbo)); if let Some(rbo) = rbo { tuner_holder.resource_based_options(rbo); } diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index b8cbbf48c..3e73562c1 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -296,7 +296,11 @@ export function configurableHelpers( }; } -export async function setActivityPauseState(handle: WorkflowHandle, activityId: string, pause: boolean): Promise { +export async function setActivityState( + handle: WorkflowHandle, + activityId: string, + state: 'pause' | 'unpause' | 'reset' | 'pause & reset' +): Promise { const desc = await handle.describe(); const req = { namespace: handle.client.options.namespace, @@ -306,10 +310,17 @@ export async function setActivityPauseState(handle: WorkflowHandle, activityId: }, id: activityId, }; - if (pause) { + if (state === 'pause') { await handle.client.workflowService.pauseActivity(req); - } else { + } else if (state === 'unpause') { await handle.client.workflowService.unpauseActivity(req); + } else if (state === 'reset') { + await handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true }); + } else { + await Promise.all([ + handle.client.workflowService.pauseActivity(req), + handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true }), + ]); } await waitUntil(async () => { const { raw } = await handle.describe(); @@ -317,11 +328,32 @@ export async function setActivityPauseState(handle: WorkflowHandle, activityId: // If we are pausing: success when either // • paused flag is true OR // • the activity vanished (it completed / retried) - if (pause) return activityInfo ? activityInfo.paused ?? false : true; - // If we are unpausing: success when either - // • paused flag is false OR - // • the activity vanished (already completed) - return activityInfo ? !activityInfo.paused : true; + if (state === 'pause') { + if (!activityInfo) { + return true; // Activity vanished (completed/retried) + } + return activityInfo.paused ?? false; + } else if (state === 'unpause') { + // If we are unpausing: success when either + // • paused flag is false OR + // • the activity vanished (already completed) + return activityInfo ? !activityInfo.paused : true; + } else if (state === 'reset') { + // If we are resetting, success when either + // • heartbeat details have been reset OR + // • the activity vanished (completed / retried) + return activityInfo ? activityInfo.heartbeatDetails === null : true; + } else { + // If we are pausing & resetting, success when either + // • activity is paused AND heartbeat details have been reset OR + // • the activity vanished (completed / retried) + if (!activityInfo) { + return true; // Activity vanished (completed/retried) + } + const isPaused = activityInfo.paused ?? false; + const isHeartbeatReset = activityInfo.heartbeatDetails === null; + return isPaused && isHeartbeatReset; + } }, 15000); } diff --git a/packages/test/src/test-integration-split-three.ts b/packages/test/src/test-integration-split-three.ts index 8fa1a29d0..cf553c58f 100644 --- a/packages/test/src/test-integration-split-three.ts +++ b/packages/test/src/test-integration-split-three.ts @@ -156,7 +156,6 @@ test( await worker.runUntil(handle.result()); let firstChild = true; const history = await handle.fetchHistory(); - console.log('events'); for (const event of history?.events ?? []) { switch (event.eventType) { case temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED: diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 976849482..00bcf8b63 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -45,7 +45,7 @@ import { hasActivityHeartbeat, helpers, makeTestFunction, - setActivityPauseState, + setActivityState, } from './helpers-integration'; import { overrideSdkInternalFlag } from './mock-internal-flags'; import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details'; @@ -1459,7 +1459,7 @@ test('Activity pause returns expected cancellation details', async (t) => { }, 10000); // Now pause the activity - await setActivityPauseState(handle, testActivityId, true); + await setActivityState(handle, testActivityId, 'pause'); // Get the result - should contain pause cancellation details const result = await handle.result(); @@ -1494,15 +1494,79 @@ test('Activity can be cancelled via pause and retry after unpause', async (t) => return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated'))); }, 10000); - await setActivityPauseState(handle, testActivityId, true); + await setActivityState(handle, testActivityId, 'pause'); await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'finally-complete'), 10000); - await setActivityPauseState(handle, testActivityId, false); + await setActivityState(handle, testActivityId, 'unpause'); const result = await handle.result(); t.true(result == null); }); }); +test('Activity reset returns expected cancellation details', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ + activities: { + heartbeatCancellationDetailsActivity, + }, + }); + + await worker.runUntil(async () => { + const testActivityId = randomUUID(); + const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] }); + + // Wait for it to exist and heartbeat + await waitUntil(async () => { + const { raw } = await handle.describe(); + const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId); + return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated'))); + }, 10000); + + await setActivityState(handle, testActivityId, 'reset'); + const result = await handle.result(); + t.deepEqual(result, { + cancelRequested: false, + notFound: false, + paused: false, + timedOut: false, + workerShutdown: false, + reset: true, + }); + }); +}); + +test('Activity set as both paused and reset returns expected cancellation details', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ + activities: { + heartbeatCancellationDetailsActivity, + }, + }); + + await worker.runUntil(async () => { + const testActivityId = randomUUID(); + const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] }); + + // Wait for it to exist and heartbeat + await waitUntil(async () => { + const { raw } = await handle.describe(); + const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId); + return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated'))); + }, 10000); + + await setActivityState(handle, testActivityId, 'pause & reset'); + const result = await handle.result(); + t.deepEqual(result, { + cancelRequested: false, + notFound: false, + paused: true, + timedOut: false, + workerShutdown: false, + reset: true, + }); + }); +}); + const reservedNames = [TEMPORAL_RESERVED_PREFIX, STACK_TRACE_QUERY_NAME, ENHANCED_STACK_TRACE_QUERY_NAME]; test('Cannot register activities using reserved prefixes', async (t) => { diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index 7ba463dde..09bea0394 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -149,9 +149,14 @@ export class Activity { (error instanceof CancelledFailure || isAbortError(error)) && this.context.cancellationSignal.aborted ) { - if (this.context.cancellationDetails?.paused) { + if (this.context.cancellationDetails?.cancelRequested) { + this.workerLogger.debug('Activity completed as cancelled', { durationMs }); + } else if (this.context.cancellationDetails?.reset) { + this.workerLogger.debug('Activity reset', { durationMs }); + } else if (this.context.cancellationDetails?.paused) { this.workerLogger.debug('Activity paused', { durationMs }); } else { + // Fallback log - completed as cancelled. this.workerLogger.debug('Activity completed as cancelled', { durationMs }); } } else if (error instanceof CompleteAsyncError) { @@ -204,8 +209,17 @@ export class Activity { } else if (this.cancelReason) { // Either a CancelledFailure that we threw or AbortError from AbortController if (err instanceof CancelledFailure) { - // If cancel due to activity pause, emit an application failure for the pause. - if (this.context.cancellationDetails?.paused) { + // If cancel due to activity pause or reset, emit an application failure. + if (this.context.cancellationDetails?.reset) { + return { + failed: { + failure: await encodeErrorToFailure( + this.dataConverter, + new ApplicationFailure('Activity reset', 'ActivityReset') + ), + }, + }; + } else if (this.context.cancellationDetails?.paused) { return { failed: { failure: await encodeErrorToFailure( diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 067451ef6..9a949235d 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -1036,7 +1036,7 @@ export class Worker { details, onError() { // activity must be defined - // empty cancellation details, not corresponding detail for heartbeat detail conversion failure + // empty cancellation details, no corresponding detail for heartbeat detail conversion failure activity?.cancel( 'HEARTBEAT_DETAILS_CONVERSION_FAILED', ActivityCancellationDetails.fromProto(undefined)