Skip to content

Commit 7a5a00c

Browse files
authored
Merge branch 'master' into cjh-otel
2 parents 3d26191 + a6b3157 commit 7a5a00c

File tree

6 files changed

+256
-32
lines changed

6 files changed

+256
-32
lines changed

core/src/core_tests/activity_tasks.rs

Lines changed: 163 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1105,8 +1105,9 @@ async fn graceful_shutdown(#[values(true, false)] at_max_outstanding: bool) {
11051105
assert_matches!(
11061106
cancel.variant,
11071107
Some(activity_task::Variant::Cancel(Cancel {
1108-
reason: r
1109-
})) if r == ActivityCancelReason::WorkerShutdown as i32
1108+
reason,
1109+
details
1110+
})) if reason == ActivityCancelReason::WorkerShutdown as i32 && details.as_ref().is_some_and(|d| d.is_worker_shutdown)
11101111
);
11111112
seen_tts.insert(cancel.task_token);
11121113
}
@@ -1241,3 +1242,163 @@ async fn pass_activity_summary_to_metadata() {
12411242
.unwrap();
12421243
worker.run_until_done().await.unwrap();
12431244
}
1245+
1246+
#[tokio::test]
1247+
async fn heartbeat_response_can_be_paused() {
1248+
let mut mock_client = mock_workflow_client();
1249+
// First heartbeat returns pause only
1250+
mock_client
1251+
.expect_record_activity_heartbeat()
1252+
.times(1)
1253+
.returning(|_, _| {
1254+
Ok(RecordActivityTaskHeartbeatResponse {
1255+
cancel_requested: false,
1256+
activity_paused: true,
1257+
})
1258+
});
1259+
// Second heartbeat returns cancel only
1260+
mock_client
1261+
.expect_record_activity_heartbeat()
1262+
.times(1)
1263+
.returning(|_, _| {
1264+
Ok(RecordActivityTaskHeartbeatResponse {
1265+
cancel_requested: true,
1266+
activity_paused: false,
1267+
})
1268+
});
1269+
// Third heartbeat returns both
1270+
mock_client
1271+
.expect_record_activity_heartbeat()
1272+
.times(1)
1273+
.returning(|_, _| {
1274+
Ok(RecordActivityTaskHeartbeatResponse {
1275+
cancel_requested: true,
1276+
activity_paused: true,
1277+
})
1278+
});
1279+
mock_client
1280+
.expect_cancel_activity_task()
1281+
.times(3)
1282+
.returning(|_, _| Ok(RespondActivityTaskCanceledResponse::default()));
1283+
1284+
let core = mock_worker(MocksHolder::from_client_with_activities(
1285+
mock_client,
1286+
[
1287+
PollActivityTaskQueueResponse {
1288+
task_token: vec![1],
1289+
activity_id: "act1".to_string(),
1290+
heartbeat_timeout: Some(prost_dur!(from_millis(1))),
1291+
..Default::default()
1292+
}
1293+
.into(),
1294+
PollActivityTaskQueueResponse {
1295+
task_token: vec![2],
1296+
activity_id: "act2".to_string(),
1297+
heartbeat_timeout: Some(prost_dur!(from_millis(1))),
1298+
..Default::default()
1299+
}
1300+
.into(),
1301+
PollActivityTaskQueueResponse {
1302+
task_token: vec![3],
1303+
activity_id: "act3".to_string(),
1304+
heartbeat_timeout: Some(prost_dur!(from_millis(1))),
1305+
..Default::default()
1306+
}
1307+
.into(),
1308+
],
1309+
));
1310+
1311+
// The general testing pattern for each of these cases is:
1312+
// 1. Poll for activity task
1313+
// 2. Record activity heartbeat, get mocked heartbeat response
1314+
// 3. Sleep for 10ms (waiting for heartbeat request to be flushed)
1315+
// (i.e. sleep enough for the heartbeat flush interval to have elapsed)
1316+
// 4. Poll for activity task.
1317+
// We expect a cancellation activity task as they are prioritized (i.e. ordered before)
1318+
// regular activity tasks.
1319+
// 5. Assert that the received activity task is indeed a cancellation, with the reason
1320+
// and details we expect.
1321+
// 6. Complete the activity with a cancellation result.
1322+
//
1323+
// Repeat for subsequent test case(s).
1324+
1325+
// Test pause only
1326+
let act = core.poll_activity_task().await.unwrap();
1327+
core.record_activity_heartbeat(ActivityHeartbeat {
1328+
task_token: act.task_token.clone(),
1329+
details: vec![vec![1_u8, 2, 3].into()],
1330+
});
1331+
sleep(Duration::from_millis(10)).await;
1332+
let act = core.poll_activity_task().await.unwrap();
1333+
assert_matches!(
1334+
&act,
1335+
ActivityTask {
1336+
task_token,
1337+
variant: Some(activity_task::Variant::Cancel(Cancel { reason, details })),
1338+
} if
1339+
task_token == &vec![1] &&
1340+
*reason == ActivityCancelReason::Paused as i32 &&
1341+
details.as_ref().is_some_and(|d| d.is_paused) &&
1342+
details.as_ref().is_some_and(|d| !d.is_cancelled)
1343+
);
1344+
core.complete_activity_task(ActivityTaskCompletion {
1345+
task_token: act.task_token,
1346+
result: Some(ActivityExecutionResult::cancel_from_details(None)),
1347+
})
1348+
.await
1349+
.unwrap();
1350+
1351+
// Test cancel only
1352+
let act = core.poll_activity_task().await.unwrap();
1353+
core.record_activity_heartbeat(ActivityHeartbeat {
1354+
task_token: act.task_token.clone(),
1355+
details: vec![vec![1_u8, 2, 3].into()],
1356+
});
1357+
sleep(Duration::from_millis(10)).await;
1358+
let act = core.poll_activity_task().await.unwrap();
1359+
assert_matches!(
1360+
&act,
1361+
ActivityTask {
1362+
task_token,
1363+
variant: Some(activity_task::Variant::Cancel(Cancel { reason, details })),
1364+
} if
1365+
task_token == &vec![2] &&
1366+
*reason == ActivityCancelReason::Cancelled as i32 &&
1367+
details.as_ref().is_some_and(|d| !d.is_paused) &&
1368+
details.as_ref().is_some_and(|d| d.is_cancelled)
1369+
);
1370+
core.complete_activity_task(ActivityTaskCompletion {
1371+
task_token: act.task_token,
1372+
result: Some(ActivityExecutionResult::cancel_from_details(None)),
1373+
})
1374+
.await
1375+
.unwrap();
1376+
1377+
// Test both pause and cancel (should prioritize cancel)
1378+
let act = core.poll_activity_task().await.unwrap();
1379+
core.record_activity_heartbeat(ActivityHeartbeat {
1380+
task_token: act.task_token.clone(),
1381+
details: vec![vec![1_u8, 2, 3].into()],
1382+
});
1383+
sleep(Duration::from_millis(10)).await;
1384+
let act = core.poll_activity_task().await.unwrap();
1385+
assert_matches!(
1386+
&act,
1387+
ActivityTask {
1388+
task_token,
1389+
variant: Some(activity_task::Variant::Cancel(Cancel { reason, details })),
1390+
} if
1391+
task_token == &vec![3] &&
1392+
*reason == ActivityCancelReason::Cancelled as i32 &&
1393+
details.as_ref().is_some_and(|d| d.is_paused) &&
1394+
details.as_ref().is_some_and(|d| d.is_cancelled)
1395+
);
1396+
core.complete_activity_task(ActivityTaskCompletion {
1397+
task_token: act.task_token,
1398+
result: Some(ActivityExecutionResult::cancel_from_details(None)),
1399+
})
1400+
.await
1401+
.unwrap();
1402+
1403+
core.drain_activity_poller_and_shutdown().await;
1404+
}

core/src/worker/activities.rs

Lines changed: 25 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ use temporal_sdk_core_protos::{
4040
coresdk::{
4141
ActivityHeartbeat, ActivitySlotInfo,
4242
activity_result::{self as ar, activity_execution_result as aer},
43-
activity_task::{ActivityCancelReason, ActivityTask},
43+
activity_task::{ActivityCancelReason, ActivityCancellationDetails, ActivityTask},
4444
},
4545
temporal::api::{
4646
failure::v1::{ApplicationFailureInfo, CanceledFailureInfo, Failure, failure::FailureInfo},
@@ -65,16 +65,19 @@ type OutstandingActMap = Arc<DashMap<TaskToken, RemoteInFlightActInfo>>;
6565
struct PendingActivityCancel {
6666
task_token: TaskToken,
6767
reason: ActivityCancelReason,
68-
/// Set true if we should assume the server has already forgotten about this activity
69-
consider_not_found: bool,
68+
details: ActivityCancellationDetails,
7069
}
7170

7271
impl PendingActivityCancel {
73-
fn new(task_token: TaskToken, reason: ActivityCancelReason) -> Self {
72+
fn new(
73+
task_token: TaskToken,
74+
reason: ActivityCancelReason,
75+
details: ActivityCancellationDetails,
76+
) -> Self {
7477
Self {
7578
task_token,
7679
reason,
77-
consider_not_found: false,
80+
details,
7881
}
7982
}
8083
}
@@ -508,13 +511,14 @@ where
508511
} else {
509512
details.issued_cancel_to_lang = Some(next_pc.reason);
510513
if next_pc.reason == ActivityCancelReason::NotFound
511-
|| next_pc.consider_not_found
514+
|| next_pc.details.is_not_found
512515
{
513516
details.known_not_found = true;
514517
}
515518
Some(Ok(ActivityTask::cancel_from_ids(
516519
next_pc.task_token.0,
517520
next_pc.reason,
521+
next_pc.details,
518522
)))
519523
}
520524
} else {
@@ -566,6 +570,9 @@ where
566570
let _ = cancels_tx.send(PendingActivityCancel::new(
567571
tt,
568572
ActivityCancelReason::WorkerShutdown,
573+
ActivityTask::primary_reason_to_cancellation_details(
574+
ActivityCancelReason::WorkerShutdown,
575+
),
569576
));
570577
} else {
571578
// Fire off task to keep track of local timeouts. We do this so that
@@ -611,11 +618,15 @@ where
611618
"Timing out activity due to elapsed local \
612619
{timeout_type} timer"
613620
);
614-
let _ = cancel_tx.send(PendingActivityCancel {
615-
task_token: tt,
616-
reason: ActivityCancelReason::TimedOut,
617-
consider_not_found: true,
618-
});
621+
let _ = cancel_tx.send(PendingActivityCancel::new(
622+
tt,
623+
ActivityCancelReason::TimedOut,
624+
ActivityCancellationDetails {
625+
is_not_found: true,
626+
is_timed_out: true,
627+
..Default::default()
628+
},
629+
));
619630
}));
620631
outstanding_info.timeout_resetter = resetter;
621632
}
@@ -639,6 +650,9 @@ where
639650
let _ = self.cancels_tx.send(PendingActivityCancel::new(
640651
mapref.key().clone(),
641652
ActivityCancelReason::WorkerShutdown,
653+
ActivityTask::primary_reason_to_cancellation_details(
654+
ActivityCancelReason::WorkerShutdown,
655+
),
642656
));
643657
}
644658
}

core/src/worker/activities/activity_heartbeat_manager.rs

Lines changed: 19 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,10 @@ use std::{
1010
time::{Duration, Instant},
1111
};
1212
use temporal_sdk_core_protos::{
13-
coresdk::{ActivityHeartbeat, IntoPayloadsExt, activity_task::ActivityCancelReason},
13+
coresdk::{
14+
ActivityHeartbeat, IntoPayloadsExt,
15+
activity_task::{ActivityCancelReason, ActivityCancellationDetails, ActivityTask},
16+
},
1417
temporal::api::{
1518
common::v1::Payload, workflowservice::v1::RecordActivityTaskHeartbeatResponse,
1619
},
@@ -142,12 +145,23 @@ impl ActivityHeartbeatManager {
142145
.record_activity_heartbeat(tt.clone(), details.into_payloads())
143146
.await
144147
{
145-
Ok(RecordActivityTaskHeartbeatResponse { cancel_requested, activity_paused: _ }) => {
146-
if cancel_requested {
148+
Ok(RecordActivityTaskHeartbeatResponse { cancel_requested, activity_paused }) => {
149+
if cancel_requested || activity_paused {
150+
// Prioritize Cancel over Pause
151+
let reason = if cancel_requested {
152+
ActivityCancelReason::Cancelled
153+
} else {
154+
ActivityCancelReason::Paused
155+
};
147156
cancels_tx
148157
.send(PendingActivityCancel::new(
149158
tt.clone(),
150-
ActivityCancelReason::Cancelled,
159+
reason,
160+
ActivityCancellationDetails {
161+
is_cancelled: cancel_requested,
162+
is_paused: activity_paused,
163+
..Default::default()
164+
}
151165
))
152166
.expect(
153167
"Receive half of heartbeat cancels not blocked",
@@ -164,6 +178,7 @@ impl ActivityHeartbeatManager {
164178
.send(PendingActivityCancel::new(
165179
tt.clone(),
166180
ActivityCancelReason::NotFound,
181+
ActivityTask::primary_reason_to_cancellation_details(ActivityCancelReason::NotFound)
167182
))
168183
.expect("Receive half of heartbeat cancels not blocked");
169184
}

core/src/worker/activities/local_activities.rs

Lines changed: 15 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ use temporal_sdk_core_protos::{
2222
coresdk::{
2323
LocalActivitySlotInfo,
2424
activity_result::{Cancellation, Failure as ActFail, Success},
25-
activity_task::{ActivityCancelReason, ActivityTask, Cancel, Start, activity_task},
25+
activity_task::{ActivityCancelReason, ActivityTask, Start, activity_task},
2626
},
2727
temporal::api::{
2828
common::v1::WorkflowExecution,
@@ -629,12 +629,13 @@ impl LocalActivityManager {
629629
};
630630
// We want to generate a cancel task if the reason for failure was a timeout.
631631
let task = if is_timeout {
632-
Some(ActivityTask {
633-
task_token: task_token.clone().0,
634-
variant: Some(activity_task::Variant::Cancel(Cancel {
635-
reason: ActivityCancelReason::TimedOut as i32,
636-
})),
637-
})
632+
Some(ActivityTask::cancel_from_ids(
633+
task_token.clone().0,
634+
ActivityCancelReason::TimedOut,
635+
ActivityTask::primary_reason_to_cancellation_details(
636+
ActivityCancelReason::TimedOut,
637+
),
638+
))
638639
} else {
639640
None
640641
};
@@ -786,12 +787,13 @@ impl LocalActivityManager {
786787
}
787788

788789
self.cancels_req_tx
789-
.send(CancelOrTimeout::Cancel(ActivityTask {
790-
task_token: lai.task_token.0.clone(),
791-
variant: Some(activity_task::Variant::Cancel(Cancel {
792-
reason: ActivityCancelReason::Cancelled as i32,
793-
})),
794-
}))
790+
.send(CancelOrTimeout::Cancel(ActivityTask::cancel_from_ids(
791+
lai.task_token.0.clone(),
792+
ActivityCancelReason::Cancelled,
793+
ActivityTask::primary_reason_to_cancellation_details(
794+
ActivityCancelReason::Cancelled,
795+
),
796+
)))
795797
.expect("Receive half of LA cancel channel cannot be dropped");
796798
None
797799
}

sdk-core-protos/protos/local/temporal/sdk/core/activity_task/activity_task.proto

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,7 +67,18 @@ message Start {
6767

6868
// Attempt to cancel a running activity
6969
message Cancel {
70+
// Primary cancellation reason
7071
ActivityCancelReason reason = 1;
72+
// Activity cancellation details, surfaces all cancellation reasons.
73+
ActivityCancellationDetails details = 2;
74+
}
75+
76+
message ActivityCancellationDetails {
77+
bool is_not_found = 1;
78+
bool is_cancelled = 2;
79+
bool is_paused = 3;
80+
bool is_timed_out = 4;
81+
bool is_worker_shutdown = 5;
7182
}
7283

7384
enum ActivityCancelReason {
@@ -79,6 +90,8 @@ enum ActivityCancelReason {
7990
TIMED_OUT = 2;
8091
// Core is shutting down and the graceful timeout has elapsed
8192
WORKER_SHUTDOWN = 3;
93+
// Activity was paused
94+
PAUSED = 4;
8295
}
8396

8497

0 commit comments

Comments
 (0)