Skip to content

Commit 6e16641

Browse files
committed
Handle NexusOperationCancelRequest{Completed, Failed}
These events are written by the caller server on receiving a response from the nexus cancel handler. The only cancellation type that is sensitive to them is WaitRequested: when that is in effect, these events cause an activation job to be emitted that resolves the nexus operation future. The fsm macro is modified to allow the same union type to be constructed twice, since Started -> Started|Cancelled is now caused by two different inputs.
1 parent 45687c3 commit 6e16641

File tree

4 files changed

+112
-6
lines changed

4 files changed

+112
-6
lines changed

core/src/worker/workflow/machines/nexus_operation_state_machine.rs

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use temporal_sdk_core_protos::{
2121
enums::v1::{CommandType, EventType},
2222
failure::v1::{self as failure, Failure, failure::FailureInfo},
2323
history::v1::{
24+
NexusOperationCancelRequestCompletedEventAttributes,
25+
NexusOperationCancelRequestFailedEventAttributes,
2426
NexusOperationCanceledEventAttributes, NexusOperationCompletedEventAttributes,
2527
NexusOperationFailedEventAttributes, NexusOperationStartedEventAttributes,
2628
NexusOperationTimedOutEventAttributes, history_event,
@@ -63,6 +65,12 @@ fsm! {
6365
--(NexusOperationFailed(NexusOperationFailedEventAttributes), on_failed)--> Failed;
6466
Started
6567
--(NexusOperationCanceled(NexusOperationCanceledEventAttributes), on_canceled)--> Cancelled;
68+
69+
70+
Started --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), shared on_cancel_request_completed)--> Started;
71+
Started --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), shared on_cancel_request_completed)--> Cancelled;
72+
Started --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes), shared on_cancel_request_failed)--> Started;
73+
6674
Started
6775
--(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), on_timed_out)--> TimedOut;
6876

@@ -72,6 +80,8 @@ fsm! {
7280
Cancelled --(NexusOperationCompleted(NexusOperationCompletedEventAttributes), shared on_completed)--> Cancelled;
7381
Cancelled --(NexusOperationFailed(NexusOperationFailedEventAttributes), shared on_failed)--> Cancelled;
7482
Cancelled --(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), shared on_timed_out)--> Cancelled;
83+
Cancelled --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), on_cancel_request_completed)--> Cancelled;
84+
Cancelled --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> Cancelled;
7585
Cancelled --(NexusOperationCanceled(NexusOperationCanceledEventAttributes))--> Cancelled;
7686

7787
// Ignore cancels in all terminal states
@@ -311,6 +321,45 @@ impl Started {
311321
)])
312322
}
313323

324+
pub(super) fn on_cancel_request_completed(
325+
self,
326+
ss: &SharedState,
327+
_: NexusOperationCancelRequestCompletedEventAttributes,
328+
) -> NexusOperationMachineTransition<StartedOrCancelled> {
329+
if ss.cancel_type == NexusOperationCancellationType::WaitCancellationRequested {
330+
TransitionResult::ok(
331+
[NexusOperationCommand::Cancel(ss.cancelled_failure(
332+
"Nexus operation cancellation request completed".to_owned(),
333+
))],
334+
StartedOrCancelled::Cancelled(Default::default()),
335+
)
336+
} else {
337+
TransitionResult::ok([], StartedOrCancelled::Started(Default::default()))
338+
}
339+
}
340+
341+
pub(super) fn on_cancel_request_failed(
342+
self,
343+
ss: &SharedState,
344+
fa: NexusOperationCancelRequestFailedEventAttributes,
345+
) -> NexusOperationMachineTransition<Started> {
346+
if ss.cancel_type == NexusOperationCancellationType::WaitCancellationRequested {
347+
let message = "Nexus operation cancellation request failed".to_string();
348+
TransitionResult::ok(
349+
[NexusOperationCommand::Fail(ss.failure(
350+
message.clone(),
351+
fa.failure.unwrap_or_else(|| Failure {
352+
message,
353+
..Default::default()
354+
}),
355+
))],
356+
self,
357+
)
358+
} else {
359+
TransitionResult::ok([], self)
360+
}
361+
}
362+
314363
pub(super) fn on_timed_out(
315364
self,
316365
toa: NexusOperationTimedOutEventAttributes,
@@ -356,6 +405,13 @@ impl Cancelled {
356405
NexusOperationMachineTransition::ok([], self)
357406
}
358407

408+
pub(super) fn on_cancel_request_completed(
409+
self,
410+
_: NexusOperationCancelRequestCompletedEventAttributes,
411+
) -> NexusOperationMachineTransition<Cancelled> {
412+
NexusOperationMachineTransition::ok([], self)
413+
}
414+
359415
pub(super) fn on_failed(
360416
self,
361417
ss: &mut SharedState,
@@ -454,6 +510,36 @@ impl TryFrom<HistEventData> for NexusOperationMachineEvents {
454510
}
455511
}
456512
Ok(EventType::NexusOperationCancelRequested) => Self::NexusOperationCancelRequested,
513+
Ok(EventType::NexusOperationCancelRequestCompleted) => {
514+
if let Some(
515+
history_event::Attributes::NexusOperationCancelRequestCompletedEventAttributes(
516+
attrs,
517+
),
518+
) = e.attributes
519+
{
520+
Self::NexusOperationCancelRequestCompleted(attrs)
521+
} else {
522+
return Err(WFMachinesError::Nondeterminism(
523+
"NexusOperationCancelRequestCompleted attributes were unset or malformed"
524+
.to_string(),
525+
));
526+
}
527+
}
528+
Ok(EventType::NexusOperationCancelRequestFailed) => {
529+
if let Some(
530+
history_event::Attributes::NexusOperationCancelRequestFailedEventAttributes(
531+
attrs,
532+
),
533+
) = e.attributes
534+
{
535+
Self::NexusOperationCancelRequestFailed(attrs)
536+
} else {
537+
return Err(WFMachinesError::Nondeterminism(
538+
"NexusOperationCancelRequestFailed attributes were unset or malformed"
539+
.to_string(),
540+
));
541+
}
542+
}
457543
_ => {
458544
return Err(WFMachinesError::Nondeterminism(format!(
459545
"Nexus operation machine does not handle this event: {e:?}"
@@ -599,12 +685,19 @@ impl TryFrom<CommandType> for NexusOperationMachineEvents {
599685

600686
impl SharedState {
601687
fn cancelled_failure(&self, message: String) -> Failure {
602-
Failure {
688+
self.failure(
603689
message,
604-
cause: Some(Box::new(Failure {
690+
Failure {
605691
failure_info: Some(FailureInfo::CanceledFailureInfo(Default::default())),
606692
..Default::default()
607-
})),
693+
},
694+
)
695+
}
696+
697+
fn failure(&self, message: String, cause: Failure) -> Failure {
698+
Failure {
699+
message,
700+
cause: Some(Box::new(cause)),
608701
failure_info: Some(FailureInfo::NexusOperationExecutionFailureInfo(
609702
failure::NexusOperationFailureInfo {
610703
scheduled_event_id: self.scheduled_event_id,

fsm/rustfsm_procmacro/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ impl StateMachineDefinition {
434434
}
435435
};
436436
let mut multi_dest_enums = vec![];
437+
let mut multi_dest_enum_names = HashSet::new();
437438
let state_branches: Vec<_> = statemap.into_iter().map(|(from, transitions)| {
438439
let occupied_current_state = quote! { Some(#state_enum_name::#from(state_data)) };
439440
// Merge transition dest states with the same handler
@@ -468,7 +469,11 @@ impl StateMachineDefinition {
468469
}
469470
}
470471
};
471-
multi_dest_enums.push(multi_dest_enum);
472+
// Deduplicate; two different events may each result in a transition
473+
// set with the same set of dest states
474+
if multi_dest_enum_names.insert(enum_ident.clone()) {
475+
multi_dest_enums.push(multi_dest_enum);
476+
}
472477
quote! {
473478
#transition_result_name<#enum_ident>
474479
}

tests/integ_tests/workflow_tests/nexus.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -765,8 +765,14 @@ async fn nexus_cancellation_types(
765765
);
766766
assert_eq!(f.message, "Nexus operation cancelled after starting");
767767
}
768-
NexusOperationCancellationType::WaitCancellationRequested
769-
| NexusOperationCancellationType::WaitCancellationCompleted => {
768+
NexusOperationCancellationType::WaitCancellationRequested => {
769+
let f = assert_matches!(
770+
res.status,
771+
Some(nexus_operation_result::Status::Cancelled(f)) => f
772+
);
773+
assert_eq!(f.message, "Nexus operation cancellation request completed");
774+
}
775+
NexusOperationCancellationType::WaitCancellationCompleted => {
770776
let f = assert_matches!(
771777
res.status,
772778
Some(nexus_operation_result::Status::Cancelled(f)) => f

tests/runner.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ async fn main() -> Result<(), anyhow::Error> {
107107
"frontend.workerVersioningDataAPIs=true".to_owned(),
108108
"--dynamic-config-value".to_owned(),
109109
"system.enableDeploymentVersions=true".to_owned(),
110+
"--dynamic-config-value".to_owned(),
111+
"component.nexusoperations.recordCancelRequestCompletionEvents=true".to_owned(),
110112
"--http-port".to_string(),
111113
"7243".to_string(),
112114
"--search-attribute".to_string(),

0 commit comments

Comments
 (0)