Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,14 @@ async fn main() -> Result<()> {

// Create runner with archiving enabled for important jobs
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
.configure_queue(|queue| {
queue
.register::<NotificationJob>()
.register::<PaymentJob>()
.num_workers(2)
.poll_interval(Duration::from_millis(100))
.archive(ArchivalPolicy::Always) // Enable archiving for audit trail
.with_name("notifications_payments")
})
.shutdown_when_queue_empty();

Expand Down
2 changes: 1 addition & 1 deletion examples/archive_cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async fn main() -> Result<()> {
let (pool, _container) = setup_database().await?;

let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
.configure_queue(|queue| {
queue
.register::<ReticulateSplineJob>()
.archive(ArchivalPolicy::Always)
Expand Down
56 changes: 47 additions & 9 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::worker::Worker;
use crate::{BackgroundJob, schema};
use futures_util::future::join_all;
use sqlx::PgPool;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -25,7 +24,7 @@ pub struct Unconfigured;
/// The core runner responsible for locking and running jobs
pub struct Runner<Context: Clone + Send + Sync + 'static, State = Unconfigured> {
connection_pool: PgPool,
queues: HashMap<String, Queue<Context, Configured>>,
queues: Vec<Queue<Context, Configured>>,
context: Context,
shutdown_when_queue_empty: bool,
_state: PhantomData<State>,
Expand All @@ -36,7 +35,15 @@ impl<Context: std::fmt::Debug + Clone + Sync + Send, State: std::fmt::Debug> std
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Runner")
.field("queues", &self.queues.keys().collect::<Vec<_>>())
.field(
"queues",
&self
.queues
.iter()
.enumerate()
.map(|(qid, q)| q.worker_name(qid))
.collect::<Vec<_>>(),
)
.field("context", &self.context)
.field("shutdown_when_queue_empty", &self.shutdown_when_queue_empty)
.finish()
Expand All @@ -48,7 +55,7 @@ impl<Context: Clone + Send + Sync + 'static> Runner<Context> {
pub fn new(connection_pool: PgPool, context: Context) -> Self {
Self {
connection_pool,
queues: HashMap::new(),
queues: Vec::new(),
context,
shutdown_when_queue_empty: false,
_state: PhantomData,
Expand All @@ -57,14 +64,24 @@ impl<Context: Clone + Send + Sync + 'static> Runner<Context> {
}

impl<Context: Clone + Send + Sync + 'static, State> Runner<Context, State> {
/// TODO: Tentative API - replace `configure_queue` if accepted
pub fn add_queue(mut self, queue: Queue<Context, Configured>) -> Runner<Context, Configured> {
self.queues.push(queue);
Runner {
connection_pool: self.connection_pool,
queues: self.queues,
context: self.context,
shutdown_when_queue_empty: self.shutdown_when_queue_empty,
_state: PhantomData,
}
}

/// Configure a queue
pub fn configure_queue(
mut self,
queue_name: &str,
config_fn: impl FnOnce(Queue<Context>) -> Queue<Context, Configured>,
) -> Runner<Context, Configured> {
self.queues
.insert(queue_name.into(), config_fn(Queue::default()));
self.queues.push(config_fn(Queue::default()));

Runner {
connection_pool: self.connection_pool,
Expand All @@ -88,9 +105,12 @@ impl<Context: Clone + Send + Sync + 'static> Runner<Context, Configured> {
/// This returns a `RunningRunner` which can be used to wait for the workers to shutdown.
pub fn start(&self) -> RunHandle {
let mut handles = Vec::new();
for (queue_name, queue) in &self.queues {
for (queue_index, queue) in self.queues.iter().enumerate() {
for i in 1..=queue.num_workers {
let name = format!("background-worker-{queue_name}-{i}");
let name = format!(
"background-worker-{queue_name}-{i}",
queue_name = queue.worker_name(queue_index)
);
info!(worker.name = %name, "Starting worker…");

let worker = Worker {
Expand Down Expand Up @@ -156,6 +176,7 @@ impl<Context> std::fmt::Debug for ArchivalPolicy<Context> {
/// Configuration and state for a job queue
#[derive(Debug)]
pub struct Queue<Context: Clone + Send + Sync + 'static, State = Unconfigured> {
name: Option<String>,
job_registry: JobRegistry<Context>,
num_workers: usize,
poll_interval: Duration,
Expand All @@ -167,6 +188,7 @@ pub struct Queue<Context: Clone + Send + Sync + 'static, State = Unconfigured> {
impl<Context: Clone + Send + Sync + 'static> Default for Queue<Context, Unconfigured> {
fn default() -> Self {
Self {
name: None,
job_registry: JobRegistry::default(),
num_workers: 1,
poll_interval: DEFAULT_POLL_INTERVAL,
Expand All @@ -178,6 +200,21 @@ impl<Context: Clone + Send + Sync + 'static> Default for Queue<Context, Unconfig
}

impl<Context: Clone + Send + Sync + 'static, State> Queue<Context, State> {
/// Get the name of the queue otherwise default to its number
pub fn worker_name(&self, nth: usize) -> String {
if let Some(name) = &self.name {
name.clone()
} else {
nth.to_string()
}
}

/// Set a name for the queue for clearer logs
pub fn with_name(mut self, name: &str) -> Self {
self.name = Some(name.to_owned());
self
}

/// Set the number of worker threads for this queue.
pub fn num_workers(mut self, num_workers: usize) -> Self {
self.num_workers = num_workers;
Expand Down Expand Up @@ -210,6 +247,7 @@ impl<Context: Clone + Send + Sync + 'static, State> Queue<Context, State> {
pub fn register<J: BackgroundJob<Context = Context>>(mut self) -> Queue<Context, Configured> {
self.job_registry.register::<J>();
Queue {
name: self.name,
job_registry: self.job_registry,
num_workers: self.num_workers,
poll_interval: self.poll_interval,
Expand Down
48 changes: 37 additions & 11 deletions tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async fn jobs_are_locked_when_fetched() -> anyhow::Result<()> {
let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), test_context.clone())
.configure_queue("default", Queue::register::<TestJob>);
.configure_queue(Queue::register::<TestJob>);

let job_id = assert_some!(TestJob.enqueue(&pool).await?);

Expand Down Expand Up @@ -204,7 +204,7 @@ async fn jobs_are_deleted_when_successfully_run() -> anyhow::Result<()> {
let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), ())
.configure_queue("default", Queue::register::<TestJob>);
.configure_queue(Queue::register::<TestJob>);

assert_eq!(remaining_jobs(&pool).await?, 0);

Expand Down Expand Up @@ -245,7 +245,7 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() -> anyhow:
let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), test_context.clone())
.configure_queue("default", Queue::register::<TestJob>);
.configure_queue(Queue::register::<TestJob>);

TestJob.enqueue(&pool).await?;

Expand Down Expand Up @@ -291,7 +291,7 @@ async fn panicking_in_jobs_updates_retry_counter() -> anyhow::Result<()> {
let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), ())
.configure_queue("default", Queue::register::<TestJob>);
.configure_queue(Queue::register::<TestJob>);

let job_id = assert_some!(TestJob.enqueue(&pool).await?);

Expand Down Expand Up @@ -354,7 +354,7 @@ async fn jobs_can_be_deduplicated() -> anyhow::Result<()> {
let (pool, _container) = test_utils::setup_test_db().await?;

let runner = Runner::new(pool.clone(), test_context.clone())
.configure_queue("default", Queue::register::<TestJob>)
.configure_queue(Queue::register::<TestJob>)
.shutdown_when_queue_empty();

// Enqueue first job
Expand Down Expand Up @@ -411,7 +411,7 @@ async fn jitter_configuration_affects_polling() -> anyhow::Result<()> {

// Test that jitter configuration is accepted and compiles
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
.configure_queue(|queue| {
queue
.register::<TestJob>()
.num_workers(1)
Expand Down Expand Up @@ -456,7 +456,7 @@ async fn archive_functionality_works() -> anyhow::Result<()> {

// Configure runner with archiving enabled
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
.configure_queue(|queue| {
queue
.register::<TestJob>()
.num_workers(1)
Expand Down Expand Up @@ -738,7 +738,7 @@ async fn archive_cleaner_removes_old_jobs() -> anyhow::Result<()> {

// Configure runner with archiving enabled
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
.configure_queue(|queue| {
queue
.register::<TestJob>()
.num_workers(1)
Expand Down Expand Up @@ -794,7 +794,7 @@ async fn archive_cleaner_keeps_last_n_jobs() -> anyhow::Result<()> {

// Configure runner with archiving enabled
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
.configure_queue(|queue| {
queue
.register::<TestJob>()
.num_workers(1)
Expand Down Expand Up @@ -855,7 +855,7 @@ async fn archive_cleaner_keeps_last_n_jobs_discards_old() -> anyhow::Result<()>

// Configure runner with archiving enabled
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
.configure_queue(|queue| {
queue
.register::<TestJob>()
.num_workers(1)
Expand Down Expand Up @@ -914,7 +914,7 @@ async fn archive_conditionally() -> anyhow::Result<()> {

// Configure runner with predicate-based archiving
let runner = Runner::new(pool.clone(), ())
.configure_queue("default", |queue| {
.configure_queue(|queue| {
queue
.register::<TestJob>()
.archive(ArchivalPolicy::If(|job, _ctx| {
Expand All @@ -941,3 +941,29 @@ async fn archive_conditionally() -> anyhow::Result<()> {

Ok(())
}

#[tokio::test]
async fn show_potential_api() -> anyhow::Result<()> {
#[derive(Serialize, Deserialize)]
struct SomeJob;

impl BackgroundJob for SomeJob {
const JOB_TYPE: &'static str = "test_archive_conditionally";
type Context = ();

async fn run(&self, _ctx: Self::Context) -> anyhow::Result<()> {
Ok(())
}
}

let (pool, _container) = test_utils::setup_test_db().await?;

let _runner = Runner::new(pool, ())
.add_queue(
Queue::default()
.register::<SomeJob>()
.archive(ArchivalPolicy::If(|job, _ctx| job.id % 2 == 0)),
)
.shutdown_when_queue_empty();
Ok(())
}