Skip to content

Commit 199880d

Browse files
authored
Nexus cancellation types: handle NexusOperationCancelRequestCompleted, NexusOperationCancelRequestFailed (#977)
* Handle NexusOperationCancelRequest{Completed, Failed} These events are written by the caller server on receiving a response from the nexus cancel handler. The only cancellation type that is sensitive to them is WaitRequested: when that is in effect, these events cause an activation job to be emitted that resolves the nexus operation future. The fsm macro is modified to allow the same union type to be constructed twice, since Started -> Started|Cancelled is now caused by two different inputs.
1 parent 871b320 commit 199880d

File tree

4 files changed

+187
-17
lines changed

4 files changed

+187
-17
lines changed

core/src/worker/workflow/machines/nexus_operation_state_machine.rs

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use temporal_sdk_core_protos::{
2121
enums::v1::{CommandType, EventType},
2222
failure::v1::{self as failure, Failure, failure::FailureInfo},
2323
history::v1::{
24+
NexusOperationCancelRequestCompletedEventAttributes,
25+
NexusOperationCancelRequestFailedEventAttributes,
2426
NexusOperationCanceledEventAttributes, NexusOperationCompletedEventAttributes,
2527
NexusOperationFailedEventAttributes, NexusOperationStartedEventAttributes,
2628
NexusOperationTimedOutEventAttributes, history_event,
@@ -63,6 +65,12 @@ fsm! {
6365
--(NexusOperationFailed(NexusOperationFailedEventAttributes), on_failed)--> Failed;
6466
Started
6567
--(NexusOperationCanceled(NexusOperationCanceledEventAttributes), on_canceled)--> Cancelled;
68+
69+
70+
Started --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), shared on_cancel_request_completed)--> Started;
71+
Started --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), shared on_cancel_request_completed)--> Cancelled;
72+
Started --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes), shared on_cancel_request_failed)--> Started;
73+
6674
Started
6775
--(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), on_timed_out)--> TimedOut;
6876

@@ -72,12 +80,20 @@ fsm! {
7280
Cancelled --(NexusOperationCompleted(NexusOperationCompletedEventAttributes), shared on_completed)--> Cancelled;
7381
Cancelled --(NexusOperationFailed(NexusOperationFailedEventAttributes), shared on_failed)--> Cancelled;
7482
Cancelled --(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), shared on_timed_out)--> Cancelled;
83+
Cancelled --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), on_cancel_request_completed)--> Cancelled;
84+
Cancelled --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> Cancelled;
7585
Cancelled --(NexusOperationCanceled(NexusOperationCanceledEventAttributes))--> Cancelled;
7686

7787
// Ignore cancels in all terminal states
7888
Completed --(Cancel)--> Completed;
7989
Failed --(Cancel)--> Failed;
8090
TimedOut --(Cancel)--> TimedOut;
91+
Completed --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes))--> Completed;
92+
Failed --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes))--> Failed;
93+
TimedOut --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes))--> TimedOut;
94+
Completed --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> Completed;
95+
Failed --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> Failed;
96+
TimedOut --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> TimedOut;
8197
}
8298

8399
#[derive(Debug, derive_more::Display)]
@@ -311,6 +327,45 @@ impl Started {
311327
)])
312328
}
313329

330+
pub(super) fn on_cancel_request_completed(
331+
self,
332+
ss: &mut SharedState,
333+
_: NexusOperationCancelRequestCompletedEventAttributes,
334+
) -> NexusOperationMachineTransition<StartedOrCancelled> {
335+
if ss.cancel_type == NexusOperationCancellationType::WaitCancellationRequested {
336+
TransitionResult::ok(
337+
[NexusOperationCommand::Cancel(ss.cancelled_failure(
338+
"Nexus operation cancellation request completed".to_owned(),
339+
))],
340+
StartedOrCancelled::Cancelled(Default::default()),
341+
)
342+
} else {
343+
TransitionResult::ok([], StartedOrCancelled::Started(Default::default()))
344+
}
345+
}
346+
347+
pub(super) fn on_cancel_request_failed(
348+
self,
349+
ss: &mut SharedState,
350+
fa: NexusOperationCancelRequestFailedEventAttributes,
351+
) -> NexusOperationMachineTransition<Started> {
352+
if ss.cancel_type == NexusOperationCancellationType::WaitCancellationRequested {
353+
let message = "Nexus operation cancellation request failed".to_string();
354+
TransitionResult::ok(
355+
[NexusOperationCommand::Fail(ss.failure(
356+
message.clone(),
357+
fa.failure.unwrap_or_else(|| Failure {
358+
message,
359+
..Default::default()
360+
}),
361+
))],
362+
self,
363+
)
364+
} else {
365+
TransitionResult::ok([], self)
366+
}
367+
}
368+
314369
pub(super) fn on_timed_out(
315370
self,
316371
toa: NexusOperationTimedOutEventAttributes,
@@ -356,6 +411,13 @@ impl Cancelled {
356411
NexusOperationMachineTransition::ok([], self)
357412
}
358413

414+
pub(super) fn on_cancel_request_completed(
415+
self,
416+
_: NexusOperationCancelRequestCompletedEventAttributes,
417+
) -> NexusOperationMachineTransition<Cancelled> {
418+
NexusOperationMachineTransition::ok([], self)
419+
}
420+
359421
pub(super) fn on_failed(
360422
self,
361423
ss: &mut SharedState,
@@ -454,6 +516,36 @@ impl TryFrom<HistEventData> for NexusOperationMachineEvents {
454516
}
455517
}
456518
Ok(EventType::NexusOperationCancelRequested) => Self::NexusOperationCancelRequested,
519+
Ok(EventType::NexusOperationCancelRequestCompleted) => {
520+
if let Some(
521+
history_event::Attributes::NexusOperationCancelRequestCompletedEventAttributes(
522+
attrs,
523+
),
524+
) = e.attributes
525+
{
526+
Self::NexusOperationCancelRequestCompleted(attrs)
527+
} else {
528+
return Err(WFMachinesError::Nondeterminism(
529+
"NexusOperationCancelRequestCompleted attributes were unset or malformed"
530+
.to_string(),
531+
));
532+
}
533+
}
534+
Ok(EventType::NexusOperationCancelRequestFailed) => {
535+
if let Some(
536+
history_event::Attributes::NexusOperationCancelRequestFailedEventAttributes(
537+
attrs,
538+
),
539+
) = e.attributes
540+
{
541+
Self::NexusOperationCancelRequestFailed(attrs)
542+
} else {
543+
return Err(WFMachinesError::Nondeterminism(
544+
"NexusOperationCancelRequestFailed attributes were unset or malformed"
545+
.to_string(),
546+
));
547+
}
548+
}
457549
_ => {
458550
return Err(WFMachinesError::Nondeterminism(format!(
459551
"Nexus operation machine does not handle this event: {e:?}"
@@ -599,12 +691,19 @@ impl TryFrom<CommandType> for NexusOperationMachineEvents {
599691

600692
impl SharedState {
601693
fn cancelled_failure(&self, message: String) -> Failure {
602-
Failure {
694+
self.failure(
603695
message,
604-
cause: Some(Box::new(Failure {
696+
Failure {
605697
failure_info: Some(FailureInfo::CanceledFailureInfo(Default::default())),
606698
..Default::default()
607-
})),
699+
},
700+
)
701+
}
702+
703+
fn failure(&self, message: String, cause: Failure) -> Failure {
704+
Failure {
705+
message,
706+
cause: Some(Box::new(cause)),
608707
failure_info: Some(FailureInfo::NexusOperationExecutionFailureInfo(
609708
failure::NexusOperationFailureInfo {
610709
scheduled_event_id: self.scheduled_event_id,

fsm/rustfsm_procmacro/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ impl StateMachineDefinition {
434434
}
435435
};
436436
let mut multi_dest_enums = vec![];
437+
let mut multi_dest_enum_names = HashSet::new();
437438
let state_branches: Vec<_> = statemap.into_iter().map(|(from, transitions)| {
438439
let occupied_current_state = quote! { Some(#state_enum_name::#from(state_data)) };
439440
// Merge transition dest states with the same handler
@@ -468,7 +469,11 @@ impl StateMachineDefinition {
468469
}
469470
}
470471
};
471-
multi_dest_enums.push(multi_dest_enum);
472+
// Deduplicate; two different events may each result in a transition
473+
// set with the same set of dest states
474+
if multi_dest_enum_names.insert(enum_ident.clone()) {
475+
multi_dest_enums.push(multi_dest_enum);
476+
}
472477
quote! {
473478
#transition_result_name<#enum_ident>
474479
}

tests/integ_tests/workflow_tests/nexus.rs

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use tokio::{
2929
join,
3030
sync::{mpsc, watch},
3131
};
32+
use tokio_stream::StreamExt;
3233

3334
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
3435
enum Outcome {
@@ -557,10 +558,10 @@ async fn nexus_cancellation_types(
557558
let endpoint = mk_nexus_endpoint(&mut starter).await;
558559
let schedule_to_close_timeout = Some(Duration::from_secs(5));
559560

560-
let (cancel_call_completion_tx, cancel_call_completion_rx) = watch::channel(false);
561+
let (caller_op_future_tx, caller_op_future_rx) = watch::channel(false);
561562
worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| {
562563
let endpoint = endpoint.clone();
563-
let cancel_call_completion_tx = cancel_call_completion_tx.clone();
564+
let caller_op_future_tx = caller_op_future_tx.clone();
564565
async move {
565566
let options = NexusOperationOptions {
566567
endpoint,
@@ -577,42 +578,60 @@ async fn nexus_cancellation_types(
577578
started.cancel(&ctx);
578579

579580
let res = result.await;
580-
cancel_call_completion_tx.send(true).unwrap();
581+
caller_op_future_tx.send(true).unwrap();
581582

582-
// Make sure cancel after completion doesn't cause problems
583+
// Make sure cancel after op completion doesn't cause problems
583584
started.cancel(&ctx);
584585

585586
// We need to wait slightly so that the workflow is not complete at the same time
586-
// cancellation is invoked. If it does, the caller workflow will close and the server won't attempt to send the cancellation to the handler
587+
// cancellation is invoked. If it does, the caller workflow will close and the server
588+
// won't attempt to send the cancellation to the handler
587589
ctx.timer(Duration::from_millis(1)).await;
588590
Ok(res.into())
589591
}
590592
});
591593

592594
let (cancellation_wait_tx, cancellation_wait_rx) = watch::channel(false);
593595
let (cancellation_tx, mut cancellation_rx) = watch::channel(false);
596+
let (handler_exited_tx, mut handler_exited_rx) = watch::channel(false);
594597
worker.register_wf("async_completer".to_owned(), move |ctx: WfContext| {
595598
let cancellation_tx = cancellation_tx.clone();
596599
let mut cancellation_wait_rx = cancellation_wait_rx.clone();
600+
let handler_exited_tx = handler_exited_tx.clone();
597601
async move {
602+
// Wait for cancellation
598603
ctx.cancelled().await;
599604
cancellation_tx.send(true).unwrap();
605+
600606
if cancellation_type == NexusOperationCancellationType::WaitCancellationCompleted {
601607
cancellation_wait_rx.changed().await.unwrap();
608+
} else if cancellation_type == NexusOperationCancellationType::WaitCancellationRequested
609+
{
610+
// For WAIT_REQUESTED, wait until the caller nexus op future has been resolved. This
611+
// allows the test to verify that it resolved due to
612+
// NexusOperationCancelRequestCompleted (written after cancel handler responds)
613+
// rather than NexusOperationCanceled (written after handler workflow completes as
614+
// cancelled).
615+
let mut signal_chan = ctx.make_signal_channel("proceed-to-exit");
616+
signal_chan.next().await;
602617
}
618+
619+
handler_exited_tx.send(true).unwrap();
603620
Ok(WfExitValue::<()>::Cancelled)
604621
}
605622
});
606623
let submitter = worker.get_submitter_handle();
607624
let wf_handle = starter.start_with_worker(wf_name, &mut worker).await;
608625
let client = starter.get_client().await.get_client().clone();
626+
let (handler_wf_id_tx, mut handler_wf_id_rx) = tokio::sync::oneshot::channel();
609627
let nexus_task_handle = async {
610628
let nt = core_worker.poll_nexus_task().await.unwrap().unwrap_task();
611629
let start_req = assert_matches!(
612630
nt.request.unwrap().variant.unwrap(),
613631
request::Variant::StartOperation(sr) => sr
614632
);
615633
let completer_id = format!("completer-{}", rand_6_chars());
634+
let _ = handler_wf_id_tx.send(completer_id.clone());
616635
let links = start_req
617636
.links
618637
.iter()
@@ -668,16 +687,19 @@ async fn nexus_cancellation_types(
668687
match cancellation_type {
669688
NexusOperationCancellationType::WaitCancellationCompleted
670689
| NexusOperationCancellationType::WaitCancellationRequested => {
671-
assert!(!*cancel_call_completion_rx.borrow());
690+
// The nexus op future should not have been resolved
691+
assert!(!*caller_op_future_rx.borrow());
672692
}
673693
NexusOperationCancellationType::Abandon | NexusOperationCancellationType::TryCancel => {
674694
wf_handle
675695
.get_workflow_result(Default::default())
676696
.await
677697
.unwrap();
678-
assert!(*cancel_call_completion_rx.borrow())
698+
// The nexus op future should have been resolved
699+
assert!(*caller_op_future_rx.borrow())
679700
}
680701
}
702+
let (cancel_handler_responded_tx, _cancel_handler_responded_rx) = watch::channel(false);
681703
if cancellation_type != NexusOperationCancellationType::Abandon {
682704
let nt = core_worker.poll_nexus_task().await.unwrap();
683705
let nt = nt.unwrap_task();
@@ -702,19 +724,20 @@ async fn nexus_cancellation_types(
702724
})
703725
.await
704726
.unwrap();
727+
// Mark that the cancel handler has responded
728+
cancel_handler_responded_tx.send(true).unwrap();
705729
}
706730

707-
// Confirm the caller WF has not completed even after the handling of the cancel request
731+
// Check that the nexus op future resolves only _after_ the handler WF completes
708732
if cancellation_type == NexusOperationCancellationType::WaitCancellationCompleted {
709-
assert!(!*cancel_call_completion_rx.borrow());
733+
assert!(!*caller_op_future_rx.borrow());
710734

711-
// It only completes after the handler WF terminates
712735
cancellation_wait_tx.send(true).unwrap();
713736
wf_handle
714737
.get_workflow_result(Default::default())
715738
.await
716739
.unwrap();
717-
assert!(*cancel_call_completion_rx.borrow());
740+
assert!(*caller_op_future_rx.borrow());
718741
}
719742

720743
assert_matches!(
@@ -724,6 +747,38 @@ async fn nexus_cancellation_types(
724747
};
725748

726749
let shutdown_handle = worker.inner_mut().shutdown_handle();
750+
751+
let check_caller_op_future_resolved_then_allow_handler_to_complete = async {
752+
// The caller nexus op future has been resolved
753+
assert!(*caller_op_future_rx.borrow());
754+
755+
// Verify the handler workflow has not exited yet. This proves that the caller op future
756+
// was resolved as a result of NexusOperationCancelRequestCompleted (written after cancel
757+
// handler responds), as opposed to NexusOperationCanceled (written after handler workflow
758+
// exits).
759+
assert!(
760+
!*handler_exited_rx.borrow(),
761+
"Handler should not have exited yet"
762+
);
763+
764+
let handler_wf_id = handler_wf_id_rx
765+
.try_recv()
766+
.expect("Should have received handler workflow ID");
767+
client
768+
.signal_workflow_execution(
769+
handler_wf_id,
770+
"".to_string(),
771+
"proceed-to-exit".to_string(),
772+
None,
773+
None,
774+
)
775+
.await
776+
.unwrap();
777+
778+
handler_exited_rx.changed().await.unwrap();
779+
assert!(*handler_exited_rx.borrow());
780+
};
781+
727782
join!(
728783
nexus_task_handle,
729784
async { worker.inner_mut().run().await.unwrap() },
@@ -735,6 +790,9 @@ async fn nexus_cancellation_types(
735790
if cancellation_type == NexusOperationCancellationType::TryCancel {
736791
cancellation_rx.changed().await.unwrap();
737792
}
793+
if cancellation_type == NexusOperationCancellationType::WaitCancellationRequested {
794+
check_caller_op_future_resolved_then_allow_handler_to_complete.await;
795+
}
738796
shutdown_handle();
739797
}
740798
);
@@ -765,8 +823,14 @@ async fn nexus_cancellation_types(
765823
);
766824
assert_eq!(f.message, "Nexus operation cancelled after starting");
767825
}
768-
NexusOperationCancellationType::WaitCancellationRequested
769-
| NexusOperationCancellationType::WaitCancellationCompleted => {
826+
NexusOperationCancellationType::WaitCancellationRequested => {
827+
let f = assert_matches!(
828+
res.status,
829+
Some(nexus_operation_result::Status::Cancelled(f)) => f
830+
);
831+
assert_eq!(f.message, "Nexus operation cancellation request completed");
832+
}
833+
NexusOperationCancellationType::WaitCancellationCompleted => {
770834
let f = assert_matches!(
771835
res.status,
772836
Some(nexus_operation_result::Status::Cancelled(f)) => f

tests/runner.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ async fn main() -> Result<(), anyhow::Error> {
107107
"frontend.workerVersioningDataAPIs=true".to_owned(),
108108
"--dynamic-config-value".to_owned(),
109109
"system.enableDeploymentVersions=true".to_owned(),
110+
"--dynamic-config-value".to_owned(),
111+
"component.nexusoperations.recordCancelRequestCompletionEvents=true".to_owned(),
110112
"--http-port".to_string(),
111113
"7243".to_string(),
112114
"--search-attribute".to_string(),

0 commit comments

Comments
 (0)