Skip to content
348 changes: 177 additions & 171 deletions golem-worker-executor-base/src/durable_host/mod.rs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions golem-worker-executor-base/src/services/oplog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,12 @@ impl OpenOplogEntry {
}

#[derive(Clone)]
struct OpenOplogs {
pub struct OpenOplogs {
oplogs: Cache<WorkerId, (), OpenOplogEntry, ()>,
}

impl OpenOplogs {
fn new(name: &'static str) -> Self {
pub fn new(name: &'static str) -> Self {
Self {
oplogs: Cache::new(
None,
Expand All @@ -383,7 +383,7 @@ impl OpenOplogs {
}
}

async fn get_or_open(
pub async fn get_or_open(
&self,
worker_id: &WorkerId,
constructor: impl OplogConstructor + Send + 'static,
Expand Down Expand Up @@ -446,7 +446,7 @@ impl Debug for OpenOplogs {
}

#[async_trait]
trait OplogConstructor: Clone {
pub trait OplogConstructor: Clone {
async fn create_oplog(
self,
close: Box<dyn FnOnce() + Send + Sync>,
Expand Down
26 changes: 26 additions & 0 deletions golem-worker-executor-base/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,24 @@ impl<Ctx: WorkerCtx> Worker<Ctx> {
}
}

pub async fn resume_replay(&self) -> Result<(), GolemError> {
match &*self.instance.lock().await {
WorkerInstance::Running(running) => {
running
.sender
.send(WorkerCommand::ResumeReplay)
.map_err(|_| GolemError::unknown("Failed to resume command"))
//Probably just `expect(..)` would be better
}
WorkerInstance::Unloaded | WorkerInstance::WaitingForPermit(_) => {
debug!("Worker is initializing, persisting pending invocation");
Err(GolemError::invalid_request(
"Explicit resume is not supported for uninitialized workers",
))
}
}
}

pub async fn invoke(
&self,
idempotency_key: IdempotencyKey,
Expand Down Expand Up @@ -1485,6 +1503,13 @@ impl RunningWorker {
while let Some(cmd) = receiver.recv().await {
waiting_for_command.store(false, Ordering::Release);
match cmd {
WorkerCommand::ResumeReplay => {
let mut store = store.lock().await;

Ctx::resume_replay(&mut *store, &instance)
.await
.expect("resume_replay failed");
}
WorkerCommand::Invocation => {
let message = active
.write()
Expand Down Expand Up @@ -1989,6 +2014,7 @@ pub enum RetryDecision {
#[derive(Debug)]
enum WorkerCommand {
Invocation,
ResumeReplay,
Interrupt(InterruptKind),
}

Expand Down
7 changes: 6 additions & 1 deletion golem-worker-executor-base/src/workerctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use golem_common::model::{
use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue;
use golem_wasm_rpc::wasmtime::ResourceStore;
use golem_wasm_rpc::Value;
use wasmtime::component::{Component, Linker};
use wasmtime::component::{Component, Instance, Linker};
use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync};
use wasmtime_wasi::WasiView;
use wasmtime_wasi_http::WasiHttpView;
Expand Down Expand Up @@ -366,6 +366,11 @@ pub trait ExternalOperations<Ctx: WorkerCtx> {
metadata: &Option<WorkerMetadata>,
) -> Result<WorkerStatusRecord, GolemError>;

async fn resume_replay(
store: &mut (impl AsContextMut<Data = Ctx> + Send),
instance: &Instance,
) -> Result<RetryDecision, GolemError>;

/// Prepares a wasmtime instance after it has been created, but before it can be invoked.
/// This can be used to restore the previous state of the worker but by general it can be no-op.
///
Expand Down
7 changes: 7 additions & 0 deletions golem-worker-executor-base/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,13 @@ impl ExternalOperations<TestWorkerCtx> for TestWorkerCtx {
.await
}

async fn resume_replay(
store: &mut (impl AsContextMut<Data = TestWorkerCtx> + Send),
instance: &Instance,
) -> Result<RetryDecision, GolemError> {
DurableWorkerCtx::<TestWorkerCtx>::resume_replay(store, instance).await
}

async fn prepare_instance(
worker_id: &WorkerId,
instance: &Instance,
Expand Down
7 changes: 7 additions & 0 deletions golem-worker-executor/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ impl ExternalOperations<Context> for Context {
DurableWorkerCtx::<Context>::compute_latest_worker_status(this, worker_id, metadata).await
}

async fn resume_replay(
store: &mut (impl AsContextMut<Data = Context> + Send),
instance: &Instance,
) -> Result<RetryDecision, GolemError> {
DurableWorkerCtx::<Context>::resume_replay(store, instance).await
}

async fn prepare_instance(
worker_id: &WorkerId,
instance: &Instance,
Expand Down
Loading