Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 48 additions & 6 deletions golem-worker-executor/src/worker/invocation_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use crate::services::{HasEvents, HasOplog, HasWorker};
use crate::worker::invocation::{
invoke_observed_and_traced, lower_invocation, InvocationMode, InvokeResult,
};
use crate::worker::{QueuedWorkerInvocation, RetryDecision, RunningWorker, Worker, WorkerCommand};
use crate::worker::{
FinalWorkerState, QueuedWorkerInvocation, RetryDecision, RunningWorker, Worker, WorkerCommand,
};
use crate::workerctx::{PublicWorkerIo, WorkerCtx};
use anyhow::anyhow;
use async_lock::Mutex;
Expand Down Expand Up @@ -111,7 +113,15 @@ impl<Ctx: WorkerCtx> InvocationLoop<Ctx> {
match final_decision {
None | Some(RetryDecision::None) => {
debug!("Invocation queue loop notifying parent about being stopped");
self.parent.stop_internal(true, None).await;
self.parent
.stop_internal(
true,
None,
FinalWorkerState::Unloaded {
startup_failure: None,
},
)
.await;
break;
}
Some(RetryDecision::TryStop(ts)) => {
Expand All @@ -122,7 +132,15 @@ impl<Ctx: WorkerCtx> InvocationLoop<Ctx> {
continue;
} else {
debug!("Invocation queue loop notifying parent about being stopped");
self.parent.stop_internal(true, None).await;
self.parent
.stop_internal(
true,
None,
FinalWorkerState::Unloaded {
startup_failure: None,
},
)
.await;
break;
}
}
Expand Down Expand Up @@ -170,7 +188,15 @@ impl<Ctx: WorkerCtx> InvocationLoop<Ctx> {
agent_id: self.owned_agent_id.agent_id(),
result: Err(err.clone()),
});
self.parent.stop_internal(true, Some(err)).await;
self.parent
.stop_internal(
true,
Some(err.clone()),
FinalWorkerState::Unloaded {
startup_failure: Some(err),
},
)
.await;
None
}
}
Expand Down Expand Up @@ -212,7 +238,15 @@ impl<Ctx: WorkerCtx> InvocationLoop<Ctx> {
warn!("Failed to start the worker: {err}");
store.data().set_suspended();

self.parent.stop_internal(true, Some(err)).await;
self.parent
.stop_internal(
true,
Some(err.clone()),
FinalWorkerState::Unloaded {
startup_failure: Some(err),
},
)
.await;
Some(RetryDecision::None) // early return, we can't retry this
}
}
Expand Down Expand Up @@ -405,7 +439,15 @@ impl<Ctx: WorkerCtx> InnerInvocationLoop<'_, Ctx> {
warn!("Failed to resume replay: {err}");
store.data().set_suspended();

self.parent.stop_internal(true, Some(err)).await;
self.parent
.stop_internal(
true,
Some(err.clone()),
FinalWorkerState::Unloaded {
startup_failure: Some(err),
},
)
.await;
CommandOutcome::BreakOuterLoop
}
}
Expand Down
117 changes: 82 additions & 35 deletions golem-worker-executor/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,14 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
WorkerInstance::Running(running) => {
if is_running_agent_idle(running).await {
let stop_result = self
.stop_internal_locked(&mut instance_guard, false, None)
.stop_internal_locked(
&mut instance_guard,
false,
None,
FinalWorkerState::Unloaded {
startup_failure: None,
},
)
.await;

Some(stop_result)
Expand Down Expand Up @@ -447,18 +454,8 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
/// Rejects all new invocations and stops any running execution.
pub async fn start_deleting(&self) -> Result<(), WorkerExecutorError> {
let error = WorkerExecutorError::invalid_request("Worker is being deleted");
let mut instance_guard = self.lock_stopped_worker(Some(error.clone())).await;
match &*instance_guard {
WorkerInstance::Unloaded { .. } => {
*instance_guard = WorkerInstance::Deleting;
// More invocations might have been enqueued since the worker has stopped
self.fail_pending_invocations(error).await;
}
WorkerInstance::Deleting => {}
_ => panic!("impossible status after lock_stopped_worker"),
};

// TODO: Not sure what to do with execution status here.
self.stop_internal(false, Some(error), FinalWorkerState::Deleting)
.await;
Ok(())
}

Expand Down Expand Up @@ -1176,7 +1173,7 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
));
}

let instance_guard = self.lock_stopped_worker(None).await;
let instance_guard = self.lock_stopped_worker().await;
match &*instance_guard {
WorkerInstance::Unloaded { .. } => {}
WorkerInstance::Deleting => {
Expand Down Expand Up @@ -1319,6 +1316,7 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
&self,
called_from_invocation_loop: bool,
fail_pending_invocations: Option<WorkerExecutorError>,
final_state: FinalWorkerState,
) {
let mut instance_guard = self.instance.lock().await;

Expand All @@ -1327,6 +1325,7 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
&mut instance_guard,
called_from_invocation_loop,
fail_pending_invocations,
final_state,
)
.await;

Expand All @@ -1342,6 +1341,7 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
called_from_invocation_loop: bool,
// Only respected when this is the call that triggered the stop
fail_pending_invocations: Option<WorkerExecutorError>,
final_state: FinalWorkerState,
) -> StopResult {
// Temporarily set the instance to unloaded so we can work with the old value.
// This is not visible to anyone as long as we are holding the lock.
Expand All @@ -1354,6 +1354,7 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {

match previous_instance_state {
WorkerInstance::Unloaded { .. } | WorkerInstance::WaitingForPermit(_) => {
**instance_guard = final_state.into_instance();
StopResult::Stopped
}
WorkerInstance::Deleting => {
Expand All @@ -1365,9 +1366,15 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
**instance_guard = previous_instance_state;
StopResult::Stopped
}
WorkerInstance::Stopping(stopping) => StopResult::AlreadyStopping {
notify: stopping.notify.clone(),
},
WorkerInstance::Stopping(mut stopping) => {
// If we're stopping for deletion, upgrade the final state
if matches!(final_state, FinalWorkerState::Deleting) {
stopping.final_state = FinalWorkerState::Deleting;
}
let notify = stopping.notify.clone();
**instance_guard = WorkerInstance::Stopping(stopping);
StopResult::AlreadyStopping { notify }
}
WorkerInstance::Running(running) => {
debug!(
"Stopping running worker ({called_from_invocation_loop}) ({})",
Expand All @@ -1380,24 +1387,20 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
self.fail_pending_invocations(error.clone()).await;
};

// Store the startup failure so that subsequent enqueue attempts
// on this Unloaded worker fail fast instead of hanging forever.
**instance_guard = WorkerInstance::Unloaded {
startup_failure: fail_pending_invocations,
};

// Make sure the oplog is committed
self.oplog.commit(CommitLevel::Always).await;

// when stopping via the invocation loop we can stop immediately, no need to go via the stopping status
if called_from_invocation_loop {
**instance_guard = final_state.into_instance();
StopResult::Stopped
} else {
// drop the running worker, this signals to the invocation loop to start exiting.
let run_loop_handle = running.stop();
let notify = OneShotEvent::new();
**instance_guard = WorkerInstance::Stopping(StoppingWorker {
notify: notify.clone(),
final_state,
});
StopResult::NeedsWaitForLoopExit {
run_loop_handle,
Expand All @@ -1420,10 +1423,17 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
run_loop_handle.await.expect("Failed to join run loop");

let mut instance_guard = self.instance.lock().await;
assert!(matches!(*instance_guard, WorkerInstance::Stopping(_)));
*instance_guard = WorkerInstance::Unloaded {
startup_failure: None,
};
match std::mem::replace(
&mut *instance_guard,
WorkerInstance::Unloaded {
startup_failure: None,
},
) {
WorkerInstance::Stopping(stopping) => {
*instance_guard = stopping.final_state.into_instance();
}
other => panic!("expected Stopping, got {other:?}"),
}
drop(instance_guard);

notify.set();
Expand Down Expand Up @@ -1483,13 +1493,16 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
}

// Lock a worker in either Unloaded or Deleting state.
async fn lock_stopped_worker(
&self,
fail_pending_invocations: Option<WorkerExecutorError>,
) -> MutexGuard<'_, WorkerInstance> {
async fn lock_stopped_worker(&self) -> MutexGuard<'_, WorkerInstance> {
loop {
self.stop_internal(false, fail_pending_invocations.clone())
.await;
self.stop_internal(
false,
None,
FinalWorkerState::Unloaded {
startup_failure: None,
},
)
.await;
let instance_guard = self.instance.lock().await;

if let WorkerInstance::Deleting | WorkerInstance::Unloaded { .. } = &*instance_guard {
Expand All @@ -1504,7 +1517,14 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
delay: Option<Duration>,
oom_retry_count: u32,
) -> Result<bool, WorkerExecutorError> {
this.stop_internal(called_from_invocation_loop, None).await;
this.stop_internal(
called_from_invocation_loop,
None,
FinalWorkerState::Unloaded {
startup_failure: None,
},
)
.await;
if let Some(delay) = delay {
tokio::time::sleep(delay).await;
}
Expand Down Expand Up @@ -1929,7 +1949,14 @@ enum WorkerInstance {

impl WorkerInstance {
fn is_deleting(&self) -> bool {
matches!(self, Self::Deleting)
matches!(
self,
Self::Deleting
| Self::Stopping(StoppingWorker {
final_state: FinalWorkerState::Deleting,
..
})
)
}

fn startup_failure(&self) -> Option<&WorkerExecutorError> {
Expand Down Expand Up @@ -2341,9 +2368,29 @@ impl RunningWorker {
}
}

#[derive(Debug)]
pub(crate) enum FinalWorkerState {
Unloaded {
startup_failure: Option<WorkerExecutorError>,
},
Deleting,
}

impl FinalWorkerState {
fn into_instance(self) -> WorkerInstance {
match self {
FinalWorkerState::Unloaded { startup_failure } => {
WorkerInstance::Unloaded { startup_failure }
}
FinalWorkerState::Deleting => WorkerInstance::Deleting,
}
}
}

#[derive(Debug)]
struct StoppingWorker {
notify: OneShotEvent,
final_state: FinalWorkerState,
}

#[derive(Debug, Clone)]
Expand Down
Loading