Skip to content

Commit 4a9a347

Browse files
committed
Fix missing load snapshot after manual snapshot update
1 parent 4ecb6f9 commit 4a9a347

File tree

7 files changed

+162
-81
lines changed

7 files changed

+162
-81
lines changed

golem-common/src/model/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,13 @@ pub struct AgentStatusRecord {
697697
/// The number of encountered error entries grouped by their 'retry_from' index, calculated from
698698
/// the last invocation boundary.
699699
pub current_retry_count: HashMap<OplogIndex, u32>,
700-
pub last_snapshot_index: Option<OplogIndex>,
700+
/// Index of the last manual update snapshot index. Agent will call load_snapshot
701+
/// on this payload before starting replay.
702+
pub last_manual_snapshot_index: Option<OplogIndex>,
703+
/// Index of the last automatic snapshot. Must be >= last_manual_snapshot_index.
704+
/// Agent will call load_snapshot on this payload before starting replay. If the load_snapshot
705+
/// fails, this will be ignored and a full replay from last_manual_snapshot_index will be performed.
706+
pub last_automatic_snapshot_index: Option<OplogIndex>,
701707
}
702708

703709
impl Default for AgentStatusRecord {
@@ -721,7 +727,8 @@ impl Default for AgentStatusRecord {
721727
deleted_regions: DeletedRegions::new(),
722728
component_revision_for_replay: ComponentRevision::INITIAL,
723729
current_retry_count: HashMap::new(),
724-
last_snapshot_index: None,
730+
last_manual_snapshot_index: None,
731+
last_automatic_snapshot_index: None,
725732
}
726733
}
727734
}

golem-common/src/model/worker.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ impl UntypedParsedWorkerCreationLocalAgentConfigEntry {
3030

3131
let value_type = component_metadata
3232
.find_agent_type_by_name(agent_type_name)
33-
.ok_or("did not find expected agent type in the metadata")?
33+
.ok_or_else(|| {
34+
format!("did not find expected agent type {agent_type_name} in the metadata")
35+
})?
3436
.config
3537
.into_iter()
3638
.find_map(|c| match c {

golem-worker-executor/src/durable_host/mod.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,13 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> DurableWorkerCtx<Ctx> {
11731173
OplogEntry::Snapshot {
11741174
data, mime_type, ..
11751175
} => (data, mime_type),
1176+
OplogEntry::PendingUpdate {
1177+
description:
1178+
UpdateDescription::SnapshotBased {
1179+
payload, mime_type, ..
1180+
},
1181+
..
1182+
} => (payload, mime_type),
11761183
_ => {
11771184
warn!(
11781185
"Expected Snapshot entry at oplog index {snapshot_index}, found different entry; falling back to full replay"
@@ -2325,11 +2332,25 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> ExternalOperations<Ctx> for Dur
23252332
UpdateDescription::Automatic {
23262333
target_revision, ..
23272334
} => {
2328-
// snapshot update will be succeeded as part of the replay.
2329-
let result = Self::resume_replay(store, instance, false).await;
2330-
record_resume_worker(start.elapsed());
2335+
let replay_result = async {
2336+
if let SnapshotRecoveryResult::Failed =
2337+
Self::try_load_snapshot(store, instance).await
2338+
{
2339+
return Err(WorkerExecutorError::failed_to_resume_worker(
2340+
agent_id.clone(),
2341+
WorkerExecutorError::runtime("loading snapshot failed"),
2342+
));
2343+
};
2344+
// the automatic update will be marked as succeeded as part of the replay.
2345+
let result = Self::resume_replay(store, instance, false).await?;
2346+
2347+
record_resume_worker(start.elapsed());
2348+
2349+
Ok(result)
2350+
}
2351+
.await;
23312352

2332-
match result {
2353+
match replay_result {
23332354
Err(error) => {
23342355
// replay failed. There are two cases here:
23352356
// 1. We failed before the update has succeeded. In this case we fail the update and retry the replay.
@@ -2367,18 +2388,13 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> ExternalOperations<Ctx> for Dur
23672388
_ => Err(error),
23682389
}
23692390
}
2370-
_ => result,
2391+
_ => replay_result,
23712392
}
23722393
}
23732394
}
23742395
}
23752396
None => match Self::try_load_snapshot(store, instance).await {
2376-
SnapshotRecoveryResult::Success => {
2377-
let result = Self::resume_replay(store, instance, false).await;
2378-
record_resume_worker(start.elapsed());
2379-
result
2380-
}
2381-
SnapshotRecoveryResult::NotAttempted => {
2397+
SnapshotRecoveryResult::Success | SnapshotRecoveryResult::NotAttempted => {
23822398
let result = Self::resume_replay(store, instance, false).await;
23832399
record_resume_worker(start.elapsed());
23842400
result

golem-worker-executor/src/worker/invocation_loop.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -327,22 +327,13 @@ impl<Ctx: WorkerCtx> InnerInvocationLoop<'_, Ctx> {
327327
/// first pending_updates, then pending_invocations
328328
async fn drain_pending_from_status(&mut self) -> CommandOutcome {
329329
loop {
330-
let status = self.parent.last_known_status.read().await.clone();
330+
let status = self.parent.get_non_detached_last_known_status().await;
331331

332332
// First, try to process a pending update
333-
if let Some(update) = status.pending_updates.front() {
334-
let target_revision = *update.description.target_revision();
335-
let mut store = self.store.lock().await;
336-
let mut invocation = Invocation {
337-
owned_agent_id: self.owned_agent_id.clone(),
338-
parent: self.parent.clone(),
339-
instance: self.instance,
340-
store: store.deref_mut(),
341-
};
342-
match invocation.manual_update(target_revision).await {
343-
CommandOutcome::Continue => continue,
344-
other => break other,
345-
}
333+
if status.pending_updates.front().is_some() {
334+
// if the update made it to pending_updates (instead of pending invocations), it is ready
335+
// to be processed on the next restart. So just restart here and let the recovery logic take over
336+
break CommandOutcome::BreakInnerLoop(RetryDecision::Immediate);
346337
}
347338

348339
// Then, try to process a pending invocation

golem-worker-executor/src/worker/mod.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use golem_common::model::component::ComponentRevision;
4949
use golem_common::model::component::{ComponentFilePath, PluginPriority};
5050
use golem_common::model::invocation_context::InvocationContextStack;
5151
use golem_common::model::oplog::{OplogEntry, OplogIndex, UpdateDescription};
52-
use golem_common::model::regions::OplogRegion;
52+
use golem_common::model::regions::{DeletedRegionsBuilder, OplogRegion};
5353
use golem_common::model::worker::{RevertWorkerTarget, WorkerCreationLocalAgentConfigEntry};
5454
use golem_common::model::AgentStatus;
5555
use golem_common::model::RetryConfig;
@@ -2162,6 +2162,29 @@ impl RunningWorker {
21622162
.component_revision_for_replay,
21632163
);
21642164

2165+
let mut skipped_regions = worker_metadata.last_known_status.skipped_regions;
2166+
let mut last_snapshot_index = worker_metadata.last_known_status.last_manual_snapshot_index;
2167+
2168+
if let Some(snapshot_idx) = worker_metadata
2169+
.last_known_status
2170+
.last_automatic_snapshot_index
2171+
{
2172+
// automatic snapshots are only considered until the first failure
2173+
// if there are updates, ignore the automatic snapshot temporarily to catch issues earlier
2174+
if pending_update.is_none()
2175+
&& !parent.snapshot_recovery_disabled.load(Ordering::Acquire)
2176+
{
2177+
let snapshot_skip =
2178+
DeletedRegionsBuilder::from_regions(vec![OplogRegion::from_index_range(
2179+
OplogIndex::INITIAL.next()..=snapshot_idx,
2180+
)])
2181+
.build();
2182+
skipped_regions.set_override(snapshot_skip);
2183+
2184+
last_snapshot_index = Some(snapshot_idx);
2185+
}
2186+
}
2187+
21652188
let context = Ctx::create(
21662189
worker_metadata.created_by,
21672190
OwnedAgentId::new(worker_metadata.environment_id, &worker_metadata.agent_id),
@@ -2184,19 +2207,13 @@ impl RunningWorker {
21842207
parent.extra_deps(),
21852208
parent.config(),
21862209
AgentConfig::new(
2187-
worker_metadata.last_known_status.skipped_regions,
2210+
skipped_regions,
21882211
worker_metadata.last_known_status.total_linear_memory_size,
21892212
component_version_for_replay,
21902213
worker_metadata.created_by,
21912214
worker_metadata.config_vars,
21922215
worker_metadata.local_agent_config,
2193-
if pending_update.is_none()
2194-
&& !parent.snapshot_recovery_disabled.load(Ordering::Acquire)
2195-
{
2196-
worker_metadata.last_known_status.last_snapshot_index
2197-
} else {
2198-
None
2199-
},
2216+
last_snapshot_index,
22002217
),
22012218
parent.execution_status.clone(),
22022219
parent.file_loader(),

golem-worker-executor/src/worker/status.rs

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -153,13 +153,17 @@ pub async fn update_status_with_new_entries<T: HasOplogService + Sync>(
153153
component_revision,
154154
component_size,
155155
component_revision_for_replay,
156+
last_manual_snapshot_index,
157+
last_automatic_snapshot_index,
156158
) = calculate_update_fields(
157159
last_known.pending_updates,
158160
last_known.failed_updates,
159161
last_known.successful_updates,
160162
last_known.component_revision,
161163
last_known.component_size,
162164
last_known.component_revision_for_replay,
165+
last_known.last_manual_snapshot_index,
166+
last_known.last_automatic_snapshot_index,
163167
&deleted_regions,
164168
&new_entries,
165169
);
@@ -182,12 +186,6 @@ pub async fn update_status_with_new_entries<T: HasOplogService + Sync>(
182186

183187
let active_plugins = calculate_active_plugins(active_plugins, &deleted_regions, &new_entries);
184188

185-
let last_snapshot_index = calculate_last_snapshot_index(
186-
last_known.last_snapshot_index,
187-
&deleted_regions,
188-
&new_entries,
189-
);
190-
191189
let result = AgentStatusRecord {
192190
oplog_idx: new_entries
193191
.keys()
@@ -211,7 +209,8 @@ pub async fn update_status_with_new_entries<T: HasOplogService + Sync>(
211209
deleted_regions,
212210
component_revision_for_replay,
213211
current_retry_count,
214-
last_snapshot_index,
212+
last_manual_snapshot_index,
213+
last_automatic_snapshot_index,
215214
};
216215

217216
Some(result)
@@ -587,13 +586,16 @@ async fn calculate_pending_invocations<T: HasOplogService + Sync>(
587586
result
588587
}
589588

589+
#[allow(clippy::type_complexity)]
590590
fn calculate_update_fields(
591591
initial_pending_updates: VecDeque<TimestampedUpdateDescription>,
592592
initial_failed_updates: Vec<FailedUpdateRecord>,
593593
initial_successful_updates: Vec<SuccessfulUpdateRecord>,
594594
initial_revision: ComponentRevision,
595595
initial_component_size: u64,
596596
initial_component_revision_for_replay: ComponentRevision,
597+
initial_last_manual_snapshot_index: Option<OplogIndex>,
598+
initial_last_automatic_snapshot_index: Option<OplogIndex>,
597599
deleted_regions: &DeletedRegions,
598600
entries: &BTreeMap<OplogIndex, OplogEntry>,
599601
) -> (
@@ -603,13 +605,17 @@ fn calculate_update_fields(
603605
ComponentRevision,
604606
u64,
605607
ComponentRevision,
608+
Option<OplogIndex>,
609+
Option<OplogIndex>,
606610
) {
607611
let mut pending_updates = initial_pending_updates;
608612
let mut failed_updates = initial_failed_updates;
609613
let mut successful_updates = initial_successful_updates;
610614
let mut revision = initial_revision;
611615
let mut size = initial_component_size;
612616
let mut component_revision_for_replay = initial_component_revision_for_replay;
617+
let mut last_manual_snapshot_index = initial_last_manual_snapshot_index;
618+
let mut last_automatic_snapshot_index = initial_last_automatic_snapshot_index;
613619

614620
for (oplog_idx, entry) in entries {
615621
// Skipping entries in deleted regions (by revert)
@@ -663,17 +669,20 @@ fn calculate_update_fields(
663669
revision = *target_revision;
664670
size = *new_component_size;
665671

666-
let applied_update = pending_updates.pop_front();
667-
if matches!(
668-
applied_update,
669-
Some(TimestampedUpdateDescription {
670-
description: UpdateDescription::SnapshotBased { .. },
671-
..
672-
})
673-
) {
674-
component_revision_for_replay = *target_revision
672+
if let Some(TimestampedUpdateDescription {
673+
description: UpdateDescription::SnapshotBased { .. },
674+
oplog_index: applied_update_oplog_index,
675+
..
676+
}) = pending_updates.pop_front()
677+
{
678+
component_revision_for_replay = *target_revision;
679+
last_manual_snapshot_index = Some(applied_update_oplog_index);
680+
last_automatic_snapshot_index = None;
675681
}
676682
}
683+
OplogEntry::Snapshot { .. } => {
684+
last_automatic_snapshot_index = Some(*oplog_idx);
685+
}
677686
_ => {}
678687
}
679688
}
@@ -684,6 +693,8 @@ fn calculate_update_fields(
684693
revision,
685694
size,
686695
component_revision_for_replay,
696+
last_manual_snapshot_index,
697+
last_automatic_snapshot_index,
687698
)
688699
}
689700

@@ -830,31 +841,6 @@ fn calculate_active_plugins(
830841
result
831842
}
832843

833-
fn calculate_last_snapshot_index(
834-
initial: Option<OplogIndex>,
835-
deleted_regions: &DeletedRegions,
836-
entries: &BTreeMap<OplogIndex, OplogEntry>,
837-
) -> Option<OplogIndex> {
838-
let mut result = initial;
839-
840-
if let Some(idx) = result {
841-
if deleted_regions.is_in_deleted_region(idx) {
842-
result = None;
843-
}
844-
}
845-
846-
for (idx, entry) in entries {
847-
if deleted_regions.is_in_deleted_region(*idx) {
848-
continue;
849-
}
850-
851-
if matches!(entry, OplogEntry::Snapshot { .. }) {
852-
result = Some(*idx);
853-
}
854-
}
855-
result
856-
}
857-
858844
fn is_worker_error_retriable(
859845
retry_config: &RetryConfig,
860846
error: &AgentError,
@@ -1627,7 +1613,7 @@ mod test {
16271613
mime_type: "application/octet-stream".to_string(),
16281614
},
16291615
move |mut status| {
1630-
status.last_snapshot_index = Some(oplog_idx);
1616+
status.last_automatic_snapshot_index = Some(oplog_idx);
16311617
status
16321618
},
16331619
)
@@ -1681,7 +1667,8 @@ mod test {
16811667
status.failed_updates = old_status.failed_updates;
16821668
status.invocation_results = old_status.invocation_results;
16831669
status.component_revision_for_replay = old_status.component_revision_for_replay;
1684-
status.last_snapshot_index = old_status.last_snapshot_index;
1670+
status.last_manual_snapshot_index = old_status.last_manual_snapshot_index;
1671+
status.last_automatic_snapshot_index = old_status.last_automatic_snapshot_index;
16851672

16861673
status
16871674
})
@@ -1765,7 +1752,7 @@ mod test {
17651752
)
17661753
.rounded();
17671754
self.add(entry.clone(), move |mut status| {
1768-
let _ = status.pending_updates.pop_front();
1755+
let applied_update = status.pending_updates.pop_front();
17691756
status.successful_updates.push(SuccessfulUpdateRecord {
17701757
timestamp: entry.timestamp(),
17711758
target_revision: *update_description.target_revision(),
@@ -1785,6 +1772,8 @@ mod test {
17851772
} = update_description
17861773
{
17871774
status.component_revision_for_replay = target_revision;
1775+
status.last_manual_snapshot_index = applied_update.map(|au| au.oplog_index);
1776+
status.last_automatic_snapshot_index = None;
17881777
};
17891778

17901779
status

0 commit comments

Comments
 (0)