Merged
11 changes: 9 additions & 2 deletions golem-common/src/model/mod.rs
@@ -697,7 +697,13 @@ pub struct AgentStatusRecord {
/// The number of encountered error entries grouped by their 'retry_from' index, calculated from
/// the last invocation boundary.
pub current_retry_count: HashMap<OplogIndex, u32>,
pub last_snapshot_index: Option<OplogIndex>,
/// Index of the last manual update snapshot. The agent will call load_snapshot
/// on this payload before starting replay.
pub last_manual_update_snapshot_index: Option<OplogIndex>,
/// Index of the last automatic snapshot. Must be >= last_manual_update_snapshot_index.
/// The agent will call load_snapshot on this payload before starting replay. If load_snapshot
/// fails, this index is ignored and a full replay from last_manual_update_snapshot_index is performed.
pub last_automatic_snapshot_index: Option<OplogIndex>,
}
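Read together, the two new fields define a preference order for snapshot-based recovery. A minimal sketch of that ordering, using a plain `u64` stand-in for `OplogIndex` and an illustrative `replay_start_candidates` helper that is not part of the actual executor API:

```rust
// Toy model of the two snapshot fields: try the automatic snapshot first
// (it is always >= the manual one); if loading it fails, fall back to the
// manual update snapshot; with no snapshots at all, do a full replay.
type OplogIndex = u64; // stand-in for golem_common's OplogIndex

struct SnapshotIndices {
    last_manual_update_snapshot_index: Option<OplogIndex>,
    last_automatic_snapshot_index: Option<OplogIndex>,
}

impl SnapshotIndices {
    // Snapshot indices to attempt, in order; an empty list means full replay.
    fn replay_start_candidates(&self) -> Vec<OplogIndex> {
        let mut candidates = Vec::new();
        if let Some(auto) = self.last_automatic_snapshot_index {
            candidates.push(auto);
        }
        if let Some(manual) = self.last_manual_update_snapshot_index {
            if Some(manual) != self.last_automatic_snapshot_index {
                candidates.push(manual);
            }
        }
        candidates
    }
}

fn main() {
    let indices = SnapshotIndices {
        last_manual_update_snapshot_index: Some(10),
        last_automatic_snapshot_index: Some(42),
    };
    assert_eq!(indices.replay_start_candidates(), vec![42, 10]);
    println!("ok");
}
```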

impl Default for AgentStatusRecord {
@@ -721,7 +727,8 @@ impl Default for AgentStatusRecord {
deleted_regions: DeletedRegions::new(),
component_revision_for_replay: ComponentRevision::INITIAL,
current_retry_count: HashMap::new(),
last_snapshot_index: None,
last_manual_update_snapshot_index: None,
last_automatic_snapshot_index: None,
}
}
}
4 changes: 3 additions & 1 deletion golem-common/src/model/worker.rs
@@ -30,7 +30,9 @@ impl UntypedParsedWorkerCreationLocalAgentConfigEntry {

let value_type = component_metadata
.find_agent_type_by_name(agent_type_name)
.ok_or("did not find expected agent type in the metadata")?
.ok_or_else(|| {
format!("did not find expected agent type {agent_type_name} in the metadata")
})?
.config
.into_iter()
.find_map(|c| match c {
38 changes: 27 additions & 11 deletions golem-worker-executor/src/durable_host/mod.rs
@@ -1173,6 +1173,13 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> DurableWorkerCtx<Ctx> {
OplogEntry::Snapshot {
data, mime_type, ..
} => (data, mime_type),
OplogEntry::PendingUpdate {
Contributor:
I understand this fixes the main issue, and now that we have the snapshot-based recovery this is probably a nice way to fix it, but I'm very confused that there was no machinery for this earlier.

I think it was supposed to be something like this:

  • once a manual update succeeds, it adds a skipped region from the beginning to the update point so that oplog part is ignored
  • so on next recovery, we reach the OplogEntry::PendingUpdate { SnapshotBased } again, and the recovery code calls load-snapshot so our state is restored
  • the rest of the oplog gets replayed

So I don't fully understand why this did not work, and whether it is a problem or not that now we have two ways to recover from a snapshot-based update. (As I think you did not touch the old one)
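The mechanism described in this comment can be sketched with toy types (not golem_common's real `OplogRegion` / `DeletedRegions`): after a manual update succeeds, the oplog from just after the first entry up to the update point is marked skipped, so replay re-applies only the entries after it.

```rust
// Illustrative sketch of the skipped-region idea: a single inclusive
// region covering everything after the very first oplog entry up to the
// manual update point. Replay consults `contains` to skip those entries.
type OplogIndex = u64;

#[derive(Debug)]
struct SkippedRegion {
    start: OplogIndex, // inclusive
    end: OplogIndex,   // inclusive
}

impl SkippedRegion {
    fn up_to_update_point(update_point: OplogIndex) -> Self {
        // skip everything after the very first entry up to the update point
        SkippedRegion { start: 2, end: update_point }
    }

    fn contains(&self, idx: OplogIndex) -> bool {
        self.start <= idx && idx <= self.end
    }
}

fn main() {
    let region = SkippedRegion::up_to_update_point(40);
    assert!(region.contains(2));
    assert!(region.contains(40));
    assert!(!region.contains(41)); // entries after the update point get replayed
    assert!(!region.contains(1));  // the initial entry is never skipped
    println!("ok");
}
```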

Contributor Author:
I believe this was always broken or at least broken for a very long time.

PendingUpdate is a hint entry and I don't see any logic in replay that would replay load-snapshot. This is surprising to me too, maybe I missed something

Contributor:
My understanding of how it work(ed) on main:

I think the part that's hard to see/understand is the skipped region logic that now 100% lives in the worker status calculation. Previously it was explicitly set in various points of the above update logic, making it easier to follow (but the current way of calculating everything directly from the oplog is definitely the correct way)

Contributor Author:
What you are saying is exactly correct, but it's not doing this part in any following replays, as pending_updates will not contain the update anymore (it now has a successful update entry).

Here we call load-snapshot (the agent sdk is supposed to internally initialize the agent as part of this): https://github.com/golemcloud/golem/blob/main/golem-worker-executor/src/durable_host/mod.rs#L1048-L1055

description:
UpdateDescription::SnapshotBased {
payload, mime_type, ..
},
..
} => (payload, mime_type),
_ => {
warn!(
"Expected Snapshot entry at oplog index {snapshot_index}, found different entry; falling back to full replay"
@@ -2325,11 +2332,25 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> ExternalOperations<Ctx> for Dur
UpdateDescription::Automatic {
target_revision, ..
} => {
// snapshot update will be succeeded as part of the replay.
let result = Self::resume_replay(store, instance, false).await;
record_resume_worker(start.elapsed());
let replay_result = async {
if let SnapshotRecoveryResult::Failed =
Self::try_load_snapshot(store, instance).await
Contributor:
Maybe I misunderstand this part, but for automatic updates we should never use snapshots and should always replay from the beginning.

Contributor Author:
This is only using snapshots created by manual updates, not the automatic snapshots (the logic for that lives in the WorkerConfig creation).

I don't think we can, in general, replay the parts of the oplog that were skipped as part of a manual update with a new component version (as manual snapshot updates can freely break oplog backwards compatibility). So I think replaying only from the last successful manual snapshot update is correct here.

{
return Err(WorkerExecutorError::failed_to_resume_worker(
agent_id.clone(),
WorkerExecutorError::runtime("loading snapshot failed"),
));
};
// automatic update will be succeeded as part of the replay.
let result = Self::resume_replay(store, instance, false).await?;

record_resume_worker(start.elapsed());

Ok(result)
}
.await;

match result {
match replay_result {
Err(error) => {
// replay failed. There are two cases here:
// 1. We failed before the update has succeeded. In this case we fail the update and retry the replay.
@@ -2367,18 +2388,13 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> ExternalOperations<Ctx> for Dur
_ => Err(error),
}
}
_ => result,
_ => replay_result,
}
}
}
}
None => match Self::try_load_snapshot(store, instance).await {
SnapshotRecoveryResult::Success => {
let result = Self::resume_replay(store, instance, false).await;
record_resume_worker(start.elapsed());
result
}
SnapshotRecoveryResult::NotAttempted => {
SnapshotRecoveryResult::Success | SnapshotRecoveryResult::NotAttempted => {
let result = Self::resume_replay(store, instance, false).await;
record_resume_worker(start.elapsed());
result
19 changes: 5 additions & 14 deletions golem-worker-executor/src/worker/invocation_loop.rs
@@ -327,22 +327,13 @@ impl<Ctx: WorkerCtx> InnerInvocationLoop<'_, Ctx> {
/// first pending_updates, then pending_invocations
async fn drain_pending_from_status(&mut self) -> CommandOutcome {
loop {
let status = self.parent.last_known_status.read().await.clone();
let status = self.parent.get_non_detached_last_known_status().await;

// First, try to process a pending update
if let Some(update) = status.pending_updates.front() {
let target_revision = *update.description.target_revision();
let mut store = self.store.lock().await;
let mut invocation = Invocation {
owned_agent_id: self.owned_agent_id.clone(),
parent: self.parent.clone(),
instance: self.instance,
store: store.deref_mut(),
};
match invocation.manual_update(target_revision).await {
CommandOutcome::Continue => continue,
other => break other,
}
if status.pending_updates.front().is_some() {
Contributor:
Are we sure that there is always an AgentInvocation::ManualUpdate enqueued and processed for saving the snapshot before we reach here? Probably this logic became a bit obscure through all the refactorings.
What if there are multiple manual updates enqueued? When we perform the first, and restart, what enqueues the thing in the command queue?

Contributor Author (@mschuwalow, Mar 10, 2026):
Yes. pending_updates is only based on the pending_update oplog entry, which for manual snapshot updates is only created during the following path:

agent receives update request via grpc -> agent writes a manual update pending_agent_invocation oplog entry -> agent processes the invocation -> agent writes a pending_update oplog entry

For automatic updates the pending_update oplog entry is written immediately

// if the update made it to pending_updates (instead of pending invocations), it is ready
// to be processed on next restart. So just restart here and let the recovery logic take over
break CommandOutcome::BreakInnerLoop(RetryDecision::Immediate);
}

// Then, try to process a pending invocation
37 changes: 28 additions & 9 deletions golem-worker-executor/src/worker/mod.rs
@@ -49,7 +49,7 @@ use golem_common::model::component::ComponentRevision;
use golem_common::model::component::{ComponentFilePath, PluginPriority};
use golem_common::model::invocation_context::InvocationContextStack;
use golem_common::model::oplog::{OplogEntry, OplogIndex, UpdateDescription};
use golem_common::model::regions::OplogRegion;
use golem_common::model::regions::{DeletedRegionsBuilder, OplogRegion};
use golem_common::model::worker::{RevertWorkerTarget, WorkerCreationLocalAgentConfigEntry};
use golem_common::model::AgentStatus;
use golem_common::model::RetryConfig;
@@ -2162,6 +2162,31 @@ impl RunningWorker {
.component_revision_for_replay,
);

let mut skipped_regions = worker_metadata.last_known_status.skipped_regions;
let mut last_snapshot_index = worker_metadata
.last_known_status
.last_manual_update_snapshot_index;

// automatic snapshots are only considered until the first failure.
// additionally, if there are updates, the automatic snapshot is temporarily ignored to catch issues earlier
if let Some(snapshot_idx) = worker_metadata
.last_known_status
.last_automatic_snapshot_index
{
if pending_update.is_none()
&& !parent.snapshot_recovery_disabled.load(Ordering::Acquire)
{
let snapshot_skip =
DeletedRegionsBuilder::from_regions(vec![OplogRegion::from_index_range(
OplogIndex::INITIAL.next()..=snapshot_idx,
)])
.build();
skipped_regions.set_override(snapshot_skip);

last_snapshot_index = Some(snapshot_idx);
}
}

let context = Ctx::create(
worker_metadata.created_by,
OwnedAgentId::new(worker_metadata.environment_id, &worker_metadata.agent_id),
@@ -2184,19 +2209,13 @@
parent.extra_deps(),
parent.config(),
AgentConfig::new(
worker_metadata.last_known_status.skipped_regions,
skipped_regions,
worker_metadata.last_known_status.total_linear_memory_size,
component_version_for_replay,
worker_metadata.created_by,
worker_metadata.config_vars,
worker_metadata.local_agent_config,
if pending_update.is_none()
&& !parent.snapshot_recovery_disabled.load(Ordering::Acquire)
{
worker_metadata.last_known_status.last_snapshot_index
} else {
None
},
last_snapshot_index,
),
parent.execution_status.clone(),
parent.file_loader(),
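The gate moved out of the `AgentConfig` construction above (snapshot index only honoured when there is no pending update and snapshot recovery is not disabled) can be isolated as a small pure function; the name and signature below are illustrative, not the executor's actual API:

```rust
// Hedged sketch of the snapshot-index gate: a recorded snapshot index is
// only passed to the worker's replay configuration when no update is
// pending and snapshot recovery has not been disabled.
type OplogIndex = u64;

fn snapshot_index_for_replay(
    last_snapshot_index: Option<OplogIndex>,
    has_pending_update: bool,
    snapshot_recovery_disabled: bool,
) -> Option<OplogIndex> {
    if !has_pending_update && !snapshot_recovery_disabled {
        last_snapshot_index
    } else {
        None // fall back to full replay
    }
}

fn main() {
    assert_eq!(snapshot_index_for_replay(Some(7), false, false), Some(7));
    assert_eq!(snapshot_index_for_replay(Some(7), true, false), None);
    assert_eq!(snapshot_index_for_replay(Some(7), false, true), None);
    println!("ok");
}
```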
79 changes: 35 additions & 44 deletions golem-worker-executor/src/worker/status.rs
@@ -153,13 +153,17 @@ pub async fn update_status_with_new_entries<T: HasOplogService + Sync>(
component_revision,
component_size,
component_revision_for_replay,
last_manual_update_snapshot_index,
last_automatic_snapshot_index,
) = calculate_update_fields(
last_known.pending_updates,
last_known.failed_updates,
last_known.successful_updates,
last_known.component_revision,
last_known.component_size,
last_known.component_revision_for_replay,
last_known.last_manual_update_snapshot_index,
last_known.last_automatic_snapshot_index,
&deleted_regions,
&new_entries,
);
@@ -182,12 +186,6 @@

let active_plugins = calculate_active_plugins(active_plugins, &deleted_regions, &new_entries);

let last_snapshot_index = calculate_last_snapshot_index(
last_known.last_snapshot_index,
&deleted_regions,
&new_entries,
);

let result = AgentStatusRecord {
oplog_idx: new_entries
.keys()
@@ -211,7 +209,8 @@
deleted_regions,
component_revision_for_replay,
current_retry_count,
last_snapshot_index,
last_manual_update_snapshot_index,
last_automatic_snapshot_index,
};

Some(result)
@@ -587,13 +586,16 @@ async fn calculate_pending_invocations<T: HasOplogService + Sync>(
result
}

#[allow(clippy::type_complexity)]
fn calculate_update_fields(
initial_pending_updates: VecDeque<TimestampedUpdateDescription>,
initial_failed_updates: Vec<FailedUpdateRecord>,
initial_successful_updates: Vec<SuccessfulUpdateRecord>,
initial_revision: ComponentRevision,
initial_component_size: u64,
initial_component_revision_for_replay: ComponentRevision,
initial_last_manual_update_snapshot_index: Option<OplogIndex>,
initial_last_automatic_snapshot_index: Option<OplogIndex>,
deleted_regions: &DeletedRegions,
entries: &BTreeMap<OplogIndex, OplogEntry>,
) -> (
@@ -603,13 +605,17 @@
ComponentRevision,
u64,
ComponentRevision,
Option<OplogIndex>,
Option<OplogIndex>,
) {
let mut pending_updates = initial_pending_updates;
let mut failed_updates = initial_failed_updates;
let mut successful_updates = initial_successful_updates;
let mut revision = initial_revision;
let mut size = initial_component_size;
let mut component_revision_for_replay = initial_component_revision_for_replay;
let mut last_manual_update_snapshot_index = initial_last_manual_update_snapshot_index;
let mut last_automatic_snapshot_index = initial_last_automatic_snapshot_index;

for (oplog_idx, entry) in entries {
// Skipping entries in deleted regions (by revert)
@@ -663,17 +669,20 @@
revision = *target_revision;
size = *new_component_size;

let applied_update = pending_updates.pop_front();
if matches!(
applied_update,
Some(TimestampedUpdateDescription {
description: UpdateDescription::SnapshotBased { .. },
..
})
) {
component_revision_for_replay = *target_revision
if let Some(TimestampedUpdateDescription {
description: UpdateDescription::SnapshotBased { .. },
oplog_index: applied_update_oplog_index,
..
}) = pending_updates.pop_front()
{
component_revision_for_replay = *target_revision;
last_manual_update_snapshot_index = Some(applied_update_oplog_index);
last_automatic_snapshot_index = None;
}
}
OplogEntry::Snapshot { .. } => {
last_automatic_snapshot_index = Some(*oplog_idx);
}
_ => {}
}
}
@@ -684,6 +693,8 @@
revision,
size,
component_revision_for_replay,
last_manual_update_snapshot_index,
last_automatic_snapshot_index,
)
}

Expand Down Expand Up @@ -830,31 +841,6 @@ fn calculate_active_plugins(
result
}

fn calculate_last_snapshot_index(
initial: Option<OplogIndex>,
deleted_regions: &DeletedRegions,
entries: &BTreeMap<OplogIndex, OplogEntry>,
) -> Option<OplogIndex> {
let mut result = initial;

if let Some(idx) = result {
if deleted_regions.is_in_deleted_region(idx) {
result = None;
}
}

for (idx, entry) in entries {
if deleted_regions.is_in_deleted_region(*idx) {
continue;
}

if matches!(entry, OplogEntry::Snapshot { .. }) {
result = Some(*idx);
}
}
result
}

fn is_worker_error_retriable(
retry_config: &RetryConfig,
error: &AgentError,
@@ -1627,7 +1613,7 @@ mod test {
mime_type: "application/octet-stream".to_string(),
},
move |mut status| {
status.last_snapshot_index = Some(oplog_idx);
status.last_automatic_snapshot_index = Some(oplog_idx);
status
},
)
@@ -1681,7 +1667,9 @@
status.failed_updates = old_status.failed_updates;
status.invocation_results = old_status.invocation_results;
status.component_revision_for_replay = old_status.component_revision_for_replay;
status.last_snapshot_index = old_status.last_snapshot_index;
status.last_manual_update_snapshot_index =
old_status.last_manual_update_snapshot_index;
status.last_automatic_snapshot_index = old_status.last_automatic_snapshot_index;

status
})
@@ -1765,7 +1753,7 @@
)
.rounded();
self.add(entry.clone(), move |mut status| {
let _ = status.pending_updates.pop_front();
let applied_update = status.pending_updates.pop_front();
status.successful_updates.push(SuccessfulUpdateRecord {
timestamp: entry.timestamp(),
target_revision: *update_description.target_revision(),
@@ -1785,6 +1773,9 @@
} = update_description
{
status.component_revision_for_replay = target_revision;
status.last_manual_update_snapshot_index =
applied_update.map(|au| au.oplog_index);
status.last_automatic_snapshot_index = None;
};

status