diff --git a/examples/archive.rs b/examples/archive.rs index 5e11014..08975d5 100644 --- a/examples/archive.rs +++ b/examples/archive.rs @@ -20,7 +20,8 @@ use testcontainers_modules::postgres::Postgres; use tracing::{debug, info}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use workers::{ - ArchivalPolicy, ArchiveQuery, BackgroundJob, Runner, archived_job_count, get_archived_jobs, + ArchivalPolicy, ArchiveQuery, BackgroundJob, Queue, Runner, archived_job_count, + get_archived_jobs, }; #[derive(Debug, Serialize, Deserialize)] @@ -127,14 +128,14 @@ async fn main() -> Result<()> { // Create runner with archiving enabled for important jobs let runner = Runner::new(pool.clone(), ()) - .configure_queue("default", |queue| { - queue + .add_queue( + Queue::new("notifications_payments") .register::() .register::() .num_workers(2) .poll_interval(Duration::from_millis(100)) - .archive(ArchivalPolicy::Always) // Enable archiving for audit trail - }) + .archive(ArchivalPolicy::Always), // Enable archiving for audit trail + ) .shutdown_when_queue_empty(); // Enqueue some notification jobs diff --git a/examples/archive_cleanup.rs b/examples/archive_cleanup.rs index 4566baa..c519f1f 100644 --- a/examples/archive_cleanup.rs +++ b/examples/archive_cleanup.rs @@ -22,7 +22,7 @@ use testcontainers_modules::postgres::Postgres; use tracing::info; use workers::{ ArchivalPolicy, ArchiveCleanerBuilder, BackgroundJob, CleanupConfiguration, CleanupPolicy, - Runner, archived_job_count, + Queue, Runner, archived_job_count, }; #[derive(Debug, Serialize, Deserialize)] @@ -76,11 +76,11 @@ async fn main() -> Result<()> { let (pool, _container) = setup_database().await?; let runner = Runner::new(pool.clone(), ()) - .configure_queue("default", |queue| { - queue + .add_queue( + Queue::new("spline_reticulator") .register::() - .archive(ArchivalPolicy::Always) - }) + .archive(ArchivalPolicy::Always), + ) .shutdown_when_queue_empty(); ArchiveCleanerBuilder::new() diff --git a/src/runner.rs b/src/runner.rs index 0b7ab71..046116c 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -3,7 +3,6 @@ use crate::worker::Worker; use crate::{BackgroundJob, schema}; use futures_util::future::join_all; use sqlx::PgPool; -use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; @@ -25,7 +24,7 @@ pub struct Unconfigured; /// The core runner responsible for locking and running jobs pub struct Runner { connection_pool: PgPool, - queues: HashMap>, + queues: Vec>, context: Context, shutdown_when_queue_empty: bool, _state: PhantomData, @@ -36,7 +35,15 @@ impl std { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Runner") - .field("queues", &self.queues.keys().collect::>()) + .field( + "queues", + &self + .queues + .iter() + .enumerate() + .map(|(qid, q)| q.name.clone().unwrap_or_else(|| qid.to_string())) + .collect::>(), + ) .field("context", &self.context) .field("shutdown_when_queue_empty", &self.shutdown_when_queue_empty) .finish() @@ -48,7 +55,7 @@ impl Runner { pub fn new(connection_pool: PgPool, context: Context) -> Self { Self { connection_pool, - queues: HashMap::new(), + queues: Vec::new(), context, shutdown_when_queue_empty: false, _state: PhantomData, @@ -57,15 +64,9 @@ impl Runner { } impl Runner { - /// Configure a queue - pub fn configure_queue( - mut self, - queue_name: &str, - config_fn: impl FnOnce(Queue) -> Queue, - ) -> Runner { - self.queues - .insert(queue_name.into(), config_fn(Queue::default())); - + /// TODO: Tentative API - replace `configure_queue` if accepted + pub fn add_queue(mut self, queue: Queue) -> Runner { + self.queues.push(queue); Runner { connection_pool: self.connection_pool, queues: self.queues, @@ -88,9 +89,15 @@ impl Runner { /// This returns a `RunningRunner` which can be used to wait for the workers to shutdown. pub fn start(&self) -> RunHandle { let mut handles = Vec::new(); - for (queue_name, queue) in &self.queues { - for i in 1..=queue.num_workers { - let name = format!("background-worker-{queue_name}-{i}"); + for (queue_index, queue) in self.queues.iter().enumerate() { + for i in 0..queue.num_workers { + let name = format!( + "queue-{queue_name}-worker-{i}", + queue_name = queue + .name + .clone() + .unwrap_or_else(|| queue_index.to_string()), + ); info!(worker.name = %name, "Starting worker…"); let worker = Worker { @@ -156,6 +163,8 @@ impl std::fmt::Debug for ArchivalPolicy { /// Configuration and state for a job queue #[derive(Debug)] pub struct Queue { + /// Queue name can be set for clearer log output + pub name: Option, job_registry: JobRegistry, num_workers: usize, poll_interval: Duration, @@ -167,6 +176,7 @@ pub struct Queue { impl Default for Queue { fn default() -> Self { Self { + name: None, job_registry: JobRegistry::default(), num_workers: 1, poll_interval: DEFAULT_POLL_INTERVAL, @@ -176,6 +186,18 @@ impl Default for Queue Queue { + /// Make a new, named queue + /// The name is used only in logging. + /// + /// Use `Queue::default()` if you don't need a name. + pub fn new(name: &str) -> Self { + Self { + name: Some(name.to_string()), + ..Default::default() + } + } +} impl Queue { /// Set the number of worker threads for this queue. @@ -210,6 +232,7 @@ impl Queue { pub fn register>(mut self) -> Queue { self.job_registry.register::(); Queue { + name: self.name, job_registry: self.job_registry, num_workers: self.num_workers, poll_interval: self.poll_interval, diff --git a/tests/cli.rs b/tests/cli.rs index b1cd322..b062531 100644 --- a/tests/cli.rs +++ b/tests/cli.rs @@ -159,7 +159,7 @@ async fn jobs_are_locked_when_fetched() -> anyhow::Result<()> { let (pool, _container) = test_utils::setup_test_db().await?; let runner = test_utils::create_test_runner(pool.clone(), test_context.clone()) - .configure_queue("default", Queue::register::); + .add_queue(Queue::default().register::()); let job_id = assert_some!(TestJob.enqueue(&pool).await?); @@ -204,7 +204,7 @@ async fn jobs_are_deleted_when_successfully_run() -> anyhow::Result<()> { let (pool, _container) = test_utils::setup_test_db().await?; let runner = test_utils::create_test_runner(pool.clone(), ()) - .configure_queue("default", Queue::register::); + .add_queue(Queue::default().register::()); assert_eq!(remaining_jobs(&pool).await?, 0); @@ -245,7 +245,7 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() -> anyhow: let (pool, _container) = test_utils::setup_test_db().await?; let runner = test_utils::create_test_runner(pool.clone(), test_context.clone()) - .configure_queue("default", Queue::register::); + .add_queue(Queue::default().register::()); TestJob.enqueue(&pool).await?; @@ -291,7 +291,7 @@ async fn panicking_in_jobs_updates_retry_counter() -> anyhow::Result<()> { let (pool, _container) = test_utils::setup_test_db().await?; let runner = test_utils::create_test_runner(pool.clone(), ()) - .configure_queue("default", Queue::register::); + .add_queue(Queue::default().register::()); let job_id = assert_some!(TestJob.enqueue(&pool).await?); @@ -354,7 +354,7 @@ async fn jobs_can_be_deduplicated() -> anyhow::Result<()> { let (pool, _container) = test_utils::setup_test_db().await?; let runner = Runner::new(pool.clone(), test_context.clone()) - .configure_queue("default", Queue::register::) + .add_queue(Queue::default().register::()) .shutdown_when_queue_empty(); // Enqueue first job @@ -411,13 +411,13 @@ async fn jitter_configuration_affects_polling() -> anyhow::Result<()> { // Test that jitter configuration is accepted and compiles let runner = Runner::new(pool.clone(), ()) - .configure_queue("default", |queue| { - queue + .add_queue( + Queue::default() .register::() .num_workers(1) .poll_interval(Duration::from_millis(100)) - .jitter(Duration::from_millis(50)) // Add up to 50ms jitter - }) + .jitter(Duration::from_millis(50)), // Add up to 50ms jitter + ) .shutdown_when_queue_empty(); // No jobs in queue, so the worker will immediately shut down @@ -456,12 +456,12 @@ async fn archive_functionality_works() -> anyhow::Result<()> { // Configure runner with archiving enabled let runner = Runner::new(pool.clone(), ()) - .configure_queue("default", |queue| { - queue + .add_queue( + Queue::default() .register::() .num_workers(1) - .archive(ArchivalPolicy::Always) // Enable archiving - }) + .archive(ArchivalPolicy::Always), // Enable archiving + ) .shutdown_when_queue_empty(); // Enqueue a test job @@ -738,12 +738,12 @@ async fn archive_cleaner_removes_old_jobs() -> anyhow::Result<()> { // Configure runner with archiving enabled let runner = Runner::new(pool.clone(), ()) - .configure_queue("default", |queue| { - queue + .add_queue( + Queue::default() .register::() .num_workers(1) - .archive(ArchivalPolicy::Always) - }) + .archive(ArchivalPolicy::Always), + ) .shutdown_when_queue_empty(); // Enqueue and process a test job to create an archived job @@ -794,12 +794,12 @@ async fn archive_cleaner_keeps_last_n_jobs() -> anyhow::Result<()> { // Configure runner with archiving enabled let runner = Runner::new(pool.clone(), ()) - .configure_queue("default", |queue| { - queue + .add_queue( + Queue::default() .register::() .num_workers(1) - .archive(ArchivalPolicy::Always) - }) + .archive(ArchivalPolicy::Always), + ) .shutdown_when_queue_empty(); // Enqueue and process multiple test jobs to create archived jobs @@ -855,12 +855,12 @@ async fn archive_cleaner_keeps_last_n_jobs_discards_old() -> anyhow::Result<()> // Configure runner with archiving enabled let runner = Runner::new(pool.clone(), ()) - .configure_queue("default", |queue| { - queue + .add_queue( + Queue::default() .register::() .num_workers(1) - .archive(ArchivalPolicy::Always) - }) + .archive(ArchivalPolicy::Always), + ) .shutdown_when_queue_empty(); // Enqueue and process multiple test jobs to create archived jobs @@ -914,14 +914,14 @@ async fn archive_conditionally() -> anyhow::Result<()> { // Configure runner with predicate-based archiving let runner = Runner::new(pool.clone(), ()) - .configure_queue("default", |queue| { - queue + .add_queue( + Queue::default() .register::() .archive(ArchivalPolicy::If(|job, _ctx| { // Archive only even-numbered jobs, for a 50% sample job.id % 2 == 0 - })) - }) + })), + ) .shutdown_when_queue_empty(); // Enqueue multiple test jobs @@ -941,3 +941,29 @@ async fn archive_conditionally() -> anyhow::Result<()> { Ok(()) } + +#[tokio::test] +async fn show_potential_api() -> anyhow::Result<()> { + #[derive(Serialize, Deserialize)] + struct SomeJob; + + impl BackgroundJob for SomeJob { + const JOB_TYPE: &'static str = "test_archive_conditionally"; + type Context = (); + + async fn run(&self, _ctx: Self::Context) -> anyhow::Result<()> { + Ok(()) + } + } + + let (pool, _container) = test_utils::setup_test_db().await?; + + let _runner = Runner::new(pool, ()) + .add_queue( + Queue::default() + .register::() + .archive(ArchivalPolicy::If(|job, _ctx| job.id % 2 == 0)), + ) + .shutdown_when_queue_empty(); + Ok(()) +}