Skip to content

Commit 823174c

Browse files
committed
Make the worker heartbeat take a worker reference
1 parent c92692b commit 823174c

File tree

3 files changed

+7
-14
lines changed

3 files changed

+7
-14
lines changed

crates/storage-pg/src/queue/worker.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,7 @@ impl<'c> QueueWorkerRepository for PgQueueWorkerRepository<'c> {
7979
),
8080
err,
8181
)]
82-
async fn heartbeat(
83-
&mut self,
84-
clock: &dyn Clock,
85-
worker: Worker,
86-
) -> Result<Worker, Self::Error> {
82+
async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error> {
8783
let now = clock.now();
8884
let res = sqlx::query!(
8985
r#"
@@ -101,7 +97,7 @@ impl<'c> QueueWorkerRepository for PgQueueWorkerRepository<'c> {
10197
// If no row was updated, the worker was shutdown so we return an error
10298
DatabaseError::ensure_affected_rows(&res, 1)?;
10399

104-
Ok(worker)
100+
Ok(())
105101
}
106102

107103
#[tracing::instrument(

crates/storage/src/queue/worker.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,11 @@ pub trait QueueWorkerRepository: Send + Sync {
4040

4141
/// Send a heartbeat for the given worker.
4242
///
43-
/// Returns the updated worker.
44-
///
4543
/// # Errors
4644
///
4745
/// Returns an error if the underlying repository fails or if the worker was
4846
/// shutdown.
49-
async fn heartbeat(&mut self, clock: &dyn Clock, worker: Worker)
50-
-> Result<Worker, Self::Error>;
47+
async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;
5148

5249
/// Mark the given worker as shutdown.
5350
///
@@ -102,8 +99,8 @@ repository_impl!(QueueWorkerRepository:
10299
async fn heartbeat(
103100
&mut self,
104101
clock: &dyn Clock,
105-
worker: Worker,
106-
) -> Result<Worker, Self::Error>;
102+
worker: &Worker,
103+
) -> Result<(), Self::Error>;
107104

108105
async fn shutdown(
109106
&mut self,

crates/tasks/src/new_queue.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub async fn run(state: State) -> Result<(), RepositoryError> {
1515
let mut rng = state.rng();
1616
let clock = state.clock();
1717

18-
let mut worker = repo.queue_worker().register(&mut rng, &clock).await?;
18+
let worker = repo.queue_worker().register(&mut rng, &clock).await?;
1919
span.record("worker.id", tracing::field::display(worker.id));
2020
repo.save().await?;
2121

@@ -44,7 +44,7 @@ pub async fn run(state: State) -> Result<(), RepositoryError> {
4444
// on a logged table
4545
if now - last_heartbeat >= chrono::Duration::minutes(1) {
4646
tracing::info!("Sending heartbeat");
47-
worker = repo.queue_worker().heartbeat(&clock, worker).await?;
47+
repo.queue_worker().heartbeat(&clock, &worker).await?;
4848
last_heartbeat = now;
4949
}
5050

0 commit comments

Comments
 (0)