From 4a9a3472c406e46c04c8cfe6c08d5c7d44b12bb7 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Tue, 10 Mar 2026 00:53:40 +0100 Subject: [PATCH 1/2] Fix missing load snapshot after manual snapshot update --- golem-common/src/model/mod.rs | 11 ++- golem-common/src/model/worker.rs | 4 +- golem-worker-executor/src/durable_host/mod.rs | 38 ++++++--- .../src/worker/invocation_loop.rs | 19 ++--- golem-worker-executor/src/worker/mod.rs | 35 ++++++--- golem-worker-executor/src/worker/status.rs | 77 ++++++++----------- golem-worker-executor/tests/hot_update.rs | 59 ++++++++++++++ 7 files changed, 162 insertions(+), 81 deletions(-) diff --git a/golem-common/src/model/mod.rs b/golem-common/src/model/mod.rs index 6e9cdcb75d..0f02bd86a2 100644 --- a/golem-common/src/model/mod.rs +++ b/golem-common/src/model/mod.rs @@ -697,7 +697,13 @@ pub struct AgentStatusRecord { /// The number of encountered error entries grouped by their 'retry_from' index, calculated from /// the last invocation boundary. pub current_retry_count: HashMap, - pub last_snapshot_index: Option, + /// Index of the last manual update snapshot index. Agent will call load_snapshot + /// on this payload before starting replay. + pub last_manual_snapshot_index: Option, + /// Index of the last automatic snapshot index. Must be >= last_manual_snapshot_index. + /// Agent will call load_snapshot on this payload before starting replay. If the load_snapshot + /// fails this will be ignored and a full replay from last_manual_snapshot_index will performed. + pub last_automatic_snapshot_index: Option, } impl Default for AgentStatusRecord { @@ -721,7 +727,8 @@ impl Default for AgentStatusRecord { deleted_regions: DeletedRegions::new(), component_revision_for_replay: ComponentRevision::INITIAL, current_retry_count: HashMap::new(), - last_snapshot_index: None, + last_manual_snapshot_index: None, + last_automatic_snapshot_index: None, } } } diff --git a/golem-common/src/model/worker.rs b/golem-common/src/model/worker.rs index b51d5a469d..b92571641a 100644 --- a/golem-common/src/model/worker.rs +++ b/golem-common/src/model/worker.rs @@ -30,7 +30,9 @@ impl UntypedParsedWorkerCreationLocalAgentConfigEntry { let value_type = component_metadata .find_agent_type_by_name(agent_type_name) - .ok_or("did not find expected agent type in the metadata")? + .ok_or_else(|| { + format!("did not find expected agent type {agent_type_name} in the metadata") + })? .config .into_iter() .find_map(|c| match c { diff --git a/golem-worker-executor/src/durable_host/mod.rs b/golem-worker-executor/src/durable_host/mod.rs index 589f9d99b0..f3107f9fc0 100644 --- a/golem-worker-executor/src/durable_host/mod.rs +++ b/golem-worker-executor/src/durable_host/mod.rs @@ -1173,6 +1173,13 @@ impl> DurableWorkerCtx { OplogEntry::Snapshot { data, mime_type, .. } => (data, mime_type), + OplogEntry::PendingUpdate { + description: + UpdateDescription::SnapshotBased { + payload, mime_type, .. + }, + .. + } => (payload, mime_type), _ => { warn!( "Expected Snapshot entry at oplog index {snapshot_index}, found different entry; falling back to full replay" @@ -2325,11 +2332,25 @@ impl> ExternalOperations for Dur UpdateDescription::Automatic { target_revision, .. } => { - // snapshot update will be succeeded as part of the replay. - let result = Self::resume_replay(store, instance, false).await; - record_resume_worker(start.elapsed()); + let replay_result = async { + if let SnapshotRecoveryResult::Failed = + Self::try_load_snapshot(store, instance).await + { + return Err(WorkerExecutorError::failed_to_resume_worker( + agent_id.clone(), + WorkerExecutorError::runtime("loading snapshot failed"), + )); + }; + // automatic update will be succeeded as part of the replay. + let result = Self::resume_replay(store, instance, false).await?; + + record_resume_worker(start.elapsed()); + + Ok(result) + } + .await; - match result { + match replay_result { Err(error) => { // replay failed. There are two cases here: // 1. We failed before the update has succeeded. In this case we fail the update and retry the replay. @@ -2367,18 +2388,13 @@ impl> ExternalOperations for Dur _ => Err(error), } } - _ => result, + _ => replay_result, } } } } None => match Self::try_load_snapshot(store, instance).await { - SnapshotRecoveryResult::Success => { - let result = Self::resume_replay(store, instance, false).await; - record_resume_worker(start.elapsed()); - result - } - SnapshotRecoveryResult::NotAttempted => { + SnapshotRecoveryResult::Success | SnapshotRecoveryResult::NotAttempted => { let result = Self::resume_replay(store, instance, false).await; record_resume_worker(start.elapsed()); result diff --git a/golem-worker-executor/src/worker/invocation_loop.rs b/golem-worker-executor/src/worker/invocation_loop.rs index 1423f7e0b6..c9a5997937 100644 --- a/golem-worker-executor/src/worker/invocation_loop.rs +++ b/golem-worker-executor/src/worker/invocation_loop.rs @@ -327,22 +327,13 @@ impl InnerInvocationLoop<'_, Ctx> { /// first pending_updates, then pending_invocations async fn drain_pending_from_status(&mut self) -> CommandOutcome { loop { - let status = self.parent.last_known_status.read().await.clone(); + let status = self.parent.get_non_detached_last_known_status().await; // First, try to process a pending update - if let Some(update) = status.pending_updates.front() { - let target_revision = *update.description.target_revision(); - let mut store = self.store.lock().await; - let mut invocation = Invocation { - owned_agent_id: self.owned_agent_id.clone(), - parent: self.parent.clone(), - instance: self.instance, - store: store.deref_mut(), - }; - match invocation.manual_update(target_revision).await { - CommandOutcome::Continue => continue, - other => break other, - } + if status.pending_updates.front().is_some() { + // if the update made it to pending_updates (instead of pending invocations), it is ready + // to be processed on next restart. So just restart here and let the recovery logic take over + break CommandOutcome::BreakInnerLoop(RetryDecision::Immediate); } // Then, try to process a pending invocation diff --git a/golem-worker-executor/src/worker/mod.rs b/golem-worker-executor/src/worker/mod.rs index b82142c053..18a6bb488b 100644 --- a/golem-worker-executor/src/worker/mod.rs +++ b/golem-worker-executor/src/worker/mod.rs @@ -49,7 +49,7 @@ use golem_common::model::component::ComponentRevision; use golem_common::model::component::{ComponentFilePath, PluginPriority}; use golem_common::model::invocation_context::InvocationContextStack; use golem_common::model::oplog::{OplogEntry, OplogIndex, UpdateDescription}; -use golem_common::model::regions::OplogRegion; +use golem_common::model::regions::{DeletedRegionsBuilder, OplogRegion}; use golem_common::model::worker::{RevertWorkerTarget, WorkerCreationLocalAgentConfigEntry}; use golem_common::model::AgentStatus; use golem_common::model::RetryConfig; @@ -2162,6 +2162,29 @@ impl RunningWorker { .component_revision_for_replay, ); + let mut skipped_regions = worker_metadata.last_known_status.skipped_regions; + let mut last_snapshot_index = worker_metadata.last_known_status.last_manual_snapshot_index; + + if let Some(snapshot_idx) = worker_metadata + .last_known_status + .last_automatic_snapshot_index + { + // automatic snapshots are only considered until the first failure + // if there are updates, ignore the automatic snapshot temporarily to catch issues earlier + if pending_update.is_none() + && !parent.snapshot_recovery_disabled.load(Ordering::Acquire) + { + let snapshot_skip = + DeletedRegionsBuilder::from_regions(vec![OplogRegion::from_index_range( + OplogIndex::INITIAL.next()..=snapshot_idx, + )]) + .build(); + skipped_regions.set_override(snapshot_skip); + + last_snapshot_index = Some(snapshot_idx); + } + } + let context = Ctx::create( worker_metadata.created_by, OwnedAgentId::new(worker_metadata.environment_id, &worker_metadata.agent_id), @@ -2184,19 +2207,13 @@ impl RunningWorker { parent.extra_deps(), parent.config(), AgentConfig::new( - worker_metadata.last_known_status.skipped_regions, + skipped_regions, worker_metadata.last_known_status.total_linear_memory_size, component_version_for_replay, worker_metadata.created_by, worker_metadata.config_vars, worker_metadata.local_agent_config, - if pending_update.is_none() - && !parent.snapshot_recovery_disabled.load(Ordering::Acquire) - { - worker_metadata.last_known_status.last_snapshot_index - } else { - None - }, + last_snapshot_index, ), parent.execution_status.clone(), parent.file_loader(), diff --git a/golem-worker-executor/src/worker/status.rs b/golem-worker-executor/src/worker/status.rs index b6b2f032a1..22eaf58cd1 100644 --- a/golem-worker-executor/src/worker/status.rs +++ b/golem-worker-executor/src/worker/status.rs @@ -153,6 +153,8 @@ pub async fn update_status_with_new_entries( component_revision, component_size, component_revision_for_replay, + last_manual_snapshot_index, + last_automatic_snapshot_index, ) = calculate_update_fields( last_known.pending_updates, last_known.failed_updates, @@ -160,6 +162,8 @@ pub async fn update_status_with_new_entries( last_known.component_revision, last_known.component_size, last_known.component_revision_for_replay, + last_known.last_manual_snapshot_index, + last_known.last_automatic_snapshot_index, &deleted_regions, &new_entries, ); @@ -182,12 +186,6 @@ pub async fn update_status_with_new_entries( let active_plugins = calculate_active_plugins(active_plugins, &deleted_regions, &new_entries); - let last_snapshot_index = calculate_last_snapshot_index( - last_known.last_snapshot_index, - &deleted_regions, - &new_entries, - ); - let result = AgentStatusRecord { oplog_idx: new_entries .keys() @@ -211,7 +209,8 @@ pub async fn update_status_with_new_entries( deleted_regions, component_revision_for_replay, current_retry_count, - last_snapshot_index, + last_manual_snapshot_index, + last_automatic_snapshot_index, }; Some(result) @@ -587,6 +586,7 @@ async fn calculate_pending_invocations( result } +#[allow(clippy::type_complexity)] fn calculate_update_fields( initial_pending_updates: VecDeque, initial_failed_updates: Vec, @@ -594,6 +594,8 @@ fn calculate_update_fields( initial_revision: ComponentRevision, initial_component_size: u64, initial_component_revision_for_replay: ComponentRevision, + initial_last_manual_snapshot_index: Option, + initial_last_automatic_snapshot_index: Option, deleted_regions: &DeletedRegions, entries: &BTreeMap, ) -> ( @@ -603,6 +605,8 @@ fn calculate_update_fields( ComponentRevision, u64, ComponentRevision, + Option, + Option, ) { let mut pending_updates = initial_pending_updates; let mut failed_updates = initial_failed_updates; @@ -610,6 +614,8 @@ fn calculate_update_fields( let mut revision = initial_revision; let mut size = initial_component_size; let mut component_revision_for_replay = initial_component_revision_for_replay; + let mut last_manual_snapshot_index = initial_last_manual_snapshot_index; + let mut last_automatic_snapshot_index = initial_last_automatic_snapshot_index; for (oplog_idx, entry) in entries { // Skipping entries in deleted regions (by revert) @@ -663,17 +669,20 @@ fn calculate_update_fields( revision = *target_revision; size = *new_component_size; - let applied_update = pending_updates.pop_front(); - if matches!( - applied_update, - Some(TimestampedUpdateDescription { - description: UpdateDescription::SnapshotBased { .. }, - .. - }) - ) { - component_revision_for_replay = *target_revision + if let Some(TimestampedUpdateDescription { + description: UpdateDescription::SnapshotBased { .. }, + oplog_index: applied_update_oplog_index, + .. + }) = pending_updates.pop_front() + { + component_revision_for_replay = *target_revision; + last_manual_snapshot_index = Some(applied_update_oplog_index); + last_automatic_snapshot_index = None; } } + OplogEntry::Snapshot { .. } => { + last_automatic_snapshot_index = Some(*oplog_idx); + } _ => {} } } @@ -684,6 +693,8 @@ fn calculate_update_fields( revision, size, component_revision_for_replay, + last_manual_snapshot_index, + last_automatic_snapshot_index, ) } @@ -830,31 +841,6 @@ fn calculate_active_plugins( result } -fn calculate_last_snapshot_index( - initial: Option, - deleted_regions: &DeletedRegions, - entries: &BTreeMap, -) -> Option { - let mut result = initial; - - if let Some(idx) = result { - if deleted_regions.is_in_deleted_region(idx) { - result = None; - } - } - - for (idx, entry) in entries { - if deleted_regions.is_in_deleted_region(*idx) { - continue; - } - - if matches!(entry, OplogEntry::Snapshot { .. }) { - result = Some(*idx); - } - } - result -} - fn is_worker_error_retriable( retry_config: &RetryConfig, error: &AgentError, @@ -1627,7 +1613,7 @@ mod test { mime_type: "application/octet-stream".to_string(), }, move |mut status| { - status.last_snapshot_index = Some(oplog_idx); + status.last_automatic_snapshot_index = Some(oplog_idx); status }, ) @@ -1681,7 +1667,8 @@ mod test { status.failed_updates = old_status.failed_updates; status.invocation_results = old_status.invocation_results; status.component_revision_for_replay = old_status.component_revision_for_replay; - status.last_snapshot_index = old_status.last_snapshot_index; + status.last_manual_snapshot_index = old_status.last_manual_snapshot_index; + status.last_automatic_snapshot_index = old_status.last_automatic_snapshot_index; status }) @@ -1765,7 +1752,7 @@ mod test { ) .rounded(); self.add(entry.clone(), move |mut status| { - let _ = status.pending_updates.pop_front(); + let applied_update = status.pending_updates.pop_front(); status.successful_updates.push(SuccessfulUpdateRecord { timestamp: entry.timestamp(), target_revision: *update_description.target_revision(), @@ -1785,6 +1772,8 @@ mod test { } = update_description { status.component_revision_for_replay = target_revision; + status.last_manual_snapshot_index = applied_update.map(|au| au.oplog_index); + status.last_automatic_snapshot_index = None; }; status diff --git a/golem-worker-executor/tests/hot_update.rs b/golem-worker-executor/tests/hot_update.rs index ca3bb099a9..e607cc4f79 100644 --- a/golem-worker-executor/tests/hot_update.rs +++ b/golem-worker-executor/tests/hot_update.rs @@ -1244,3 +1244,62 @@ async fn auto_update_with_disable_wakeup_keeps_worker_interrupted( Ok(()) } + +#[test] +#[tracing::instrument] +async fn agent_can_be_invoked_after_manual_snapshot_update_and_restart( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + #[tagged_as("agent_update_v2")] agent_update_v2: &PrecompiledComponent, + _tracing: &Tracing, +) -> anyhow::Result<()> { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await?; + + let component = executor + .component_dep(&context.default_environment_id, agent_update_v2) + .unique() + .store() + .await?; + + let agent_id = agent_id!("UpdateTest"); + + let worker_id = executor + .start_agent(&component.id, agent_id.clone()) + .await?; + executor.log_output(&worker_id).await?; + + let updated_component = executor + .update_component(&component.id, "it_agent_update_v3_release") + .await?; + + executor + .manual_update_worker(&worker_id, updated_component.revision, false) + .await?; + + executor + .wait_for_component_revision( + &worker_id, + updated_component.revision, + Duration::from_secs(30), + ) + .await?; + + // restart and force the agent to reload the last snapshot + drop(executor); + let executor = start(deps, &context).await?; + + let result = executor + .invoke_and_await_agent(&component, &agent_id, "get", data_value!()) + .await?; + + let metadata = executor.get_worker_metadata(&worker_id).await?; + + executor.check_oplog_is_queryable(&worker_id).await?; + + assert_eq!(result, data_value!(0u64)); + assert_eq!(metadata.component_revision, updated_component.revision); + assert_eq!(update_counts(&metadata), (0, 1, 0)); + + Ok(()) +} From 0b7461a4f636122142c422482b74f8a356af0532 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Tue, 10 Mar 2026 23:07:16 +0100 Subject: [PATCH 2/2] comments --- golem-common/src/model/mod.rs | 4 ++-- golem-worker-executor/src/worker/mod.rs | 8 +++++--- golem-worker-executor/src/worker/status.rs | 20 +++++++++++--------- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/golem-common/src/model/mod.rs b/golem-common/src/model/mod.rs index 0f02bd86a2..c246c7593a 100644 --- a/golem-common/src/model/mod.rs +++ b/golem-common/src/model/mod.rs @@ -699,7 +699,7 @@ pub struct AgentStatusRecord { pub current_retry_count: HashMap, /// Index of the last manual update snapshot index. Agent will call load_snapshot /// on this payload before starting replay. - pub last_manual_snapshot_index: Option, + pub last_manual_update_snapshot_index: Option, /// Index of the last automatic snapshot index. Must be >= last_manual_snapshot_index. /// Agent will call load_snapshot on this payload before starting replay. If the load_snapshot /// fails this will be ignored and a full replay from last_manual_snapshot_index will performed. @@ -727,7 +727,7 @@ impl Default for AgentStatusRecord { deleted_regions: DeletedRegions::new(), component_revision_for_replay: ComponentRevision::INITIAL, current_retry_count: HashMap::new(), - last_manual_snapshot_index: None, + last_manual_update_snapshot_index: None, last_automatic_snapshot_index: None, } } diff --git a/golem-worker-executor/src/worker/mod.rs b/golem-worker-executor/src/worker/mod.rs index 18a6bb488b..d9aba6b3e0 100644 --- a/golem-worker-executor/src/worker/mod.rs +++ b/golem-worker-executor/src/worker/mod.rs @@ -2163,14 +2163,16 @@ impl RunningWorker { ); let mut skipped_regions = worker_metadata.last_known_status.skipped_regions; - let mut last_snapshot_index = worker_metadata.last_known_status.last_manual_snapshot_index; + let mut last_snapshot_index = worker_metadata + .last_known_status + .last_manual_update_snapshot_index; + // automatic snapshots are only considered until the first failure. + // additionally, if there are updates, the automatic snapshot is temporarily ignored to catch issues earlier if let Some(snapshot_idx) = worker_metadata .last_known_status .last_automatic_snapshot_index { - // automatic snapshots are only considered until the first failure - // if there are updates, ignore the automatic snapshot temporarily to catch issues earlier if pending_update.is_none() && !parent.snapshot_recovery_disabled.load(Ordering::Acquire) { diff --git a/golem-worker-executor/src/worker/status.rs b/golem-worker-executor/src/worker/status.rs index 22eaf58cd1..df584a6845 100644 --- a/golem-worker-executor/src/worker/status.rs +++ b/golem-worker-executor/src/worker/status.rs @@ -153,7 +153,7 @@ pub async fn update_status_with_new_entries( component_revision, component_size, component_revision_for_replay, - last_manual_snapshot_index, + last_manual_update_snapshot_index, last_automatic_snapshot_index, ) = calculate_update_fields( last_known.pending_updates, @@ -162,7 +162,7 @@ pub async fn update_status_with_new_entries( last_known.component_revision, last_known.component_size, last_known.component_revision_for_replay, - last_known.last_manual_snapshot_index, + last_known.last_manual_update_snapshot_index, last_known.last_automatic_snapshot_index, &deleted_regions, &new_entries, @@ -209,7 +209,7 @@ pub async fn update_status_with_new_entries( deleted_regions, component_revision_for_replay, current_retry_count, - last_manual_snapshot_index, + last_manual_update_snapshot_index, last_automatic_snapshot_index, }; @@ -594,7 +594,7 @@ fn calculate_update_fields( initial_revision: ComponentRevision, initial_component_size: u64, initial_component_revision_for_replay: ComponentRevision, - initial_last_manual_snapshot_index: Option, + initial_last_manual_update_snapshot_index: Option, initial_last_automatic_snapshot_index: Option, deleted_regions: &DeletedRegions, entries: &BTreeMap, @@ -614,7 +614,7 @@ fn calculate_update_fields( let mut revision = initial_revision; let mut size = initial_component_size; let mut component_revision_for_replay = initial_component_revision_for_replay; - let mut last_manual_snapshot_index = initial_last_manual_snapshot_index; + let mut last_manual_update_snapshot_index = initial_last_manual_update_snapshot_index; let mut last_automatic_snapshot_index = initial_last_automatic_snapshot_index; for (oplog_idx, entry) in entries { @@ -676,7 +676,7 @@ fn calculate_update_fields( }) = pending_updates.pop_front() { component_revision_for_replay = *target_revision; - last_manual_snapshot_index = Some(applied_update_oplog_index); + last_manual_update_snapshot_index = Some(applied_update_oplog_index); last_automatic_snapshot_index = None; } } @@ -693,7 +693,7 @@ fn calculate_update_fields( revision, size, component_revision_for_replay, - last_manual_snapshot_index, + last_manual_update_snapshot_index, last_automatic_snapshot_index, ) } @@ -1667,7 +1667,8 @@ mod test { status.failed_updates = old_status.failed_updates; status.invocation_results = old_status.invocation_results; status.component_revision_for_replay = old_status.component_revision_for_replay; - status.last_manual_snapshot_index = old_status.last_manual_snapshot_index; + status.last_manual_update_snapshot_index = + old_status.last_manual_update_snapshot_index; status.last_automatic_snapshot_index = old_status.last_automatic_snapshot_index; status @@ -1772,7 +1773,8 @@ mod test { } = update_description { status.component_revision_for_replay = target_revision; - status.last_manual_snapshot_index = applied_update.map(|au| au.oplog_index); + status.last_manual_update_snapshot_index = + applied_update.map(|au| au.oplog_index); status.last_automatic_snapshot_index = None; };