Skip to content

Commit c98db3a

Browse files
committed
Improve test
1 parent 6936e36 commit c98db3a

File tree

1 file changed

+69
-11
lines changed
  • tests/integ_tests/workflow_tests

1 file changed

+69
-11
lines changed

tests/integ_tests/workflow_tests/nexus.rs

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use tokio::{
2929
join,
3030
sync::{mpsc, watch},
3131
};
32+
use tokio_stream::StreamExt;
3233

3334
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
3435
enum Outcome {
@@ -557,10 +558,10 @@ async fn nexus_cancellation_types(
557558
let endpoint = mk_nexus_endpoint(&mut starter).await;
558559
let schedule_to_close_timeout = Some(Duration::from_secs(5));
559560

560-
let (cancel_call_completion_tx, cancel_call_completion_rx) = watch::channel(false);
561+
let (caller_op_future_tx, caller_op_future_rx) = watch::channel(false);
561562
worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| {
562563
let endpoint = endpoint.clone();
563-
let cancel_call_completion_tx = cancel_call_completion_tx.clone();
564+
let caller_op_future_tx = caller_op_future_tx.clone();
564565
async move {
565566
let options = NexusOperationOptions {
566567
endpoint,
@@ -577,42 +578,60 @@ async fn nexus_cancellation_types(
577578
started.cancel(&ctx);
578579

579580
let res = result.await;
580-
cancel_call_completion_tx.send(true).unwrap();
581+
caller_op_future_tx.send(true).unwrap();
581582

582-
// Make sure cancel after completion doesn't cause problems
583+
// Make sure cancel after op completion doesn't cause problems
583584
started.cancel(&ctx);
584585

585586
// We need to wait slightly so that the workflow is not complete at the same time
586-
// cancellation is invoked. If it does, the caller workflow will close and the server won't attempt to send the cancellation to the handler
587+
// cancellation is invoked. If it does, the caller workflow will close and the server
588+
// won't attempt to send the cancellation to the handler
587589
ctx.timer(Duration::from_millis(1)).await;
588590
Ok(res.into())
589591
}
590592
});
591593

592594
let (cancellation_wait_tx, cancellation_wait_rx) = watch::channel(false);
593595
let (cancellation_tx, mut cancellation_rx) = watch::channel(false);
596+
let (handler_exited_tx, mut handler_exited_rx) = watch::channel(false);
594597
worker.register_wf("async_completer".to_owned(), move |ctx: WfContext| {
595598
let cancellation_tx = cancellation_tx.clone();
596599
let mut cancellation_wait_rx = cancellation_wait_rx.clone();
600+
let handler_exited_tx = handler_exited_tx.clone();
597601
async move {
602+
// Wait for cancellation
598603
ctx.cancelled().await;
599604
cancellation_tx.send(true).unwrap();
605+
600606
if cancellation_type == NexusOperationCancellationType::WaitCancellationCompleted {
601607
cancellation_wait_rx.changed().await.unwrap();
608+
} else if cancellation_type == NexusOperationCancellationType::WaitCancellationRequested
609+
{
610+
// For WAIT_REQUESTED, wait until the caller nexus op future has been resolved. This
611+
// allows the test to verify that it resolved due to
612+
// NexusOperationCancelRequestCompleted (written after cancel handler responds)
613+
// rather than NexusOperationCanceled (written after handler workflow completes as
614+
// cancelled).
615+
let mut signal_chan = ctx.make_signal_channel("proceed-to-exit");
616+
signal_chan.next().await;
602617
}
618+
619+
handler_exited_tx.send(true).unwrap();
603620
Ok(WfExitValue::<()>::Cancelled)
604621
}
605622
});
606623
let submitter = worker.get_submitter_handle();
607624
let wf_handle = starter.start_with_worker(wf_name, &mut worker).await;
608625
let client = starter.get_client().await.get_client().clone();
626+
let (handler_wf_id_tx, mut handler_wf_id_rx) = tokio::sync::oneshot::channel();
609627
let nexus_task_handle = async {
610628
let nt = core_worker.poll_nexus_task().await.unwrap().unwrap_task();
611629
let start_req = assert_matches!(
612630
nt.request.unwrap().variant.unwrap(),
613631
request::Variant::StartOperation(sr) => sr
614632
);
615633
let completer_id = format!("completer-{}", rand_6_chars());
634+
let _ = handler_wf_id_tx.send(completer_id.clone());
616635
let links = start_req
617636
.links
618637
.iter()
@@ -668,16 +687,19 @@ async fn nexus_cancellation_types(
668687
match cancellation_type {
669688
NexusOperationCancellationType::WaitCancellationCompleted
670689
| NexusOperationCancellationType::WaitCancellationRequested => {
671-
assert!(!*cancel_call_completion_rx.borrow());
690+
// The nexus op future should not have been resolved
691+
assert!(!*caller_op_future_rx.borrow());
672692
}
673693
NexusOperationCancellationType::Abandon | NexusOperationCancellationType::TryCancel => {
674694
wf_handle
675695
.get_workflow_result(Default::default())
676696
.await
677697
.unwrap();
678-
assert!(*cancel_call_completion_rx.borrow())
698+
// The nexus op future should have been resolved
699+
assert!(*caller_op_future_rx.borrow())
679700
}
680701
}
702+
let (cancel_handler_responded_tx, _cancel_handler_responded_rx) = watch::channel(false);
681703
if cancellation_type != NexusOperationCancellationType::Abandon {
682704
let nt = core_worker.poll_nexus_task().await.unwrap();
683705
let nt = nt.unwrap_task();
@@ -702,19 +724,20 @@ async fn nexus_cancellation_types(
702724
})
703725
.await
704726
.unwrap();
727+
// Mark that the cancel handler has responded
728+
cancel_handler_responded_tx.send(true).unwrap();
705729
}
706730

707-
// Confirm the caller WF has not completed even after the handling of the cancel request
731+
// Check that the nexus op future resolves only _after_ the handler WF completes
708732
if cancellation_type == NexusOperationCancellationType::WaitCancellationCompleted {
709-
assert!(!*cancel_call_completion_rx.borrow());
733+
assert!(!*caller_op_future_rx.borrow());
710734

711-
// It only completes after the handler WF terminates
712735
cancellation_wait_tx.send(true).unwrap();
713736
wf_handle
714737
.get_workflow_result(Default::default())
715738
.await
716739
.unwrap();
717-
assert!(*cancel_call_completion_rx.borrow());
740+
assert!(*caller_op_future_rx.borrow());
718741
}
719742

720743
assert_matches!(
@@ -724,6 +747,38 @@ async fn nexus_cancellation_types(
724747
};
725748

726749
let shutdown_handle = worker.inner_mut().shutdown_handle();
750+
751+
let check_caller_op_future_resolved_then_allow_handler_to_complete = async {
752+
// The caller nexus op future has been resolved
753+
assert!(*caller_op_future_rx.borrow());
754+
755+
// Verify the handler workflow has not exited yet. This proves that the caller op future
756+
// was resolved as a result of NexusOperationCancelRequestCompleted (written after cancel
757+
// handler responds), as opposed to NexusOperationCanceled (written after handler workflow
758+
// exits).
759+
assert!(
760+
!*handler_exited_rx.borrow(),
761+
"Handler should not have exited yet"
762+
);
763+
764+
let handler_wf_id = handler_wf_id_rx
765+
.try_recv()
766+
.expect("Should have received handler workflow ID");
767+
client
768+
.signal_workflow_execution(
769+
handler_wf_id,
770+
"".to_string(),
771+
"proceed-to-exit".to_string(),
772+
None,
773+
None,
774+
)
775+
.await
776+
.unwrap();
777+
778+
handler_exited_rx.changed().await.unwrap();
779+
assert!(*handler_exited_rx.borrow());
780+
};
781+
727782
join!(
728783
nexus_task_handle,
729784
async { worker.inner_mut().run().await.unwrap() },
@@ -735,6 +790,9 @@ async fn nexus_cancellation_types(
735790
if cancellation_type == NexusOperationCancellationType::TryCancel {
736791
cancellation_rx.changed().await.unwrap();
737792
}
793+
if cancellation_type == NexusOperationCancellationType::WaitCancellationRequested {
794+
check_caller_op_future_resolved_then_allow_handler_to_complete.await;
795+
}
738796
shutdown_handle();
739797
}
740798
);

0 commit comments

Comments
 (0)