Skip to content

Commit 90791f3

Browse files
committed
Improve test
1 parent 6936e36 commit 90791f3

File tree

1 file changed

+66
-11
lines changed
  • tests/integ_tests/workflow_tests

1 file changed

+66
-11
lines changed

tests/integ_tests/workflow_tests/nexus.rs

Lines changed: 66 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use assert_matches::assert_matches;
44
use std::time::Duration;
55
use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions};
66
use temporal_sdk::{CancellableFuture, NexusOperationOptions, WfContext, WfExitValue};
7+
use tokio_stream::StreamExt;
78
use temporal_sdk_core_api::errors::PollError;
89
use temporal_sdk_core_protos::{
910
coresdk::{
@@ -557,10 +558,10 @@ async fn nexus_cancellation_types(
557558
let endpoint = mk_nexus_endpoint(&mut starter).await;
558559
let schedule_to_close_timeout = Some(Duration::from_secs(5));
559560

560-
let (cancel_call_completion_tx, cancel_call_completion_rx) = watch::channel(false);
561+
let (caller_op_future_tx, caller_op_future_rx) = watch::channel(false);
561562
worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| {
562563
let endpoint = endpoint.clone();
563-
let cancel_call_completion_tx = cancel_call_completion_tx.clone();
564+
let caller_op_future_tx = caller_op_future_tx.clone();
564565
async move {
565566
let options = NexusOperationOptions {
566567
endpoint,
@@ -577,42 +578,59 @@ async fn nexus_cancellation_types(
577578
started.cancel(&ctx);
578579

579580
let res = result.await;
580-
cancel_call_completion_tx.send(true).unwrap();
581+
caller_op_future_tx.send(true).unwrap();
581582

582-
// Make sure cancel after completion doesn't cause problems
583+
// Make sure cancel after op completion doesn't cause problems
583584
started.cancel(&ctx);
584585

585586
// We need to wait slightly so that the workflow is not complete at the same time
586-
// cancellation is invoked. If it does, the caller workflow will close and the server won't attempt to send the cancellation to the handler
587+
// cancellation is invoked. If it does, the caller workflow will close and the server
588+
// won't attempt to send the cancellation to the handler
587589
ctx.timer(Duration::from_millis(1)).await;
588590
Ok(res.into())
589591
}
590592
});
591593

592594
let (cancellation_wait_tx, cancellation_wait_rx) = watch::channel(false);
593595
let (cancellation_tx, mut cancellation_rx) = watch::channel(false);
596+
let (handler_exited_tx, mut handler_exited_rx) = watch::channel(false);
594597
worker.register_wf("async_completer".to_owned(), move |ctx: WfContext| {
595598
let cancellation_tx = cancellation_tx.clone();
596599
let mut cancellation_wait_rx = cancellation_wait_rx.clone();
600+
let handler_exited_tx = handler_exited_tx.clone();
597601
async move {
602+
// Wait for cancellation
598603
ctx.cancelled().await;
599604
cancellation_tx.send(true).unwrap();
605+
600606
if cancellation_type == NexusOperationCancellationType::WaitCancellationCompleted {
601607
cancellation_wait_rx.changed().await.unwrap();
608+
} else if cancellation_type == NexusOperationCancellationType::WaitCancellationRequested {
609+
// For WAIT_REQUESTED, wait until the caller nexus op future has been resolved. This
610+
// allows the test to verify that it resolved due to
611+
// NexusOperationCancelRequestCompleted (written after cancel handler responds)
612+
// rather than NexusOperationCanceled (written after handler workflow completes as
613+
// cancelled).
614+
let mut signal_chan = ctx.make_signal_channel("proceed-to-exit");
615+
signal_chan.next().await;
602616
}
617+
618+
handler_exited_tx.send(true).unwrap();
603619
Ok(WfExitValue::<()>::Cancelled)
604620
}
605621
});
606622
let submitter = worker.get_submitter_handle();
607623
let wf_handle = starter.start_with_worker(wf_name, &mut worker).await;
608624
let client = starter.get_client().await.get_client().clone();
625+
let (handler_wf_id_tx, mut handler_wf_id_rx) = tokio::sync::oneshot::channel();
609626
let nexus_task_handle = async {
610627
let nt = core_worker.poll_nexus_task().await.unwrap().unwrap_task();
611628
let start_req = assert_matches!(
612629
nt.request.unwrap().variant.unwrap(),
613630
request::Variant::StartOperation(sr) => sr
614631
);
615632
let completer_id = format!("completer-{}", rand_6_chars());
633+
let _ = handler_wf_id_tx.send(completer_id.clone());
616634
let links = start_req
617635
.links
618636
.iter()
@@ -668,16 +686,19 @@ async fn nexus_cancellation_types(
668686
match cancellation_type {
669687
NexusOperationCancellationType::WaitCancellationCompleted
670688
| NexusOperationCancellationType::WaitCancellationRequested => {
671-
assert!(!*cancel_call_completion_rx.borrow());
689+
// The nexus op future should not have been resolved
690+
assert!(!*caller_op_future_rx.borrow());
672691
}
673692
NexusOperationCancellationType::Abandon | NexusOperationCancellationType::TryCancel => {
674693
wf_handle
675694
.get_workflow_result(Default::default())
676695
.await
677696
.unwrap();
678-
assert!(*cancel_call_completion_rx.borrow())
697+
// The nexus op future should have been resolved
698+
assert!(*caller_op_future_rx.borrow())
679699
}
680700
}
701+
let (cancel_handler_responded_tx, _cancel_handler_responded_rx) = watch::channel(false);
681702
if cancellation_type != NexusOperationCancellationType::Abandon {
682703
let nt = core_worker.poll_nexus_task().await.unwrap();
683704
let nt = nt.unwrap_task();
@@ -702,19 +723,22 @@ async fn nexus_cancellation_types(
702723
})
703724
.await
704725
.unwrap();
726+
// Mark that the cancel handler has responded
727+
cancel_handler_responded_tx.send(true).unwrap();
705728
}
706729

707-
// Confirm the caller WF has not completed even after the handling of the cancel request
730+
731+
732+
// Check that the nexus op future resolves only _after_ the handler WF completes
708733
if cancellation_type == NexusOperationCancellationType::WaitCancellationCompleted {
709-
assert!(!*cancel_call_completion_rx.borrow());
734+
assert!(!*caller_op_future_rx.borrow());
710735

711-
// It only completes after the handler WF terminates
712736
cancellation_wait_tx.send(true).unwrap();
713737
wf_handle
714738
.get_workflow_result(Default::default())
715739
.await
716740
.unwrap();
717-
assert!(*cancel_call_completion_rx.borrow());
741+
assert!(*caller_op_future_rx.borrow());
718742
}
719743

720744
assert_matches!(
@@ -724,6 +748,34 @@ async fn nexus_cancellation_types(
724748
};
725749

726750
let shutdown_handle = worker.inner_mut().shutdown_handle();
751+
752+
let check_caller_op_future_resolved_then_allow_handler_to_complete = async {
753+
// The caller nexus op future has been resolved
754+
assert!(*caller_op_future_rx.borrow());
755+
756+
// Verify the handler workflow has not exited yet. This proves that the caller op future
757+
// was resolved as a result of NexusOperationCancelRequestCompleted (written after cancel
758+
// handler responds), as opposed to NexusOperationCanceled (written after handler workflow
759+
// exits).
760+
assert!(!*handler_exited_rx.borrow(), "Handler should not have exited yet");
761+
762+
let handler_wf_id = handler_wf_id_rx.try_recv()
763+
.expect("Should have received handler workflow ID");
764+
client
765+
.signal_workflow_execution(
766+
handler_wf_id,
767+
"".to_string(),
768+
"proceed-to-exit".to_string(),
769+
None,
770+
None,
771+
)
772+
.await
773+
.unwrap();
774+
775+
handler_exited_rx.changed().await.unwrap();
776+
assert!(*handler_exited_rx.borrow());
777+
};
778+
727779
join!(
728780
nexus_task_handle,
729781
async { worker.inner_mut().run().await.unwrap() },
@@ -735,6 +787,9 @@ async fn nexus_cancellation_types(
735787
if cancellation_type == NexusOperationCancellationType::TryCancel {
736788
cancellation_rx.changed().await.unwrap();
737789
}
790+
if cancellation_type == NexusOperationCancellationType::WaitCancellationRequested {
791+
check_caller_op_future_resolved_then_allow_handler_to_complete.await;
792+
}
738793
shutdown_handle();
739794
}
740795
);

0 commit comments

Comments
 (0)