Skip to content

Commit 051ea89

Browse files
committed
address pr feedback, add pause + reset test
1 parent 1aeb11a commit 051ea89

File tree

6 files changed

+59
-88
lines changed

6 files changed

+59
-88
lines changed

packages/client/src/async-completion-client.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,13 +252,13 @@ export class AsyncCompletionClient extends BaseClient {
252252
} catch (err) {
253253
this.handleError(err);
254254
}
255+
// Note that it is possible for a heartbeat response to have multiple fields
256+
// set as true (i.e. cancelled and pause).
255257
if (cancelRequested) {
256258
throw new ActivityCancelledError('cancelled');
257-
}
258-
if (reset) {
259+
} else if (reset) {
259260
throw new ActivityResetError('reset');
260-
}
261-
if (paused) {
261+
} else if (paused) {
262262
throw new ActivityPausedError('paused');
263263
}
264264
}

packages/core-bridge/Cargo.lock

Lines changed: 0 additions & 74 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core-bridge/sdk-core

Submodule sdk-core updated 128 files

packages/core-bridge/src/worker.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -672,10 +672,8 @@ mod config {
672672
self.local_activity_task_slot_supplier
673673
.into_slot_supplier(&mut rbo),
674674
);
675-
tuner_holder.nexus_slot_options(
676-
self.nexus_task_slot_supplier
677-
.into_slot_supplier(&mut rbo)
678-
);
675+
tuner_holder
676+
.nexus_slot_options(self.nexus_task_slot_supplier.into_slot_supplier(&mut rbo));
679677
if let Some(rbo) = rbo {
680678
tuner_holder.resource_based_options(rbo);
681679
}

packages/test/src/helpers-integration.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ export function configurableHelpers<T>(
299299
export async function setActivityState(
300300
handle: WorkflowHandle,
301301
activityId: string,
302-
state: 'pause' | 'unpause' | 'reset'
302+
state: 'pause' | 'unpause' | 'reset' | 'pause & reset'
303303
): Promise<void> {
304304
const desc = await handle.describe();
305305
const req = {
@@ -314,9 +314,11 @@ export async function setActivityState(
314314
await handle.client.workflowService.pauseActivity(req);
315315
} else if (state === 'unpause') {
316316
await handle.client.workflowService.unpauseActivity(req);
317+
} else if (state === 'reset') {
318+
await handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true });
317319
} else {
318-
const resetReq = { ...req, resetHeartbeat: true };
319-
await handle.client.workflowService.resetActivity(resetReq);
320+
await handle.client.workflowService.pauseActivity(req);
321+
await handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true });
320322
}
321323
await waitUntil(async () => {
322324
const { raw } = await handle.describe();
@@ -325,17 +327,30 @@ export async function setActivityState(
325327
// • paused flag is true OR
326328
// • the activity vanished (it completed / retried)
327329
if (state === 'pause') {
328-
return activityInfo ? activityInfo.paused ?? false : true;
330+
if (!activityInfo) {
331+
return true; // Activity vanished (completed/retried)
332+
}
333+
return activityInfo.paused ?? false;
329334
} else if (state === 'unpause') {
330335
// If we are unpausing: success when either
331336
// • paused flag is false OR
332337
// • the activity vanished (already completed)
333338
return activityInfo ? !activityInfo.paused : true;
334-
} else {
339+
} else if (state === 'reset') {
335340
// If we are resetting, success when either
336341
// • heartbeat details have been reset OR
337342
// • the activity vanished (completed / retried)
338343
return activityInfo ? activityInfo.heartbeatDetails === null : true;
344+
} else {
345+
// If we are pausing & resetting, success when either
346+
// • activity is paused AND heartbeat details have been reset OR
347+
// • the activity vanished (completed / retried)
348+
if (!activityInfo) {
349+
return true; // Activity vanished (completed/retried)
350+
}
351+
const isPaused = activityInfo.paused ?? false;
352+
const isHeartbeatReset = activityInfo.heartbeatDetails === null;
353+
return isPaused && isHeartbeatReset;
339354
}
340355
}, 15000);
341356
}

packages/test/src/test-integration-workflows.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,6 +1535,38 @@ test('Activity reset returns expected cancellation details', async (t) => {
15351535
});
15361536
});
15371537

1538+
test('Activity set as both paused and reset returns expected cancellation details', async (t) => {
1539+
const { createWorker, startWorkflow } = helpers(t);
1540+
const worker = await createWorker({
1541+
activities: {
1542+
heartbeatCancellationDetailsActivity,
1543+
},
1544+
});
1545+
1546+
await worker.runUntil(async () => {
1547+
const testActivityId = randomUUID();
1548+
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] });
1549+
1550+
// Wait for it to exist and heartbeat
1551+
await waitUntil(async () => {
1552+
const { raw } = await handle.describe();
1553+
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId);
1554+
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
1555+
}, 10000);
1556+
1557+
await setActivityState(handle, testActivityId, 'pause & reset');
1558+
const result = await handle.result();
1559+
t.deepEqual(result, {
1560+
cancelRequested: false,
1561+
notFound: false,
1562+
paused: true,
1563+
timedOut: false,
1564+
workerShutdown: false,
1565+
reset: true,
1566+
});
1567+
});
1568+
});
1569+
15381570
const reservedNames = [TEMPORAL_RESERVED_PREFIX, STACK_TRACE_QUERY_NAME, ENHANCED_STACK_TRACE_QUERY_NAME];
15391571

15401572
test('Cannot register activities using reserved prefixes', async (t) => {

0 commit comments

Comments
 (0)