Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 101 additions & 89 deletions core/src/worker/workflow/machines/local_activity_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ fsm! {
// is replaying), and then immediately scheduled and transitions to either requesting that lang
// execute the activity, or waiting for the marker from history.
Executing --(Schedule, shared on_schedule) --> RequestSent;
Replaying --(Schedule, on_schedule) --> WaitingMarkerEvent;
ReplayingPreResolved --(Schedule, on_schedule) --> WaitingMarkerEventPreResolved;
Replaying --(Schedule, on_schedule) --> WaitingResolveFromMarkerLookAhead;

// Execution path =============================================================================
RequestSent --(HandleResult(ResolveDat), on_handle_result) --> MarkerCommandCreated;
Expand All @@ -66,32 +65,28 @@ fsm! {
--> MarkerCommandRecorded;

// Replay path ================================================================================
// LAs on the replay path always need to eventually see the marker
WaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded)
--> MarkerCommandRecorded;
WaitingResolveFromMarkerLookAhead --(HandleKnownResult(ResolveDat), on_handle_result) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;
// If we are told to cancel while waiting for the marker, we still need to wait for the marker.
WaitingMarkerEvent --(Cancel, on_cancel_requested) --> WaitingMarkerEvent;
WaitingResolveFromMarkerLookAhead --(Cancel, on_cancel_requested) --> WaitingResolveFromMarkerLookAhead;
ResolvedFromMarkerLookAheadWaitingMarkerEvent --(Cancel, on_cancel_requested) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;

// Because there could be non-heartbeat WFTs (ex: signals being received) between scheduling
// the LA and the marker being recorded, peekahead might not always resolve the LA *before*
// scheduling it. This transition accounts for that.
WaitingMarkerEvent --(HandleKnownResult(ResolveDat), on_handle_result) --> WaitingMarkerEvent;
WaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType),
on_no_wait_cancel) --> WaitingMarkerEvent;
WaitingResolveFromMarkerLookAhead --(NoWaitCancel(ActivityCancellationType),
on_no_wait_cancel) --> WaitingResolveFromMarkerLookAhead;
ResolvedFromMarkerLookAheadWaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType),
on_no_wait_cancel) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent;

// LAs on the replay path always need to eventually see the marker
ResolvedFromMarkerLookAheadWaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded)
--> MarkerCommandRecorded;

// It is entirely possible to have started the LA while replaying, only to find that we have
// reached a new WFT and there still was no marker. In such cases we need to execute the LA.
// This can easily happen if upon first execution, the worker does WFT heartbeating but then
// dies for some reason.
WaitingMarkerEvent --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent;

// If the activity is pre resolved we still expect to see marker recorded event at some point,
// even though we already resolved the activity.
WaitingMarkerEventPreResolved --(MarkerRecorded(CompleteLocalActivityData),
shared on_marker_recorded) --> MarkerCommandRecorded;
// Ignore cancellations when waiting for the marker after being pre-resolved
WaitingMarkerEventPreResolved --(Cancel) --> WaitingMarkerEventPreResolved;
WaitingMarkerEventPreResolved --(NoWaitCancel(ActivityCancellationType))
--> WaitingMarkerEventPreResolved;
WaitingResolveFromMarkerLookAhead --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent;

// Ignore cancellation in final state
MarkerCommandRecorded --(Cancel, on_cancel_requested) --> MarkerCommandRecorded;
Expand Down Expand Up @@ -151,22 +146,12 @@ impl From<CompleteLocalActivityData> for ResolveDat {
pub(super) fn new_local_activity(
mut attrs: ValidScheduleLA,
replaying_when_invoked: bool,
maybe_pre_resolved: Option<ResolveDat>,
wf_time: Option<SystemTime>,
internal_flags: InternalFlagsRef,
) -> Result<(LocalActivityMachine, Vec<MachineResponse>), WFMachinesError> {
let initial_state = if replaying_when_invoked {
if let Some(dat) = maybe_pre_resolved {
ReplayingPreResolved { dat }.into()
} else {
Replaying {}.into()
}
Replaying {}.into()
} else {
if maybe_pre_resolved.is_some() {
return Err(WFMachinesError::Nondeterminism(
"Local activity cannot be created as pre-resolved while not replaying".to_string(),
));
}
Executing {}.into()
};

Expand Down Expand Up @@ -202,15 +187,13 @@ impl LocalActivityMachine {
/// command-event processing - instead simply applying the event to this machine and then
/// skipping over the rest. If this machine is in the `ResultNotified` state, that means
/// command handling should proceed as normal (ie: The command needs to be matched and removed).
/// The other valid states to make this check in are the `WaitingMarkerEvent[PreResolved]`
/// states, which will return true.
///
/// Attempting the check in any other state likely means a bug in the SDK.
pub(super) fn marker_should_get_special_handling(&self) -> Result<bool, WFMachinesError> {
match self.state() {
LocalActivityMachineState::ResultNotified(_) => Ok(false),
LocalActivityMachineState::WaitingMarkerEvent(_) => Ok(true),
LocalActivityMachineState::WaitingMarkerEventPreResolved(_) => Ok(true),
LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_) => Ok(true),
LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) => Ok(true),
_ => Err(WFMachinesError::Fatal(format!(
"Attempted to check for LA marker handling in invalid state {}",
self.state()
Expand All @@ -223,18 +206,29 @@ impl LocalActivityMachine {
pub(super) fn will_accept_resolve_marker(&self) -> bool {
matches!(
self.state(),
LocalActivityMachineState::WaitingMarkerEvent(_)
LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_)
)
}

/// Must be called if the workflow encounters a non-replay workflow task
pub(super) fn encountered_non_replay_wft(
&mut self,
) -> Result<Vec<MachineResponse>, WFMachinesError> {
if !matches!(
self.state(),
LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_)
) {
panic!(
"Invalid transition while notifying local activity (seq {}) of non-replay-wft-started in {}",
self.shared_state.attrs.seq,
self.state(),
);
}

// This only applies to the waiting-for-marker state. It can safely be ignored in the others
if !matches!(
self.state(),
LocalActivityMachineState::WaitingMarkerEvent(_)
LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_)
) {
return Ok(vec![]);
}
Expand Down Expand Up @@ -448,31 +442,10 @@ impl MarkerCommandRecorded {
#[derive(Default, Clone)]
pub(super) struct Replaying {}
impl Replaying {
pub(super) fn on_schedule(self) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
TransitionResult::ok(
[],
WaitingMarkerEvent {
already_resolved: false,
},
)
}
}

#[derive(Clone)]
pub(super) struct ReplayingPreResolved {
dat: ResolveDat,
}
impl ReplayingPreResolved {
pub(super) fn on_schedule(
self,
) -> LocalActivityMachineTransition<WaitingMarkerEventPreResolved> {
TransitionResult::ok(
[
LocalActivityCommand::FakeMarker,
LocalActivityCommand::Resolved(self.dat),
],
WaitingMarkerEventPreResolved {},
)
) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
TransitionResult::ok([], WaitingResolveFromMarkerLookAhead {})
}
}

Expand Down Expand Up @@ -559,35 +532,35 @@ impl ResultNotified {
}

#[derive(Default, Clone)]
pub(super) struct WaitingMarkerEvent {
already_resolved: bool,
}

impl WaitingMarkerEvent {
pub(super) fn on_marker_recorded(
self,
shared: &mut SharedState,
dat: CompleteLocalActivityData,
) -> LocalActivityMachineTransition<MarkerCommandRecorded> {
verify_marker_dat!(
shared,
&dat,
TransitionResult::commands(if self.already_resolved {
vec![]
} else {
vec![LocalActivityCommand::Resolved(dat.into())]
})
)
}
pub(super) struct WaitingResolveFromMarkerLookAhead {}

impl WaitingResolveFromMarkerLookAhead {
// FIXME(JWH): I dont think this transition exists any longer.
// pub(super) fn on_marker_recorded(
// self,
// shared: &mut SharedState,
// dat: CompleteLocalActivityData,
// ) -> LocalActivityMachineTransition<MarkerCommandRecorded> {
// verify_marker_dat!(
// shared,
// &dat,
// TransitionResult::commands(if self.already_resolved {
// vec![]
// } else {
// vec![LocalActivityCommand::Resolved(dat.into())]
// })
// )
// }
fn on_handle_result(
self,
dat: ResolveDat,
) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
TransitionResult::ok(
[LocalActivityCommand::Resolved(dat)],
WaitingMarkerEvent {
already_resolved: true,
},
[
LocalActivityCommand::FakeMarker,
LocalActivityCommand::Resolved(dat),
],
ResolvedFromMarkerLookAheadWaitingMarkerEvent {},
)
}
pub(super) fn on_started_non_replay_wft(
Expand All @@ -601,7 +574,9 @@ impl WaitingMarkerEvent {
)])
}

fn on_cancel_requested(self) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
fn on_cancel_requested(
self,
) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
// We still "request a cancel" even though we know the local activity should not be running
// because the data might be in the pre-resolved list.
TransitionResult::ok([LocalActivityCommand::RequestCancel], self)
Expand All @@ -610,22 +585,59 @@ impl WaitingMarkerEvent {
fn on_no_wait_cancel(
self,
_: ActivityCancellationType,
) -> LocalActivityMachineTransition<WaitingMarkerEvent> {
) -> LocalActivityMachineTransition<WaitingResolveFromMarkerLookAhead> {
// Markers are always recorded when cancelling, so this is the same as a normal cancel on
// the replay path
self.on_cancel_requested()
}
}

#[derive(Default, Clone)]
pub(super) struct WaitingMarkerEventPreResolved {}
impl WaitingMarkerEventPreResolved {
pub(super) struct ResolvedFromMarkerLookAheadWaitingMarkerEvent {}

impl ResolvedFromMarkerLookAheadWaitingMarkerEvent {
pub(super) fn on_marker_recorded(
self,
shared: &mut SharedState,
dat: CompleteLocalActivityData,
) -> LocalActivityMachineTransition<MarkerCommandRecorded> {
verify_marker_dat!(shared, &dat, TransitionResult::default())
verify_marker_dat!(shared, &dat, TransitionResult::commands(vec![]))
}
// fn on_handle_result(
// self,
// dat: ResolveDat,
// ) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
// TransitionResult::ok(
// [LocalActivityCommand::Resolved(dat)],
// ResolvedFromMarkerLookAheadWaitingMarkerEvent {},
// )
// }
// pub(super) fn on_started_non_replay_wft(
// self,
// dat: &mut SharedState,
// ) -> LocalActivityMachineTransition<RequestSent> {
// // We aren't really "replaying" anymore for our purposes, and want to record the marker.
// dat.replaying_when_invoked = false;
// TransitionResult::commands([LocalActivityCommand::RequestActivityExecution(
// dat.attrs.clone(),
// )])
// }

fn on_cancel_requested(
self,
) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
// We still "request a cancel" even though we know the local activity should not be running
// because the data might be in the pre-resolved list.
TransitionResult::ok([LocalActivityCommand::RequestCancel], self)
}

fn on_no_wait_cancel(
self,
_: ActivityCancellationType,
) -> LocalActivityMachineTransition<ResolvedFromMarkerLookAheadWaitingMarkerEvent> {
// Markers are always recorded when cancelling, so this is the same as a normal cancel on
// the replay path
self.on_cancel_requested()
}
}

Expand Down
Loading
Loading