Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions golem-test-framework/src/dsl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,29 @@ pub trait TestDsl {
))
}

#[tracing::instrument(level = "info", skip(self, timeout), fields(%worker_id))]
async fn wait_for_component_revision(
&self,
worker_id: &WorkerId,
target_revision: ComponentRevision,
timeout: Duration,
) -> anyhow::Result<WorkerMetadataDto> {
let start = Instant::now();
while start.elapsed() < timeout {
let metadata = self.get_worker_metadata(worker_id).await?;

if metadata.component_revision >= target_revision {
return Ok(metadata);
}

tokio::time::sleep(Duration::from_millis(50)).await;
}

Err(anyhow!(
"Timeout waiting for worker {worker_id} to reach component revision {target_revision}"
))
}

async fn cancel_invocation(
&self,
worker_id: &WorkerId,
Expand Down
9 changes: 2 additions & 7 deletions golem-worker-executor/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,9 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
.await?;

info!("Interrupting worker before deletion");
if let Some(mut rx) = worker
worker
.set_interrupting(InterruptKind::Interrupt(Timestamp::now_utc()))
.await
{
info!("Awaiting interruption");
let _ = rx.recv().await;
info!("Interrupted");
}
.await;
info!("Marking worker for deletion");
worker.start_deleting().await?;

Expand Down
8 changes: 8 additions & 0 deletions golem-worker-executor/tests/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,14 @@ async fn component_env_variables_update(
.auto_update_worker(&worker_id, updated_component.revision, false)
.await?;

executor
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an alternative to this there is await_ready_to_process_commands in the worker (not exposed via api currently), which will wait until replay / update is finished

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

User-facing APIs like invocation supposed to work during those phases too (recovery, update) so this would only be useful for tests like this; on the other hand this kind of waiting based on worker status is what the CLI is also doing if you want to await an update. So I'd keep it like this

.wait_for_component_revision(
&worker_id,
updated_component.revision,
Duration::from_secs(30),
)
.await?;

let env = executor
.invoke_and_await_agent(&component, &agent_id, "get_environment", data_value!())
.await?
Expand Down
Loading