@@ -2,7 +2,7 @@ import { setTimeout as setTimeoutPromise } from 'timers/promises';
22import { randomUUID } from 'crypto' ;
33import { ExecutionContext } from 'ava' ;
44import { firstValueFrom , Subject } from 'rxjs' ;
5- import { WorkflowFailedError } from '@temporalio/client' ;
5+ import { WorkflowFailedError , WorkflowHandle } from '@temporalio/client' ;
66import * as activity from '@temporalio/activity' ;
77import { msToNumber , tsToMs } from '@temporalio/common/lib/time' ;
88import { TestWorkflowEnvironment } from '@temporalio/testing' ;
@@ -11,6 +11,7 @@ import * as workflow from '@temporalio/workflow';
1111import { defineQuery , defineSignal } from '@temporalio/workflow' ;
1212import { SdkFlags } from '@temporalio/workflow/lib/flags' ;
1313import {
14+ ActivityCancellationDetails ,
1415 ActivityCancellationType ,
1516 ApplicationFailure ,
1617 defineSearchAttributeKey ,
@@ -22,9 +23,17 @@ import {
2223import { signalSchedulingWorkflow } from './activities/helpers' ;
2324import { activityStartedSignal } from './workflows/definitions' ;
2425import * as workflows from './workflows' ;
25- import { Context , createLocalTestEnvironment , helpers , makeTestFunction } from './helpers-integration' ;
26+ import {
27+ assertPendingActivityExistsEventually ,
28+ Context ,
29+ createLocalTestEnvironment ,
30+ helpers ,
31+ makeTestFunction ,
32+ setActivityPauseState ,
33+ } from './helpers-integration' ;
2634import { overrideSdkInternalFlag } from './mock-internal-flags' ;
2735import { asSdkLoggerSink , loadHistory , RUN_TIME_SKIPPING_TESTS , waitUntil } from './helpers' ;
36+ import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details' ;
2837
2938const test = makeTestFunction ( {
3039 workflowsPath : __filename ,
@@ -1414,3 +1423,118 @@ test('Workflow can return root workflow', async (t) => {
14141423 t . deepEqual ( result , 'empty test-root-workflow-length' ) ;
14151424 } ) ;
14161425} ) ;
1426+
1427+ export async function heartbeatPauseWorkflow (
1428+ activityId : string ,
1429+ catchErr : boolean ,
1430+ maximumAttempts : number
1431+ ) : Promise < Array < ActivityCancellationDetails | undefined > > {
1432+ const { heartbeatCancellationDetailsActivity } = workflow . proxyActivities ( {
1433+ startToCloseTimeout : '5s' ,
1434+ activityId,
1435+ retry : {
1436+ maximumAttempts,
1437+ } ,
1438+ } ) ;
1439+ const { heartbeatCancellationDetailsActivity2 } = workflow . proxyActivities ( {
1440+ startToCloseTimeout : '5s' ,
1441+ activityId : `${ activityId } -2` ,
1442+ retry : {
1443+ maximumAttempts,
1444+ } ,
1445+ } ) ;
1446+ const results = [ ] ;
1447+ results . push (
1448+ await heartbeatCancellationDetailsActivity ( catchErr ) ,
1449+ await heartbeatCancellationDetailsActivity2 ( catchErr )
1450+ ) ;
1451+ return results ;
1452+ }
1453+
1454+ test ( 'Activity pause returns expected cancellation details' , async ( t ) => {
1455+ const { createWorker, startWorkflow } = helpers ( t ) ;
1456+ const worker = await createWorker ( {
1457+ activities : {
1458+ heartbeatCancellationDetailsActivity,
1459+ heartbeatCancellationDetailsActivity2 : heartbeatCancellationDetailsActivity ,
1460+ } ,
1461+ maxHeartbeatThrottleInterval : '200ms' ,
1462+ defaultHeartbeatThrottleInterval : '200ms' ,
1463+ } ) ;
1464+
1465+ await worker . runUntil ( async ( ) => {
1466+ const testActivityId = randomUUID ( ) ;
1467+ const handle = await startWorkflow ( heartbeatPauseWorkflow , { args : [ testActivityId , true , 1 ] } ) ;
1468+
1469+ const activityInfo = await assertPendingActivityExistsEventually ( handle , testActivityId , 5000 ) ;
1470+ t . true ( activityInfo . paused === false ) ;
1471+ await setActivityPauseState ( handle , testActivityId , true ) ;
1472+ const activityInfo2 = await assertPendingActivityExistsEventually ( handle , `${ testActivityId } -2` , 5000 ) ;
1473+ t . true ( activityInfo2 . paused === false ) ;
1474+ await setActivityPauseState ( handle , `${ testActivityId } -2` , true ) ;
1475+ const result = await handle . result ( ) ;
1476+ t . deepEqual ( result [ 0 ] , {
1477+ cancelRequested : false ,
1478+ notFound : false ,
1479+ paused : true ,
1480+ timedOut : false ,
1481+ workerShutdown : false ,
1482+ reset : false ,
1483+ } ) ;
1484+ t . deepEqual ( result [ 1 ] , {
1485+ cancelRequested : false ,
1486+ notFound : false ,
1487+ paused : true ,
1488+ timedOut : false ,
1489+ workerShutdown : false ,
1490+ reset : false ,
1491+ } ) ;
1492+ } ) ;
1493+ } ) ;
1494+
1495+ test ( 'Activity can pause and unpause' , async ( t ) => {
1496+ const { createWorker, startWorkflow } = helpers ( t ) ;
1497+ async function checkHeartbeatDetailsExist ( handle : WorkflowHandle , activityId : string ) {
1498+ const activityInfo = await assertPendingActivityExistsEventually ( handle , activityId , 3000 ) ;
1499+ if ( activityInfo . heartbeatDetails ?. payloads ) {
1500+ for ( const payload of activityInfo . heartbeatDetails ?. payloads || [ ] ) {
1501+ if ( payload . data && payload . data ?. length > 0 ) {
1502+ return true ;
1503+ }
1504+ }
1505+ }
1506+ return false ;
1507+ }
1508+
1509+ const worker = await createWorker ( {
1510+ activities : {
1511+ heartbeatCancellationDetailsActivity,
1512+ heartbeatCancellationDetailsActivity2 : heartbeatCancellationDetailsActivity ,
1513+ } ,
1514+ maxHeartbeatThrottleInterval : '200ms' ,
1515+ defaultHeartbeatThrottleInterval : '200ms' ,
1516+ } ) ;
1517+
1518+ await worker . runUntil ( async ( ) => {
1519+ const testActivityId = randomUUID ( ) ;
1520+ const handle = await startWorkflow ( heartbeatPauseWorkflow , { args : [ testActivityId , false , 2 ] } ) ;
1521+ const activityInfo = await assertPendingActivityExistsEventually ( handle , testActivityId , 5000 ) ;
1522+ t . true ( activityInfo . paused === false ) ;
1523+ await setActivityPauseState ( handle , testActivityId , true ) ;
1524+ await waitUntil ( async ( ) => {
1525+ return await checkHeartbeatDetailsExist ( handle , testActivityId ) ;
1526+ } , 3000 ) ;
1527+ await setActivityPauseState ( handle , testActivityId , false ) ;
1528+ const activityInfo2 = await assertPendingActivityExistsEventually ( handle , `${ testActivityId } -2` , 5000 ) ;
1529+ t . true ( activityInfo2 . paused === false ) ;
1530+ await setActivityPauseState ( handle , `${ testActivityId } -2` , true ) ;
1531+ await waitUntil ( async ( ) => {
1532+ return await checkHeartbeatDetailsExist ( handle , `${ testActivityId } -2` ) ;
1533+ } , 3000 ) ;
1534+ await setActivityPauseState ( handle , `${ testActivityId } -2` , false ) ;
1535+ const result = await handle . result ( ) ;
1536+ // Undefined values are converted to null by data converter.
1537+ t . true ( result [ 0 ] === null ) ;
1538+ t . true ( result [ 1 ] === null ) ;
1539+ } ) ;
1540+ } ) ;
0 commit comments