Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions examples/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use testcontainers_modules::postgres::Postgres;
use tracing::{debug, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use workers::{
ArchivalPolicy, ArchiveQuery, BackgroundJob, Runner, archived_job_count, get_archived_jobs,
ArchivalPolicy, ArchiveQuery, BackgroundJob, Queue, Runner, archived_job_count,
get_archived_jobs,
};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -127,14 +128,14 @@ async fn main() -> Result<()> {

// Create runner with archiving enabled for important jobs
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
queue
.add_queue(
Queue::new("notifications_payments")
.register::<NotificationJob>()
.register::<PaymentJob>()
.num_workers(2)
.poll_interval(Duration::from_millis(100))
.archive(ArchivalPolicy::Always) // Enable archiving for audit trail
})
.archive(ArchivalPolicy::Always), // Enable archiving for audit trail
)
.shutdown_when_queue_empty();

// Enqueue some notification jobs
Expand Down
10 changes: 5 additions & 5 deletions examples/archive_cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use testcontainers_modules::postgres::Postgres;
use tracing::info;
use workers::{
ArchivalPolicy, ArchiveCleanerBuilder, BackgroundJob, CleanupConfiguration, CleanupPolicy,
Runner, archived_job_count,
Queue, Runner, archived_job_count,
};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -76,11 +76,11 @@ async fn main() -> Result<()> {
let (pool, _container) = setup_database().await?;

let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
queue
.add_queue(
Queue::new("spline_reticulator")
.register::<ReticulateSplineJob>()
.archive(ArchivalPolicy::Always)
})
.archive(ArchivalPolicy::Always),
)
.shutdown_when_queue_empty();

ArchiveCleanerBuilder::new()
Expand Down
55 changes: 39 additions & 16 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::worker::Worker;
use crate::{BackgroundJob, schema};
use futures_util::future::join_all;
use sqlx::PgPool;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -25,7 +24,7 @@ pub struct Unconfigured;
/// The core runner responsible for locking and running jobs
pub struct Runner<Context: Clone + Send + Sync + 'static, State = Unconfigured> {
connection_pool: PgPool,
queues: HashMap<String, Queue<Context, Configured>>,
queues: Vec<Queue<Context, Configured>>,
context: Context,
shutdown_when_queue_empty: bool,
_state: PhantomData<State>,
Expand All @@ -36,7 +35,15 @@ impl<Context: std::fmt::Debug + Clone + Sync + Send, State: std::fmt::Debug> std
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Runner")
.field("queues", &self.queues.keys().collect::<Vec<_>>())
.field(
"queues",
&self
.queues
.iter()
.enumerate()
.map(|(qid, q)| q.name.clone().unwrap_or_else(|| qid.to_string()))
.collect::<Vec<_>>(),
)
.field("context", &self.context)
.field("shutdown_when_queue_empty", &self.shutdown_when_queue_empty)
.finish()
Expand All @@ -48,7 +55,7 @@ impl<Context: Clone + Send + Sync + 'static> Runner<Context> {
pub fn new(connection_pool: PgPool, context: Context) -> Self {
Self {
connection_pool,
queues: HashMap::new(),
queues: Vec::new(),
context,
shutdown_when_queue_empty: false,
_state: PhantomData,
Expand All @@ -57,15 +64,9 @@ impl<Context: Clone + Send + Sync + 'static> Runner<Context> {
}

impl<Context: Clone + Send + Sync + 'static, State> Runner<Context, State> {
/// Configure a queue
pub fn configure_queue(
mut self,
queue_name: &str,
config_fn: impl FnOnce(Queue<Context>) -> Queue<Context, Configured>,
) -> Runner<Context, Configured> {
self.queues
.insert(queue_name.into(), config_fn(Queue::default()));

/// TODO: Tentative API - replace `configure_queue` if accepted
pub fn add_queue(mut self, queue: Queue<Context, Configured>) -> Runner<Context, Configured> {
self.queues.push(queue);
Runner {
connection_pool: self.connection_pool,
queues: self.queues,
Expand All @@ -88,9 +89,15 @@ impl<Context: Clone + Send + Sync + 'static> Runner<Context, Configured> {
/// This returns a `RunningRunner` which can be used to wait for the workers to shutdown.
pub fn start(&self) -> RunHandle {
let mut handles = Vec::new();
for (queue_name, queue) in &self.queues {
for i in 1..=queue.num_workers {
let name = format!("background-worker-{queue_name}-{i}");
for (queue_index, queue) in self.queues.iter().enumerate() {
for i in 0..queue.num_workers {
let name = format!(
"queue-{queue_name}-worker-{i}",
queue_name = queue
.name
.clone()
.unwrap_or_else(|| queue_index.to_string()),
);
info!(worker.name = %name, "Starting worker…");

let worker = Worker {
Expand Down Expand Up @@ -156,6 +163,8 @@ impl<Context> std::fmt::Debug for ArchivalPolicy<Context> {
/// Configuration and state for a job queue
#[derive(Debug)]
pub struct Queue<Context: Clone + Send + Sync + 'static, State = Unconfigured> {
/// Queue name can be set for clearer log output
pub name: Option<String>,
job_registry: JobRegistry<Context>,
num_workers: usize,
poll_interval: Duration,
Expand All @@ -167,6 +176,7 @@ pub struct Queue<Context: Clone + Send + Sync + 'static, State = Unconfigured> {
impl<Context: Clone + Send + Sync + 'static> Default for Queue<Context, Unconfigured> {
fn default() -> Self {
Self {
name: None,
job_registry: JobRegistry::default(),
num_workers: 1,
poll_interval: DEFAULT_POLL_INTERVAL,
Expand All @@ -176,6 +186,18 @@ impl<Context: Clone + Send + Sync + 'static> Default for Queue<Context, Unconfig
}
}
}
impl<Context: Clone + Send + Sync + 'static> Queue<Context> {
/// Make a new, named queue
/// The name is used only in logging.
///
/// Use `Queue::default()` if you don't need a name.
pub fn new(name: &str) -> Self {
Self {
name: Some(name.to_string()),
..Default::default()
}
}
}

impl<Context: Clone + Send + Sync + 'static, State> Queue<Context, State> {
/// Set the number of worker threads for this queue.
Expand Down Expand Up @@ -210,6 +232,7 @@ impl<Context: Clone + Send + Sync + 'static, State> Queue<Context, State> {
pub fn register<J: BackgroundJob<Context = Context>>(mut self) -> Queue<Context, Configured> {
self.job_registry.register::<J>();
Queue {
name: self.name,
job_registry: self.job_registry,
num_workers: self.num_workers,
poll_interval: self.poll_interval,
Expand Down
84 changes: 55 additions & 29 deletions tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async fn jobs_are_locked_when_fetched() -> anyhow::Result<()> {
let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), test_context.clone())
.configure_queue("default", Queue::register::<TestJob>);
.add_queue(Queue::default().register::<TestJob>());

let job_id = assert_some!(TestJob.enqueue(&pool).await?);

Expand Down Expand Up @@ -204,7 +204,7 @@ async fn jobs_are_deleted_when_successfully_run() -> anyhow::Result<()> {
let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), ())
.configure_queue("default", Queue::register::<TestJob>);
.add_queue(Queue::default().register::<TestJob>());

assert_eq!(remaining_jobs(&pool).await?, 0);

Expand Down Expand Up @@ -245,7 +245,7 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() -> anyhow:
let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), test_context.clone())
.configure_queue("default", Queue::register::<TestJob>);
.add_queue(Queue::default().register::<TestJob>());

TestJob.enqueue(&pool).await?;

Expand Down Expand Up @@ -291,7 +291,7 @@ async fn panicking_in_jobs_updates_retry_counter() -> anyhow::Result<()> {
let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), ())
.configure_queue("default", Queue::register::<TestJob>);
.add_queue(Queue::default().register::<TestJob>());

let job_id = assert_some!(TestJob.enqueue(&pool).await?);

Expand Down Expand Up @@ -354,7 +354,7 @@ async fn jobs_can_be_deduplicated() -> anyhow::Result<()> {
let (pool, _container) = test_utils::setup_test_db().await?;

let runner = Runner::new(pool.clone(), test_context.clone())
.configure_queue("default", Queue::register::<TestJob>)
.add_queue(Queue::default().register::<TestJob>())
.shutdown_when_queue_empty();

// Enqueue first job
Expand Down Expand Up @@ -411,13 +411,13 @@ async fn jitter_configuration_affects_polling() -> anyhow::Result<()> {

// Test that jitter configuration is accepted and compiles
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
queue
.add_queue(
Queue::default()
.register::<TestJob>()
.num_workers(1)
.poll_interval(Duration::from_millis(100))
.jitter(Duration::from_millis(50)) // Add up to 50ms jitter
})
.jitter(Duration::from_millis(50)), // Add up to 50ms jitter
)
.shutdown_when_queue_empty();

// No jobs in queue, so the worker will immediately shut down
Expand Down Expand Up @@ -456,12 +456,12 @@ async fn archive_functionality_works() -> anyhow::Result<()> {

// Configure runner with archiving enabled
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
queue
.add_queue(
Queue::default()
.register::<TestJob>()
.num_workers(1)
.archive(ArchivalPolicy::Always) // Enable archiving
})
.archive(ArchivalPolicy::Always), // Enable archiving
)
.shutdown_when_queue_empty();

// Enqueue a test job
Expand Down Expand Up @@ -738,12 +738,12 @@ async fn archive_cleaner_removes_old_jobs() -> anyhow::Result<()> {

// Configure runner with archiving enabled
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
queue
.add_queue(
Queue::default()
.register::<TestJob>()
.num_workers(1)
.archive(ArchivalPolicy::Always)
})
.archive(ArchivalPolicy::Always),
)
.shutdown_when_queue_empty();

// Enqueue and process a test job to create an archived job
Expand Down Expand Up @@ -794,12 +794,12 @@ async fn archive_cleaner_keeps_last_n_jobs() -> anyhow::Result<()> {

// Configure runner with archiving enabled
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
queue
.add_queue(
Queue::default()
.register::<TestJob>()
.num_workers(1)
.archive(ArchivalPolicy::Always)
})
.archive(ArchivalPolicy::Always),
)
.shutdown_when_queue_empty();

// Enqueue and process multiple test jobs to create archived jobs
Expand Down Expand Up @@ -855,12 +855,12 @@ async fn archive_cleaner_keeps_last_n_jobs_discards_old() -> anyhow::Result<()>

// Configure runner with archiving enabled
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
queue
.add_queue(
Queue::default()
.register::<TestJob>()
.num_workers(1)
.archive(ArchivalPolicy::Always)
})
.archive(ArchivalPolicy::Always),
)
.shutdown_when_queue_empty();

// Enqueue and process multiple test jobs to create archived jobs
Expand Down Expand Up @@ -914,14 +914,14 @@ async fn archive_conditionally() -> anyhow::Result<()> {

// Configure runner with predicate-based archiving
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
queue
.add_queue(
Queue::default()
.register::<TestJob>()
.archive(ArchivalPolicy::If(|job, _ctx| {
// Archive only even-numbered jobs, for a 50% sample
job.id % 2 == 0
}))
})
})),
)
.shutdown_when_queue_empty();

// Enqueue multiple test jobs
Expand All @@ -941,3 +941,29 @@ async fn archive_conditionally() -> anyhow::Result<()> {

Ok(())
}

#[tokio::test]
async fn show_potential_api() -> anyhow::Result<()> {
#[derive(Serialize, Deserialize)]
struct SomeJob;

impl BackgroundJob for SomeJob {
const JOB_TYPE: &'static str = "test_archive_conditionally";
type Context = ();

async fn run(&self, _ctx: Self::Context) -> anyhow::Result<()> {
Ok(())
}
}

let (pool, _container) = test_utils::setup_test_db().await?;

let _runner = Runner::new(pool, ())
.add_queue(
Queue::default()
.register::<SomeJob>()
.archive(ArchivalPolicy::If(|job, _ctx| job.id % 2 == 0)),
)
.shutdown_when_queue_empty();
Ok(())
}