diff --git a/crates/storage-pg/.sqlx/query-3355b3b5729d8240297a5ac8111ce891e626a82dcb78ff85f2b815d9329ff936.json b/crates/storage-pg/.sqlx/query-3355b3b5729d8240297a5ac8111ce891e626a82dcb78ff85f2b815d9329ff936.json new file mode 100644 index 000000000..c65354f92 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-3355b3b5729d8240297a5ac8111ce891e626a82dcb78ff85f2b815d9329ff936.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, attempt, scheduled_at, status)\n SELECT $1, queue_name, payload, metadata, $2, attempt + 1, $3, 'scheduled'\n FROM queue_jobs\n WHERE queue_job_id = $4\n AND status = 'failed'\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz", + "Timestamptz", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "3355b3b5729d8240297a5ac8111ce891e626a82dcb78ff85f2b815d9329ff936" +} diff --git a/crates/storage-pg/.sqlx/query-3c7960a2eb2edd71bc71177fc0fb2e83858c9944893b8f3a0f0131e8a9b7a494.json b/crates/storage-pg/.sqlx/query-3c7960a2eb2edd71bc71177fc0fb2e83858c9944893b8f3a0f0131e8a9b7a494.json new file mode 100644 index 000000000..a45aacc7a --- /dev/null +++ b/crates/storage-pg/.sqlx/query-3c7960a2eb2edd71bc71177fc0fb2e83858c9944893b8f3a0f0131e8a9b7a494.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue_jobs\n SET status = 'available'\n WHERE\n status = 'scheduled'\n AND scheduled_at <= $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "3c7960a2eb2edd71bc71177fc0fb2e83858c9944893b8f3a0f0131e8a9b7a494" +} diff --git a/crates/storage-pg/.sqlx/query-47e74a8fc614653ffaa60930fafa8e4d1682263079ec09c38a20c059580adb38.json b/crates/storage-pg/.sqlx/query-47e74a8fc614653ffaa60930fafa8e4d1682263079ec09c38a20c059580adb38.json deleted file mode 100644 index 2962db553..000000000 --- 
a/crates/storage-pg/.sqlx/query-47e74a8fc614653ffaa60930fafa8e4d1682263079ec09c38a20c059580adb38.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, attempt)\n SELECT $1, queue_name, payload, metadata, $2, attempt + 1\n FROM queue_jobs\n WHERE queue_job_id = $3\n AND status = 'failed'\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Timestamptz", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "47e74a8fc614653ffaa60930fafa8e4d1682263079ec09c38a20c059580adb38" -} diff --git a/crates/storage-pg/.sqlx/query-d6c4cc9b04086f1b6ffad30d8a859e9fc0bf8a1fe9002dc3854ae28e65fc7943.json b/crates/storage-pg/.sqlx/query-d6c4cc9b04086f1b6ffad30d8a859e9fc0bf8a1fe9002dc3854ae28e65fc7943.json new file mode 100644 index 000000000..f87d2dff4 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-d6c4cc9b04086f1b6ffad30d8a859e9fc0bf8a1fe9002dc3854ae28e65fc7943.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at, scheduled_at, status)\n VALUES ($1, $2, $3, $4, $5, $6, 'scheduled')\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Jsonb", + "Jsonb", + "Timestamptz", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "d6c4cc9b04086f1b6ffad30d8a859e9fc0bf8a1fe9002dc3854ae28e65fc7943" +} diff --git a/crates/storage-pg/migrations/20241122130349_queue_job_scheduled.sql b/crates/storage-pg/migrations/20241122130349_queue_job_scheduled.sql new file mode 100644 index 000000000..e7aff6a04 --- /dev/null +++ b/crates/storage-pg/migrations/20241122130349_queue_job_scheduled.sql @@ -0,0 +1,11 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. 
+ +-- Add a new status for scheduled jobs +ALTER TYPE "queue_job_status" ADD VALUE 'scheduled'; + +ALTER TABLE "queue_jobs" + -- When the job is scheduled to run + ADD COLUMN "scheduled_at" TIMESTAMP WITH TIME ZONE; diff --git a/crates/storage-pg/migrations/20241122133435_queue_job_scheduled_index.sql b/crates/storage-pg/migrations/20241122133435_queue_job_scheduled_index.sql new file mode 100644 index 000000000..f8a7422e2 --- /dev/null +++ b/crates/storage-pg/migrations/20241122133435_queue_job_scheduled_index.sql @@ -0,0 +1,9 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +-- Add a partial index on scheduled jobs +CREATE INDEX "queue_jobs_scheduled_at_idx" + ON "queue_jobs" ("scheduled_at") + WHERE "status" = 'scheduled'; diff --git a/crates/storage-pg/src/queue/job.rs b/crates/storage-pg/src/queue/job.rs index 769a9eb49..e6be988c9 100644 --- a/crates/storage-pg/src/queue/job.rs +++ b/crates/storage-pg/src/queue/job.rs @@ -7,6 +7,7 @@ //! [`QueueJobRepository`]. 
 use async_trait::async_trait;
+use chrono::{DateTime, Duration, Utc};
 use mas_storage::{
     queue::{Job, QueueJobRepository, Worker},
     Clock,
@@ -117,6 +118,50 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
         Ok(())
     }
 
+    #[tracing::instrument(
+        name = "db.queue_job.schedule_later",
+        fields(
+            queue_job.id,
+            queue_job.queue_name = queue_name,
+            queue_job.scheduled_at = %scheduled_at,
+            db.query.text,
+        ),
+        skip_all,
+        err,
+    )]
+    async fn schedule_later(
+        &mut self,
+        rng: &mut (dyn RngCore + Send),
+        clock: &dyn Clock,
+        queue_name: &str,
+        payload: serde_json::Value,
+        metadata: serde_json::Value,
+        scheduled_at: DateTime<Utc>,
+    ) -> Result<(), Self::Error> {
+        let created_at = clock.now();
+        let id = Ulid::from_datetime_with_source(created_at.into(), rng);
+        tracing::Span::current().record("queue_job.id", tracing::field::display(id));
+
+        sqlx::query!(
+            r#"
+                INSERT INTO queue_jobs
+                    (queue_job_id, queue_name, payload, metadata, created_at, scheduled_at, status)
+                VALUES ($1, $2, $3, $4, $5, $6, 'scheduled')
+            "#,
+            Uuid::from(id),
+            queue_name,
+            payload,
+            metadata,
+            created_at,
+            scheduled_at,
+        )
+        .traced()
+        .execute(&mut *self.conn)
+        .await?;
+
+        Ok(())
+    }
+
     #[tracing::instrument(
         name = "db.queue_job.reserve",
         skip_all,
@@ -264,8 +309,10 @@
         rng: &mut (dyn RngCore + Send),
         clock: &dyn Clock,
         id: Ulid,
+        delay: Duration,
     ) -> Result<(), Self::Error> {
         let now = clock.now();
+        let scheduled_at = now + delay;
         let new_id = Ulid::from_datetime_with_source(now.into(), rng);
 
         // Create a new job with the same payload and metadata, but a new ID and
@@ -274,14 +321,15 @@
         let res = sqlx::query!(
             r#"
                 INSERT INTO queue_jobs
-                    (queue_job_id, queue_name, payload, metadata, created_at, attempt)
-                SELECT $1, queue_name, payload, metadata, $2, attempt + 1
+                    (queue_job_id, queue_name, payload, metadata, created_at, attempt, scheduled_at, status)
+                SELECT $1, 
queue_name, payload, metadata, $2, attempt + 1, $3, 'scheduled'
                 FROM queue_jobs
-                WHERE queue_job_id = $3
+                WHERE queue_job_id = $4
                   AND status = 'failed'
             "#,
             Uuid::from(new_id),
             now,
+            scheduled_at,
             Uuid::from(id),
         )
         .traced()
@@ -308,4 +356,32 @@
         Ok(())
     }
+
+    #[tracing::instrument(
+        name = "db.queue_job.schedule_available_jobs",
+        skip_all,
+        fields(
+            db.query.text,
+        ),
+        err
+    )]
+    async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error> {
+        let now = clock.now();
+        let res = sqlx::query!(
+            r#"
+                UPDATE queue_jobs
+                SET status = 'available'
+                WHERE
+                    status = 'scheduled'
+                    AND scheduled_at <= $1
+            "#,
+            now,
+        )
+        .traced()
+        .execute(&mut *self.conn)
+        .await?;
+
+        let count = res.rows_affected();
+        Ok(usize::try_from(count).unwrap_or(usize::MAX))
+    }
 }
diff --git a/crates/storage/src/queue/job.rs b/crates/storage/src/queue/job.rs
index 9a24fa649..5bbc5f75b 100644
--- a/crates/storage/src/queue/job.rs
+++ b/crates/storage/src/queue/job.rs
@@ -6,6 +6,7 @@
 //! Repository to interact with jobs in the job queue
 
 use async_trait::async_trait;
+use chrono::{DateTime, Duration, Utc};
 use opentelemetry::trace::TraceContextExt;
 use rand_core::RngCore;
 use serde::{Deserialize, Serialize};
@@ -105,6 +106,30 @@ pub trait QueueJobRepository: Send + Sync {
         metadata: serde_json::Value,
     ) -> Result<(), Self::Error>;
 
+    /// Schedule a job to be executed at a later date by a worker.
+    ///
+    /// # Parameters
+    ///
+    /// * `rng` - The random number generator used to generate a new job ID
+    /// * `clock` - The clock used to generate timestamps
+    /// * `queue_name` - The name of the queue to schedule the job on
+    /// * `payload` - The payload of the job
+    /// * `metadata` - Arbitrary metadata about the job
+    /// * `scheduled_at` - The date and time to schedule the job for
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the underlying repository fails.
+    async fn schedule_later(
+        &mut self,
+        rng: &mut (dyn RngCore + Send),
+        clock: &dyn Clock,
+        queue_name: &str,
+        payload: serde_json::Value,
+        metadata: serde_json::Value,
+        scheduled_at: DateTime<Utc>,
+    ) -> Result<(), Self::Error>;
+
     /// Reserve multiple jobs from multiple queues
     ///
     /// # Parameters
@@ -171,7 +196,18 @@
         rng: &mut (dyn RngCore + Send),
         clock: &dyn Clock,
         id: Ulid,
+        delay: Duration,
     ) -> Result<(), Self::Error>;
+
+    /// Mark all scheduled jobs past their scheduled date as available to be
+    /// executed.
+    ///
+    /// Returns the number of jobs that were marked as available.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the underlying repository fails.
+    async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
 }
 
 repository_impl!(QueueJobRepository:
@@ -184,6 +220,16 @@
         metadata: serde_json::Value,
     ) -> Result<(), Self::Error>;
 
+    async fn schedule_later(
+        &mut self,
+        rng: &mut (dyn RngCore + Send),
+        clock: &dyn Clock,
+        queue_name: &str,
+        payload: serde_json::Value,
+        metadata: serde_json::Value,
+        scheduled_at: DateTime<Utc>,
+    ) -> Result<(), Self::Error>;
+
     async fn reserve(
         &mut self,
         clock: &dyn Clock,
@@ -205,7 +251,10 @@
         rng: &mut (dyn RngCore + Send),
         clock: &dyn Clock,
         id: Ulid,
+        delay: Duration,
     ) -> Result<(), Self::Error>;
+
+    async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
 );
 
 /// Extension trait for [`QueueJobRepository`] to help adding a job to the queue
@@ -230,6 +279,26 @@
         clock: &dyn Clock,
         job: J,
     ) -> Result<(), Self::Error>;
+
+    /// Schedule a job to be executed at a later date by a worker.
+    ///
+    /// # Parameters
+    ///
+    /// * `rng` - The random number generator used to generate a new job ID
+    /// * `clock` - The clock used to generate timestamps
+    /// * `job` - The job to schedule
+    /// * `scheduled_at` - The date and time to schedule the job for
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the underlying repository fails.
+    async fn schedule_job_later<J: InsertableJob>(
+        &mut self,
+        rng: &mut (dyn RngCore + Send),
+        clock: &dyn Clock,
+        job: J,
+        scheduled_at: DateTime<Utc>,
+    ) -> Result<(), Self::Error>;
 }
 
 #[async_trait]
@@ -263,4 +332,32 @@
         self.schedule(rng, clock, J::QUEUE_NAME, payload, metadata)
             .await
     }
+
+    #[tracing::instrument(
+        name = "db.queue_job.schedule_job_later",
+        fields(
+            queue_job.queue_name = J::QUEUE_NAME,
+        ),
+        skip_all,
+    )]
+    async fn schedule_job_later<J: InsertableJob>(
+        &mut self,
+        rng: &mut (dyn RngCore + Send),
+        clock: &dyn Clock,
+        job: J,
+        scheduled_at: DateTime<Utc>,
+    ) -> Result<(), Self::Error> {
+        // Grab the span context from the current span
+        let span = tracing::Span::current();
+        let ctx = span.context();
+        let span = ctx.span();
+        let span_context = span.span_context();
+
+        let metadata = JobMetadata::new(span_context);
+        let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
+
+        let payload = serde_json::to_value(job).expect("Could not serialize job");
+        self.schedule_later(rng, clock, J::QUEUE_NAME, payload, metadata, scheduled_at)
+            .await
+    }
 }
diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs
index ce8504116..fb2fa7151 100644
--- a/crates/tasks/src/new_queue.rs
+++ b/crates/tasks/src/new_queue.rs
@@ -160,6 +160,14 @@ const MAX_JOBS_TO_FETCH: usize = 5;
 // How many attempts a job should be retried
 const MAX_ATTEMPTS: usize = 5;
 
+/// Returns the delay to wait before retrying a job
+///
+/// Uses an exponential backoff: 1s, 2s, 4s, 8s, 16s
+fn retry_delay(attempt: usize) -> Duration {
+    let attempt = u32::try_from(attempt).unwrap_or(u32::MAX);
+    
Duration::milliseconds(2_i64.saturating_pow(attempt) * 1000) +} + type JobResult = Result<(), JobError>; type JobFactory = Arc Box + Send + Sync>; @@ -516,6 +524,17 @@ impl QueueWorker { // TODO: mark tasks those workers had as lost + // Mark all the scheduled jobs as available + let scheduled = repo + .queue_job() + .schedule_available_jobs(&self.clock) + .await?; + match scheduled { + 0 => {} + 1 => tracing::warn!("One scheduled job marked as available"), + n => tracing::warn!("{n} scheduled jobs marked as available"), + } + // Release the leader lock let txn = repo .into_inner() @@ -669,17 +688,19 @@ impl JobTracker { JobErrorDecision::Retry => { if context.attempt < MAX_ATTEMPTS { + let delay = retry_delay(context.attempt); tracing::warn!( error = &e as &dyn std::error::Error, job.id = %context.id, job.queue_name = %context.queue_name, job.attempt = %context.attempt, - "Job failed, will retry" + "Job failed, will retry in {}s", + delay.num_seconds() ); - // TODO: retry with an exponential backoff, once we know how to - // schedule jobs in the future - repo.queue_job().retry(&mut *rng, clock, context.id).await?; + repo.queue_job() + .retry(&mut *rng, clock, context.id, delay) + .await?; } else { tracing::error!( error = &e as &dyn std::error::Error, @@ -707,15 +728,19 @@ impl JobTracker { .await?; if context.attempt < MAX_ATTEMPTS { + let delay = retry_delay(context.attempt); tracing::warn!( error = &e as &dyn std::error::Error, job.id = %context.id, job.queue_name = %context.queue_name, job.attempt = %context.attempt, - "Job crashed, will retry" + "Job crashed, will retry in {}s", + delay.num_seconds() ); - repo.queue_job().retry(&mut *rng, clock, context.id).await?; + repo.queue_job() + .retry(&mut *rng, clock, context.id, delay) + .await?; } else { tracing::error!( error = &e as &dyn std::error::Error,