@@ -4,6 +4,7 @@ use assert_matches::assert_matches;
44use std:: time:: Duration ;
55use temporal_client:: { WfClientExt , WorkflowClientTrait , WorkflowOptions } ;
66use temporal_sdk:: { CancellableFuture , NexusOperationOptions , WfContext , WfExitValue } ;
7+ use tokio_stream:: StreamExt ;
78use temporal_sdk_core_api:: errors:: PollError ;
89use temporal_sdk_core_protos:: {
910 coresdk:: {
@@ -557,10 +558,10 @@ async fn nexus_cancellation_types(
557558 let endpoint = mk_nexus_endpoint ( & mut starter) . await ;
558559 let schedule_to_close_timeout = Some ( Duration :: from_secs ( 5 ) ) ;
559560
560- let ( cancel_call_completion_tx , cancel_call_completion_rx ) = watch:: channel ( false ) ;
561+ let ( caller_op_future_tx , caller_op_future_rx ) = watch:: channel ( false ) ;
561562 worker. register_wf ( wf_name. to_owned ( ) , move |ctx : WfContext | {
562563 let endpoint = endpoint. clone ( ) ;
563- let cancel_call_completion_tx = cancel_call_completion_tx . clone ( ) ;
564+ let caller_op_future_tx = caller_op_future_tx . clone ( ) ;
564565 async move {
565566 let options = NexusOperationOptions {
566567 endpoint,
@@ -577,42 +578,59 @@ async fn nexus_cancellation_types(
577578 started. cancel ( & ctx) ;
578579
579580 let res = result. await ;
580- cancel_call_completion_tx . send ( true ) . unwrap ( ) ;
581+ caller_op_future_tx . send ( true ) . unwrap ( ) ;
581582
582- // Make sure cancel after completion doesn't cause problems
583+ // Make sure cancel after op completion doesn't cause problems
583584 started. cancel ( & ctx) ;
584585
585586 // We need to wait slightly so that the workflow is not complete at the same time
586- // cancellation is invoked. If it does, the caller workflow will close and the server won't attempt to send the cancellation to the handler
587+ // cancellation is invoked. If it does, the caller workflow will close and the server
588+ // won't attempt to send the cancellation to the handler
587589 ctx. timer ( Duration :: from_millis ( 1 ) ) . await ;
588590 Ok ( res. into ( ) )
589591 }
590592 } ) ;
591593
592594 let ( cancellation_wait_tx, cancellation_wait_rx) = watch:: channel ( false ) ;
593595 let ( cancellation_tx, mut cancellation_rx) = watch:: channel ( false ) ;
596+ let ( handler_exited_tx, mut handler_exited_rx) = watch:: channel ( false ) ;
594597 worker. register_wf ( "async_completer" . to_owned ( ) , move |ctx : WfContext | {
595598 let cancellation_tx = cancellation_tx. clone ( ) ;
596599 let mut cancellation_wait_rx = cancellation_wait_rx. clone ( ) ;
600+ let handler_exited_tx = handler_exited_tx. clone ( ) ;
597601 async move {
602+ // Wait for cancellation
598603 ctx. cancelled ( ) . await ;
599604 cancellation_tx. send ( true ) . unwrap ( ) ;
605+
600606 if cancellation_type == NexusOperationCancellationType :: WaitCancellationCompleted {
601607 cancellation_wait_rx. changed ( ) . await . unwrap ( ) ;
608+ } else if cancellation_type == NexusOperationCancellationType :: WaitCancellationRequested {
609+ // For WAIT_REQUESTED, wait until the caller nexus op future has been resolved. This
610+ // allows the test to verify that it resolved due to
611+ // NexusOperationCancelRequestCompleted (written after cancel handler responds)
612+ // rather than NexusOperationCanceled (written after handler workflow completes as
613+ // cancelled).
614+ let mut signal_chan = ctx. make_signal_channel ( "proceed-to-exit" ) ;
615+ signal_chan. next ( ) . await ;
602616 }
617+
618+ handler_exited_tx. send ( true ) . unwrap ( ) ;
603619 Ok ( WfExitValue :: < ( ) > :: Cancelled )
604620 }
605621 } ) ;
606622 let submitter = worker. get_submitter_handle ( ) ;
607623 let wf_handle = starter. start_with_worker ( wf_name, & mut worker) . await ;
608624 let client = starter. get_client ( ) . await . get_client ( ) . clone ( ) ;
625+ let ( handler_wf_id_tx, mut handler_wf_id_rx) = tokio:: sync:: oneshot:: channel ( ) ;
609626 let nexus_task_handle = async {
610627 let nt = core_worker. poll_nexus_task ( ) . await . unwrap ( ) . unwrap_task ( ) ;
611628 let start_req = assert_matches ! (
612629 nt. request. unwrap( ) . variant. unwrap( ) ,
613630 request:: Variant :: StartOperation ( sr) => sr
614631 ) ;
615632 let completer_id = format ! ( "completer-{}" , rand_6_chars( ) ) ;
633+ let _ = handler_wf_id_tx. send ( completer_id. clone ( ) ) ;
616634 let links = start_req
617635 . links
618636 . iter ( )
@@ -668,16 +686,19 @@ async fn nexus_cancellation_types(
668686 match cancellation_type {
669687 NexusOperationCancellationType :: WaitCancellationCompleted
670688 | NexusOperationCancellationType :: WaitCancellationRequested => {
671- assert ! ( !* cancel_call_completion_rx. borrow( ) ) ;
689+ // The nexus op future should not have been resolved
690+ assert ! ( !* caller_op_future_rx. borrow( ) ) ;
672691 }
673692 NexusOperationCancellationType :: Abandon | NexusOperationCancellationType :: TryCancel => {
674693 wf_handle
675694 . get_workflow_result ( Default :: default ( ) )
676695 . await
677696 . unwrap ( ) ;
678- assert ! ( * cancel_call_completion_rx. borrow( ) )
697+ // The nexus op future should have been resolved
698+ assert ! ( * caller_op_future_rx. borrow( ) )
679699 }
680700 }
701+ let ( cancel_handler_responded_tx, _cancel_handler_responded_rx) = watch:: channel ( false ) ;
681702 if cancellation_type != NexusOperationCancellationType :: Abandon {
682703 let nt = core_worker. poll_nexus_task ( ) . await . unwrap ( ) ;
683704 let nt = nt. unwrap_task ( ) ;
@@ -702,19 +723,22 @@ async fn nexus_cancellation_types(
702723 } )
703724 . await
704725 . unwrap ( ) ;
726+ // Mark that the cancel handler has responded
727+ cancel_handler_responded_tx. send ( true ) . unwrap ( ) ;
705728 }
706729
707- // Confirm the caller WF has not completed even after the handling of the cancel request
730+
731+
732+ // Check that the nexus op future resolves only _after_ the handler WF completes
708733 if cancellation_type == NexusOperationCancellationType :: WaitCancellationCompleted {
709- assert ! ( !* cancel_call_completion_rx . borrow( ) ) ;
734+ assert ! ( !* caller_op_future_rx . borrow( ) ) ;
710735
711- // It only completes after the handler WF terminates
712736 cancellation_wait_tx. send ( true ) . unwrap ( ) ;
713737 wf_handle
714738 . get_workflow_result ( Default :: default ( ) )
715739 . await
716740 . unwrap ( ) ;
717- assert ! ( * cancel_call_completion_rx . borrow( ) ) ;
741+ assert ! ( * caller_op_future_rx . borrow( ) ) ;
718742 }
719743
720744 assert_matches ! (
@@ -724,6 +748,34 @@ async fn nexus_cancellation_types(
724748 } ;
725749
726750 let shutdown_handle = worker. inner_mut ( ) . shutdown_handle ( ) ;
751+
752+ let check_caller_op_future_resolved_then_allow_handler_to_complete = async {
753+ // The caller nexus op future has been resolved
754+ assert ! ( * caller_op_future_rx. borrow( ) ) ;
755+
756+ // Verify the handler workflow has NOT exited yet. This proves that the caller op future
757+ // was resolved as a result of NexusOperationCancelRequestCompleted (written after cancel
758+ // handler responds), as opposed to NexusOperationCanceled (written after handler workflow
759+ // exits).
760+ assert ! ( !* handler_exited_rx. borrow( ) , "Handler should not have exited yet" ) ;
761+
762+ let handler_wf_id = handler_wf_id_rx. try_recv ( )
763+ . expect ( "Should have received handler workflow ID" ) ;
764+ client
765+ . signal_workflow_execution (
766+ handler_wf_id,
767+ "" . to_string ( ) ,
768+ "proceed-to-exit" . to_string ( ) ,
769+ None ,
770+ None ,
771+ )
772+ . await
773+ . unwrap ( ) ;
774+
775+ handler_exited_rx. changed ( ) . await . unwrap ( ) ;
776+ assert ! ( * handler_exited_rx. borrow( ) ) ;
777+ } ;
778+
727779 join ! (
728780 nexus_task_handle,
729781 async { worker. inner_mut( ) . run( ) . await . unwrap( ) } ,
@@ -735,6 +787,9 @@ async fn nexus_cancellation_types(
735787 if cancellation_type == NexusOperationCancellationType :: TryCancel {
736788 cancellation_rx. changed( ) . await . unwrap( ) ;
737789 }
790+ if cancellation_type == NexusOperationCancellationType :: WaitCancellationRequested {
791+ check_caller_op_future_resolved_then_allow_handler_to_complete. await ;
792+ }
738793 shutdown_handle( ) ;
739794 }
740795 ) ;
0 commit comments