Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Copyright 2024 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.

-- Add a new status for scheduled jobs, i.e. jobs which should only run at a
-- later point in time
ALTER TYPE "queue_job_status" ADD VALUE 'scheduled';

-- Add a nullable column (no NOT NULL constraint, no default) on the jobs table
ALTER TABLE "queue_jobs"
-- When the job is scheduled to run
ADD COLUMN "scheduled_at" TIMESTAMP WITH TIME ZONE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Copyright 2024 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.

-- Add a partial index on scheduled jobs: only rows whose status is
-- 'scheduled' are indexed, keyed by the time they are scheduled to run.
--
-- NOTE(review): presumably kept in a separate migration from the one adding
-- the 'scheduled' enum value because PostgreSQL does not allow a newly added
-- enum value to be used in the transaction that added it -- confirm.
CREATE INDEX "queue_jobs_scheduled_at_idx"
ON "queue_jobs" ("scheduled_at")
WHERE "status" = 'scheduled';
82 changes: 79 additions & 3 deletions crates/storage-pg/src/queue/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! [`QueueJobRepository`].
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use mas_storage::{
queue::{Job, QueueJobRepository, Worker},
Clock,
Expand Down Expand Up @@ -117,6 +118,50 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
Ok(())
}

/// Insert a new job in the `scheduled` state, to be run at `scheduled_at`.
///
/// The job ID is a fresh ULID built from the current time of `clock` and
/// from `rng`; it is recorded on the current tracing span as
/// `queue_job.id`.
#[tracing::instrument(
    name = "db.queue_job.schedule_later",
    fields(
        queue_job.id,
        queue_job.queue_name = queue_name,
        queue_job.scheduled_at = %scheduled_at,
        db.query.text,
    ),
    skip_all,
    err,
)]
async fn schedule_later(
    &mut self,
    rng: &mut (dyn RngCore + Send),
    clock: &dyn Clock,
    queue_name: &str,
    payload: serde_json::Value,
    metadata: serde_json::Value,
    scheduled_at: DateTime<Utc>,
) -> Result<(), Self::Error> {
    // The creation timestamp is also the time component of the new job ID
    let now = clock.now();
    let job_id = Ulid::from_datetime_with_source(now.into(), rng);

    let span = tracing::Span::current();
    span.record("queue_job.id", tracing::field::display(job_id));

    // The SQL text below is kept verbatim: it is part of the query itself
    sqlx::query!(
        r#"
INSERT INTO queue_jobs
(queue_job_id, queue_name, payload, metadata, created_at, scheduled_at, status)
VALUES ($1, $2, $3, $4, $5, $6, 'scheduled')
"#,
        Uuid::from(job_id),
        queue_name,
        payload,
        metadata,
        now,
        scheduled_at,
    )
    .traced()
    .execute(&mut *self.conn)
    .await?;

    Ok(())
}

#[tracing::instrument(
name = "db.queue_job.reserve",
skip_all,
Expand Down Expand Up @@ -264,8 +309,10 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
id: Ulid,
delay: Duration,
) -> Result<(), Self::Error> {
let now = clock.now();
let scheduled_at = now + delay;
let new_id = Ulid::from_datetime_with_source(now.into(), rng);

// Create a new job with the same payload and metadata, but a new ID and
Expand All @@ -274,14 +321,15 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
let res = sqlx::query!(
r#"
INSERT INTO queue_jobs
(queue_job_id, queue_name, payload, metadata, created_at, attempt)
SELECT $1, queue_name, payload, metadata, $2, attempt + 1
(queue_job_id, queue_name, payload, metadata, created_at, attempt, scheduled_at, status)
SELECT $1, queue_name, payload, metadata, $2, attempt + 1, $3, 'scheduled'
FROM queue_jobs
WHERE queue_job_id = $3
WHERE queue_job_id = $4
AND status = 'failed'
"#,
Uuid::from(new_id),
now,
scheduled_at,
Uuid::from(id),
)
.traced()
Expand All @@ -308,4 +356,32 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {

Ok(())
}

/// Flip every `scheduled` job whose `scheduled_at` is not after the current
/// time of `clock` to the `available` status.
///
/// Returns the number of rows updated, saturating at `usize::MAX` if the
/// count reported by the database does not fit in a `usize`.
#[tracing::instrument(
    name = "db.queue_job.schedule_available_jobs",
    skip_all,
    fields(
        db.query.text,
    ),
    err
)]
async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error> {
    let cutoff = clock.now();

    // The SQL text below is kept verbatim: it is part of the query itself
    let outcome = sqlx::query!(
        r#"
UPDATE queue_jobs
SET status = 'available'
WHERE
status = 'scheduled'
AND scheduled_at <= $1
"#,
        cutoff,
    )
    .traced()
    .execute(&mut *self.conn)
    .await?;

    // `rows_affected` is a `u64`; clamp rather than fail on narrower targets
    let promoted = usize::try_from(outcome.rows_affected()).unwrap_or(usize::MAX);
    Ok(promoted)
}
}
97 changes: 97 additions & 0 deletions crates/storage/src/queue/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! Repository to interact with jobs in the job queue
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use opentelemetry::trace::TraceContextExt;
use rand_core::RngCore;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -105,6 +106,30 @@ pub trait QueueJobRepository: Send + Sync {
metadata: serde_json::Value,
) -> Result<(), Self::Error>;

/// Schedule a job to be executed at a later date by a worker.
///
/// Jobs past their scheduled date are marked as available by
/// [`Self::schedule_available_jobs`].
///
/// # Parameters
///
/// * `rng` - The random number generator used to generate a new job ID
/// * `clock` - The clock used to generate timestamps
/// * `queue_name` - The name of the queue to schedule the job on
/// * `payload` - The payload of the job
/// * `metadata` - Arbitrary metadata about the job
/// * `scheduled_at` - The date and time to schedule the job for
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn schedule_later(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
queue_name: &str,
payload: serde_json::Value,
metadata: serde_json::Value,
scheduled_at: DateTime<Utc>,
) -> Result<(), Self::Error>;

/// Reserve multiple jobs from multiple queues
///
/// # Parameters
Expand Down Expand Up @@ -171,7 +196,18 @@ pub trait QueueJobRepository: Send + Sync {
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
id: Ulid,
delay: Duration,
) -> Result<(), Self::Error>;

/// Mark all scheduled jobs past their scheduled date as available to be
/// executed.
///
/// Returns the number of jobs that were marked as available.
///
/// # Parameters
///
/// * `clock` - The clock used to get the current time, against which the
///   scheduled date of each job is compared
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
}

repository_impl!(QueueJobRepository:
Expand All @@ -184,6 +220,16 @@ repository_impl!(QueueJobRepository:
metadata: serde_json::Value,
) -> Result<(), Self::Error>;

async fn schedule_later(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
queue_name: &str,
payload: serde_json::Value,
metadata: serde_json::Value,
scheduled_at: DateTime<Utc>,
) -> Result<(), Self::Error>;

async fn reserve(
&mut self,
clock: &dyn Clock,
Expand All @@ -205,7 +251,10 @@ repository_impl!(QueueJobRepository:
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
id: Ulid,
delay: Duration,
) -> Result<(), Self::Error>;

async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
);

/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue
Expand All @@ -230,6 +279,26 @@ pub trait QueueJobRepositoryExt: QueueJobRepository {
clock: &dyn Clock,
job: J,
) -> Result<(), Self::Error>;

/// Schedule a job to be executed at a later date by a worker.
///
/// The job is scheduled on the queue named by `J::QUEUE_NAME`.
///
/// # Parameters
///
/// * `rng` - The random number generator used to generate a new job ID
/// * `clock` - The clock used to generate timestamps
/// * `job` - The job to schedule
/// * `scheduled_at` - The date and time to schedule the job for
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
///
/// # Panics
///
/// The implementation provided in this file panics if the job or its
/// metadata cannot be serialized to JSON.
async fn schedule_job_later<J: InsertableJob>(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
job: J,
scheduled_at: DateTime<Utc>,
) -> Result<(), Self::Error>;
}

#[async_trait]
Expand Down Expand Up @@ -263,4 +332,32 @@ where
self.schedule(rng, clock, J::QUEUE_NAME, payload, metadata)
.await
}

/// Serialize `job` and schedule it on the `J::QUEUE_NAME` queue, to be run
/// at `scheduled_at`.
///
/// The span context attached to the current tracing span is stored in the
/// job metadata.
///
/// # Panics
///
/// Panics if the job or its metadata cannot be serialized to JSON.
#[tracing::instrument(
    name = "db.queue_job.schedule_job_later",
    fields(
        queue_job.queue_name = J::QUEUE_NAME,
    ),
    skip_all,
)]
async fn schedule_job_later<J: InsertableJob>(
    &mut self,
    rng: &mut (dyn RngCore + Send),
    clock: &dyn Clock,
    job: J,
    scheduled_at: DateTime<Utc>,
) -> Result<(), Self::Error> {
    // Each intermediate value is borrowed by the next one, so they all need
    // their own binding to stay alive until the metadata is built
    let current_span = tracing::Span::current();
    let otel_context = current_span.context();
    let span_ref = otel_context.span();

    let metadata = serde_json::to_value(JobMetadata::new(span_ref.span_context()))
        .expect("Could not serialize metadata");

    let payload = serde_json::to_value(job).expect("Could not serialize job");

    self.schedule_later(rng, clock, J::QUEUE_NAME, payload, metadata, scheduled_at)
        .await
}
}
Loading