diff --git a/core/src/worker/workflow/machines/local_activity_state_machine.rs b/core/src/worker/workflow/machines/local_activity_state_machine.rs
index ca22afc36..8fa232e29 100644
--- a/core/src/worker/workflow/machines/local_activity_state_machine.rs
+++ b/core/src/worker/workflow/machines/local_activity_state_machine.rs
@@ -48,8 +48,7 @@ fsm! {
     // is replaying), and then immediately scheduled and transitions to either requesting that lang
     // execute the activity, or waiting for the marker from history.
     Executing --(Schedule, shared on_schedule) --> RequestSent;
-    Replaying --(Schedule, on_schedule) --> WaitingMarkerEvent;
-    ReplayingPreResolved --(Schedule, on_schedule) --> WaitingMarkerEventPreResolved;
+    Replaying --(Schedule, on_schedule) --> WaitingResolveFromMarkerLookAhead;

     // Execution path =============================================================================
     RequestSent --(HandleResult(ResolveDat), on_handle_result) --> MarkerCommandCreated;
@@ -66,32 +65,28 @@ fsm! {
         --> MarkerCommandRecorded;

     // Replay path ================================================================================
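+    // Replay-path flow, per the transitions below: Replaying --Schedule-->
+    // WaitingResolveFromMarkerLookAhead --HandleKnownResult-->
+    // ResolvedFromMarkerLookAheadWaitingMarkerEvent --MarkerRecorded--> MarkerCommandRecorded.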
-    // LAs on the replay path always need to eventually see the marker
-    WaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded)
-        --> MarkerCommandRecorded;
+    WaitingResolveFromMarkerLookAhead --(HandleKnownResult(ResolveDat), on_handle_result) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;
     // If we are told to cancel while waiting for the marker, we still need to wait for the marker.
-    WaitingMarkerEvent --(Cancel, on_cancel_requested) --> WaitingMarkerEvent;
+    WaitingResolveFromMarkerLookAhead --(Cancel, on_cancel_requested) --> WaitingResolveFromMarkerLookAhead;
+    ResolvedFromMarkerLookAheadWaitingMarkerEvent --(Cancel, on_cancel_requested) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;
+
     // Because there could be non-heartbeat WFTs (ex: signals being received) between scheduling
     // the LA and the marker being recorded, peekahead might not always resolve the LA *before*
     // scheduling it. This transition accounts for that.
-    WaitingMarkerEvent --(HandleKnownResult(ResolveDat), on_handle_result) --> WaitingMarkerEvent;
-    WaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType),
-        on_no_wait_cancel) --> WaitingMarkerEvent;
+    WaitingResolveFromMarkerLookAhead --(NoWaitCancel(ActivityCancellationType),
+        on_no_wait_cancel) --> WaitingResolveFromMarkerLookAhead;
+    ResolvedFromMarkerLookAheadWaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType),
+        on_no_wait_cancel) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;
+
+    // LAs on the replay path always need to eventually see the marker
+    ResolvedFromMarkerLookAheadWaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded)
+        --> MarkerCommandRecorded;

     // It is entirely possible to have started the LA while replaying, only to find that we have
     // reached a new WFT and there still was no marker. In such cases we need to execute the LA.
     // This can easily happen if upon first execution, the worker does WFT heartbeating but then
     // dies for some reason.
-    WaitingMarkerEvent --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent;
-
-    // If the activity is pre resolved we still expect to see marker recorded event at some point,
-    // even though we already resolved the activity.
-    WaitingMarkerEventPreResolved --(MarkerRecorded(CompleteLocalActivityData),
-        shared on_marker_recorded) --> MarkerCommandRecorded;
-    // Ignore cancellations when waiting for the marker after being pre-resolved
-    WaitingMarkerEventPreResolved --(Cancel) --> WaitingMarkerEventPreResolved;
-    WaitingMarkerEventPreResolved --(NoWaitCancel(ActivityCancellationType))
-        --> WaitingMarkerEventPreResolved;
+    WaitingResolveFromMarkerLookAhead --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent;

     // Ignore cancellation in final state
     MarkerCommandRecorded --(Cancel, on_cancel_requested) --> MarkerCommandRecorded;
@@ -151,22 +146,12 @@ impl From<CompleteLocalActivityData> for ResolveDat {
 pub(super) fn new_local_activity(
     mut attrs: ValidScheduleLA,
     replaying_when_invoked: bool,
-    maybe_pre_resolved: Option<ResolveDat>,
     wf_time: Option<SystemTime>,
     internal_flags: InternalFlagsRef,
 ) -> Result<(LocalActivityMachine, Vec<MachineResponse>), WFMachinesError> {
     let initial_state = if replaying_when_invoked {
-        if let Some(dat) = maybe_pre_resolved {
-            ReplayingPreResolved { dat }.into()
-        } else {
-            Replaying {}.into()
-        }
+        Replaying {}.into()
     } else {
-        if maybe_pre_resolved.is_some() {
-            return Err(WFMachinesError::Nondeterminism(
-                "Local activity cannot be created as pre-resolved while not replaying".to_string(),
-            ));
-        }
         Executing {}.into()
     };
@@ -202,15 +187,13 @@ impl LocalActivityMachine {
     /// command-event processing - instead simply applying the event to this machine and then
     /// skipping over the rest. If this machine is in the `ResultNotified` state, that means
     /// command handling should proceed as normal (ie: The command needs to be matched and removed).
-    /// The other valid states to make this check in are the `WaitingMarkerEvent[PreResolved]`
-    /// states, which will return true.
     ///
     /// Attempting the check in any other state likely means a bug in the SDK.
     pub(super) fn marker_should_get_special_handling(&self) -> Result<bool, WFMachinesError> {
         match self.state() {
             LocalActivityMachineState::ResultNotified(_) => Ok(false),
-            LocalActivityMachineState::WaitingMarkerEvent(_) => Ok(true),
-            LocalActivityMachineState::WaitingMarkerEventPreResolved(_) => Ok(true),
+            LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_) => Ok(true),
+            LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) => Ok(true),
             _ => Err(WFMachinesError::Fatal(format!(
                 "Attempted to check for LA marker handling in invalid state {}",
                 self.state()
@@ -223,7 +206,7 @@ impl LocalActivityMachine {
     pub(super) fn will_accept_resolve_marker(&self) -> bool {
         matches!(
             self.state(),
-            LocalActivityMachineState::WaitingMarkerEvent(_)
+            LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_)
         )
     }
@@ -231,10 +214,21 @@ impl LocalActivityMachine {
     pub(super) fn encountered_non_replay_wft(
         &mut self,
     ) -> Result<Vec<MachineResponse>, WFMachinesError> {
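+        // Rationale (our reading of the replay-path transitions above): once an LA has been
+        // resolved from marker lookahead, its recorded marker must still appear before any
+        // non-replay WFT can start, so hitting this in that state means the lookahead
+        // bookkeeping has gone wrong and we cannot safely continue.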
+        if matches!(
+            self.state(),
+            LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_)
+        ) {
+            panic!(
+                "Invalid transition while notifying local activity (seq {}) of non-replay-wft-started in {}",
+                self.shared_state.attrs.seq,
+                self.state(),
+            );
+        }
+        // This only applies to the waiting-for-marker state. It can safely be ignored in the others
         if !matches!(
             self.state(),
-            LocalActivityMachineState::WaitingMarkerEvent(_)
+            LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_)
         ) {
             return Ok(vec![]);
         }
@@ -448,31 +442,10 @@ impl MarkerCommandRecorded {
 #[derive(Default, Clone)]
 pub(super) struct Replaying {}
 impl Replaying {
-    pub(super) fn on_schedule(self) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
-        TransitionResult::ok(
-            [],
-            WaitingMarkerEvent {
-                already_resolved: false,
-            },
-        )
-    }
-}
-
-#[derive(Clone)]
-pub(super) struct ReplayingPreResolved {
-    dat: ResolveDat,
-}
-impl ReplayingPreResolved {
     pub(super) fn on_schedule(
         self,
-    ) -> LocalActivityMachineTransition<WaitingMarkerEventPreResolved> {
-        TransitionResult::ok(
-            [
-                LocalActivityCommand::FakeMarker,
-                LocalActivityCommand::Resolved(self.dat),
-            ],
-            WaitingMarkerEventPreResolved {},
-        )
+    ) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
+        TransitionResult::ok([], WaitingResolveFromMarkerLookAhead {})
     }
 }
@@ -559,35 +532,35 @@ impl ResultNotified {
 }

 #[derive(Default, Clone)]
-pub(super) struct WaitingMarkerEvent {
-    already_resolved: bool,
-}
-
-impl WaitingMarkerEvent {
-    pub(super) fn on_marker_recorded(
-        self,
-        shared: &mut SharedState,
-        dat: CompleteLocalActivityData,
-    ) -> LocalActivityMachineTransition<MarkerCommandRecorded> {
-        verify_marker_dat!(
-            shared,
-            &dat,
-            TransitionResult::commands(if self.already_resolved {
-                vec![]
-            } else {
-                vec![LocalActivityCommand::Resolved(dat.into())]
-            })
-        )
-    }
+pub(super) struct WaitingResolveFromMarkerLookAhead {}
+
+impl WaitingResolveFromMarkerLookAhead {
+    // FIXME(JWH): I don't think this transition exists any longer.
+    // pub(super) fn on_marker_recorded(
+    //     self,
+    //     shared: &mut SharedState,
+    //     dat: CompleteLocalActivityData,
+    // ) -> LocalActivityMachineTransition<MarkerCommandRecorded> {
+    //     verify_marker_dat!(
+    //         shared,
+    //         &dat,
+    //         TransitionResult::commands(if self.already_resolved {
+    //             vec![]
+    //         } else {
+    //             vec![LocalActivityCommand::Resolved(dat.into())]
+    //         })
+    //     )
+    // }

     fn on_handle_result(
         self,
         dat: ResolveDat,
-    ) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
+    ) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
         TransitionResult::ok(
-            [LocalActivityCommand::Resolved(dat)],
-            WaitingMarkerEvent {
-                already_resolved: true,
-            },
+            [
+                LocalActivityCommand::FakeMarker,
+                LocalActivityCommand::Resolved(dat),
+            ],
+            ResolvedFromMarkerLookAheadWaitingMarkerEvent {},
         )
     }

     pub(super) fn on_started_non_replay_wft(
@@ -601,7 +574,9 @@ impl WaitingMarkerEvent {
         )])
     }

-    fn on_cancel_requested(self) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
+    fn on_cancel_requested(
+        self,
+    ) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
         // We still "request a cancel" even though we know the local activity should not be running
         // because the data might be in the pre-resolved list.
         TransitionResult::ok([LocalActivityCommand::RequestCancel], self)
@@ -610,7 +585,7 @@ impl WaitingMarkerEvent {
     fn on_no_wait_cancel(
         self,
         _: ActivityCancellationType,
-    ) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
+    ) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
         // Markers are always recorded when cancelling, so this is the same as a normal cancel on
         // the replay path
         self.on_cancel_requested()
@@ -618,14 +593,51 @@ impl WaitingMarkerEvent {
 }

 #[derive(Default, Clone)]
-pub(super) struct WaitingMarkerEventPreResolved {}
-impl WaitingMarkerEventPreResolved {
+pub(super) struct ResolvedFromMarkerLookAheadWaitingMarkerEvent {}
+
+impl ResolvedFromMarkerLookAheadWaitingMarkerEvent {
     pub(super) fn on_marker_recorded(
         self,
         shared: &mut SharedState,
         dat: CompleteLocalActivityData,
     ) -> LocalActivityMachineTransition<MarkerCommandRecorded> {
-        verify_marker_dat!(shared, &dat, TransitionResult::default())
+        verify_marker_dat!(shared, &dat, TransitionResult::commands(vec![]))
+    }
+
+    // fn on_handle_result(
+    //     self,
+    //     dat: ResolveDat,
+    // ) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
+    //     TransitionResult::ok(
+    //         [LocalActivityCommand::Resolved(dat)],
+    //         ResolvedFromMarkerLookAheadWaitingMarkerEvent {},
+    //     )
+    // }
+
+    // pub(super) fn on_started_non_replay_wft(
+    //     self,
+    //     dat: &mut SharedState,
+    // ) -> LocalActivityMachineTransition<RequestSent> {
+    //     // We aren't really "replaying" anymore for our purposes, and want to record the marker.
+    //     dat.replaying_when_invoked = false;
+    //     TransitionResult::commands([LocalActivityCommand::RequestActivityExecution(
+    //         dat.attrs.clone(),
+    //     )])
+    // }
+
+    fn on_cancel_requested(
+        self,
+    ) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
+        // We still "request a cancel" even though we know the local activity should not be running
+        // because the data might be in the pre-resolved list.
+        TransitionResult::ok([LocalActivityCommand::RequestCancel], self)
+    }
+
+    fn on_no_wait_cancel(
+        self,
+        _: ActivityCancellationType,
+    ) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
+        // Markers are always recorded when cancelling, so this is the same as a normal cancel on
+        // the replay path
+        self.on_cancel_requested()
     }
 }
diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs
index 4ae721bef..653c51fad 100644
--- a/core/src/worker/workflow/machines/workflow_machines.rs
+++ b/core/src/worker/workflow/machines/workflow_machines.rs
@@ -337,6 +337,40 @@ impl WorkflowMachines {
         Ok(())
     }

+    /// Apply local activity resolutions that have been queued up while looking ahead at the next
+    /// WFT. Pending resolutions are always applied in order (i.e. the order of markers in history,
+    /// which corresponds to the order in which those LAs completed execution).
+    pub(crate) fn apply_pending_local_activity_resolutions(&mut self) -> Result<()> {
+        while let Some(next_res_seq) = self.local_activity_data.peek_next_pending_resolution_seq() {
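+            // Peek before taking: a resolution is only consumed once a machine exists for its
+            // seq, so resolutions for LAs lang hasn't scheduled yet remain queued for later.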
+            if let Ok(mk) = self.get_machine_key(CommandID::LocalActivity(next_res_seq)) {
+                let next_res = self
+                    .local_activity_data
+                    .take_next_pending_resolution(next_res_seq);
+
+                let mach = self.machine_mut(mk);
+                if let Machines::LocalActivityMachine(ref mut lam) = *mach {
+                    let more_responses = lam.try_resolve_with_dat(next_res.into())?;
+                    self.process_machine_responses(mk, more_responses)?;
+                } else {
+                    // FIXME(JWH): Not sure about this. Is it enough that machine_key returned
+                    // something to deduce that machine_mut _should_ return something that is a
+                    // LocalActivityMachine? Or are there cases where that supposition is incorrect?
+                    panic!(
+                        "Machine was not a local activity machine when it should have been during resolution: {:?}",
+                        next_res_seq
+                    );
+                }
+            } else {
+                // That's probably OK; the LA might just not have been created from the lang side
+                // yet. Stop applying resolutions for now and try again later, or fail for real if
+                // there are leftover resolutions at the end of the WFT.
+                break;
+            }
+        }
+
+        Ok(()) // FIXME(JWH): Unneeded Result?
+    }
+
     /// Let this workflow know that something we've been waiting locally on has resolved, like a
     /// local activity or side effect
     ///
@@ -770,6 +804,13 @@ impl WorkflowMachines {
                 if let Ok(mk) =
                     self.get_machine_key(CommandID::LocalActivity(la_dat.marker_dat.seq))
                 {
+                    // FIXME(JWH): This case already handles LA completion in the correct
+                    // (history-recorded) order for an LA that completes during a later WFT. We
+                    // could easily move this to the "peeked marker" collection above, avoiding an
+                    // extra code path, which could potentially avoid another case of misordering
+                    // of LA completions involving one LA completing during a later WFT and one LA
+                    // completing during the same WFT. Can this actually happen? If so, is there a
+                    // risk of breaking behavior in unifying these cases?
                     delayed_actions.push(DelayedAction::WakeLa(mk, Box::new(la_dat)));
                 } else {
                     self.local_activity_data.insert_peeked_marker(la_dat);
@@ -1221,24 +1262,24 @@ impl WorkflowMachines {
                     self.local_activity_data.enqueue(act);
                 }
                 MachineResponse::RequestCancelLocalActivity(seq) => {
+                    // FIXME(JWH): reconsider this case
                     // We might already know about the status from a pre-resolution. Apply it if so.
                     // We need to do this because otherwise we might need to perform additional
                     // activations during replay that didn't happen during execution, just like
                     // we sometimes pre-resolve activities when first requested.
-                    if let Some(preres) = self.local_activity_data.take_preresolution(seq) {
-                        if let Machines::LocalActivityMachine(lam) = self.machine_mut(smk) {
-                            let more_responses = lam.try_resolve_with_dat(preres)?;
-                            self.process_machine_responses(smk, more_responses)?;
-                        } else {
-                            panic!(
-                                "A non local-activity machine returned a request cancel LA response"
-                            );
-                        }
-                    }
+                    // if let Some(preres) = self.local_activity_data.take_preresolution(seq) {
+                    //     if let Machines::LocalActivityMachine(lam) = self.machine_mut(smk) {
+                    //         let more_responses = lam.try_resolve_with_dat(preres)?;
+                    //         self.process_machine_responses(smk, more_responses)?;
+                    //     } else {
+                    //         panic!(
+                    //             "A non local-activity machine returned a request cancel LA response"
+                    //         );
+                    //     }
+                    // }
                     // If it's in the request queue, just rip it out.
-                    else if let Some(removed_act) =
-                        self.local_activity_data.remove_from_queue(seq)
-                    {
+                    // else
+                    if let Some(removed_act) = self.local_activity_data.remove_from_queue(seq) {
                         // We removed it. Notify the machine that the activity cancelled.
                         if let Machines::LocalActivityMachine(lam) = self.machine_mut(smk) {
                             let more_responses = lam.try_resolve(
@@ -1360,7 +1401,6 @@ impl WorkflowMachines {
                 let (la, mach_resp) = new_local_activity(
                     attrs,
                     self.replaying,
-                    self.local_activity_data.take_preresolution(seq),
                     self.current_wf_time,
                     self.observed_internal_flags.clone(),
                 )?;
diff --git a/core/src/worker/workflow/machines/workflow_machines/local_acts.rs b/core/src/worker/workflow/machines/workflow_machines/local_acts.rs
index 29b20f0fd..1975562b5 100644
--- a/core/src/worker/workflow/machines/workflow_machines/local_acts.rs
+++ b/core/src/worker/workflow/machines/workflow_machines/local_acts.rs
@@ -17,9 +17,8 @@ pub(super) struct LocalActivityData {
     cancel_requests: Vec<u32>,
     /// Seq #s of local activities which we have sent to be executed but have not yet resolved
     executing: HashSet<u32>,
-    /// Maps local activity sequence numbers to their resolutions as found when looking ahead at
-    /// next WFT
-    preresolutions: HashMap<u32, ResolveDat>,
+    /// Queued local activity resolutions that were found by looking ahead at the next WFT.
+    pending_resolutions: Vec<CompleteLocalActivityData>,
     /// Set true if the workflow is terminating
     am_terminating: bool,
 }
@@ -77,14 +76,33 @@ impl LocalActivityData {
         self.executing.len() + self.new_requests.len()
     }

+    // FIXME(JWH): reconsider the need for this
     pub(super) fn insert_peeked_marker(&mut self, dat: CompleteLocalActivityData) {
-        self.preresolutions.insert(dat.marker_dat.seq, dat.into());
+        self.pending_resolutions.push(dat);
     }

-    pub(super) fn take_preresolution(&mut self, seq: u32) -> Option<ResolveDat> {
-        self.preresolutions.remove(&seq)
+    pub(super) fn peek_next_pending_resolution_seq(&self) -> Option<u32> {
+        self.pending_resolutions.first().map(|r| r.marker_dat.seq)
     }
+
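+    // The `seq` argument acts as a consistency check: callers should pass the value they just got
+    // from `peek_next_pending_resolution_seq`, so a mismatch means resolutions would be consumed
+    // out of order.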
+    pub(super) fn take_next_pending_resolution(&mut self, seq: u32) -> CompleteLocalActivityData {
+        if self.pending_resolutions.is_empty() {
+            panic!("take_next_pending_resolution: No pending resolutions to take");
+        }
+        let lad = self.pending_resolutions.remove(0);
+        if lad.marker_dat.seq != seq {
+            panic!(
+                "take_next_pending_resolution: Provided seq does not match the next pending resolution"
+            );
+        }
+        lad
+    }
+
+    // FIXME(JWH): reconsider the need for this
+    // pub(super) fn take_preresolution(&mut self, seq: u32) -> Option<ResolveDat> {
+    //     self.preresolutions.remove(&seq)
+    // }
+
     pub(super) fn remove_from_queue(&mut self, seq: u32) -> Option<ValidScheduleLA> {
         self.new_requests
             .iter()
diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs
index f50f62d4a..4bd302df2 100644
--- a/core/src/worker/workflow/managed_run.rs
+++ b/core/src/worker/workflow/managed_run.rs
@@ -723,6 +723,8 @@ impl ManagedRun {
         if !completion.activation_was_eviction && !self.am_broken {
             self.wfm.apply_next_task_if_ready()?;
         }
+
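+        // Presumably this must run after applying the next task (if ready) so that any
+        // resolutions collected from marker lookahead are dispatched in marker (completion)
+        // order.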
+        self.wfm.apply_pending_local_activity_resolutions()?;
         let new_local_acts = self.wfm.drain_queued_local_activities();
         self.sink_la_requests(new_local_acts)?;
@@ -1445,6 +1447,11 @@ impl WorkflowManager {
         self.machines.prepare_for_wft_response()
     }

+    /// Apply local activity resolutions that have been queued up.
+    fn apply_pending_local_activity_resolutions(&mut self) -> Result<()> {
+        self.machines.apply_pending_local_activity_resolutions()
+    }
+
     /// Remove and return all queued local activities. Once this is called, they need to be
     /// dispatched for execution.
     fn drain_queued_local_activities(&mut self) -> Vec<ValidScheduleLA> {
diff --git a/sdk-core-protos/src/canned_histories.rs b/sdk-core-protos/src/canned_histories.rs
index 66b389db9..d46296186 100644
--- a/sdk-core-protos/src/canned_histories.rs
+++ b/sdk-core-protos/src/canned_histories.rs
@@ -1321,6 +1321,23 @@ pub fn two_local_activities_one_wft(parallel: bool) -> TestHistoryBuilder {
     t
 }

+/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
+/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
+/// 5: EVENT_TYPE_MARKER_RECORDED (LA 2 result)
+/// 6: EVENT_TYPE_MARKER_RECORDED (LA 1 result)
+/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED
+pub fn parallel_las_job_order_hist() -> TestHistoryBuilder {
+    let mut t = TestHistoryBuilder::default();
+    t.add_by_type(EventType::WorkflowExecutionStarted);
+    t.add_full_wf_task();
+    t.add_local_activity_result_marker(2, "2", b"hi2".into());
+    t.add_local_activity_result_marker(1, "1", b"hi1".into());
+    t.add_workflow_task_scheduled_and_started();
+    t
+}
+
 /// Useful for one-off needs to write a crafted history to a file. Writes it as serialized proto
 /// binary to the provided path.
 pub fn write_hist_to_binfile(
diff --git a/tests/integ_tests/workflow_tests/local_activities.rs b/tests/integ_tests/workflow_tests/local_activities.rs
index fe1507563..71a0db7e1 100644
--- a/tests/integ_tests/workflow_tests/local_activities.rs
+++ b/tests/integ_tests/workflow_tests/local_activities.rs
@@ -710,7 +710,7 @@ async fn third_weird_la_nondeterminism_repro() {
 /// activity cancellation, that would be wrong because, during execution, the LA resolution is
 /// always going to take _longer_ than the instantaneous cancel effect.
 ///
-/// This affect applies regardless of how you choose to interleave cancellations and LAs. Ultimately
+/// This effect applies regardless of how you choose to interleave cancellations and LAs. Ultimately
 /// all cancellations will happen at once (in the order they are submitted) while the LA executions
 /// are queued (because this all happens synchronously in the workflow machines). If you were to
 /// _wait_ on an LA, and then cancel something else, and then run another LA, such that all commands
@@ -2589,6 +2589,80 @@ async fn two_sequential_las(
     worker.run().await.unwrap();
 }

+async fn parallel_las_job_order_wf(ctx: WfContext) -> WorkflowResult<()> {
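+    // The inputs double as sleep durations (ms) for the activity registered in the test below,
+    // so the second LA (seq 2) finishes first, matching the marker order in
+    // parallel_las_job_order_hist.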
+    tokio::join!(
+        ctx.local_activity(LocalActivityOptions {
+            activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
+            input: 100.as_json_payload().unwrap(),
+            ..Default::default()
+        }),
+        ctx.local_activity(LocalActivityOptions {
+            activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
+            input: 1.as_json_payload().unwrap(),
+            ..Default::default()
+        })
+    );
+    Ok(().into())
+}
+
+#[rstest]
+#[tokio::test]
+async fn parallel_las_job_order(#[values(true, false)] replay: bool) {
+    let t = canned_histories::parallel_las_job_order_hist();
+    let mut mock_cfg = if replay {
+        MockPollCfg::from_resps(t, [ResponseType::AllHistory])
+    } else {
+        MockPollCfg::from_hist_builder(t)
+    };
+
+    let mut aai = ActivationAssertionsInterceptor::default();
+    // Verify ResolveActivity jobs are received in completion order (seq 2 first, then seq 1).
+    // This catches the bug where they might be sent in request order instead.
+    aai.skip_one().then(move |a| {
+        assert_matches!(
+            a.jobs.as_slice(),
+            [WorkflowActivationJob {
+                variant: Some(workflow_activation_job::Variant::ResolveActivity(ra1))
+            }, WorkflowActivationJob {
+                variant: Some(workflow_activation_job::Variant::ResolveActivity(ra2))
+            }] => {assert_eq!(ra1.seq, 2); assert_eq!(ra2.seq, 1)}
+        );
+    });
+
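+    // On first execution the two markers are emitted as commands along with workflow completion;
+    // on replay the markers already exist in history, so only the completion command remains.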
+    mock_cfg.completion_asserts_from_expectations(|mut asserts| {
+        asserts.then(move |wft| {
+            let commands = &wft.commands;
+            if !replay {
+                assert_eq!(commands.len(), 3);
+                assert_eq!(commands[0].command_type(), CommandType::RecordMarker);
+                assert_eq!(commands[1].command_type(), CommandType::RecordMarker);
+                assert_matches!(
+                    commands[2].command_type(),
+                    CommandType::CompleteWorkflowExecution
+                );
+            } else {
+                assert_eq!(commands.len(), 1);
+                assert_matches!(
+                    commands[0].command_type(),
+                    CommandType::CompleteWorkflowExecution
+                );
+            }
+        });
+    });
+
+    let mut worker = build_fake_sdk(mock_cfg);
+    worker.set_worker_interceptor(aai);
+    worker.register_wf(DEFAULT_WORKFLOW_TYPE, parallel_las_job_order_wf);
+    worker.register_activity(
+        DEFAULT_ACTIVITY_TYPE,
+        move |_ctx: ActContext, sleep_ms: u64| async move {
+            tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
+            Ok("Resolved")
+        },
+    );
+    worker.run().await.unwrap();
+}
+
 async fn la_timer_la(ctx: WfContext) -> WorkflowResult<()> {
     ctx.local_activity(LocalActivityOptions {
         activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),