diff --git a/.sqlx/sqlx-data.json b/.sqlx/sqlx-data.json
deleted file mode 100644
index 0967ef4..0000000
--- a/.sqlx/sqlx-data.json
+++ /dev/null
@@ -1 +0,0 @@
-{}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7039085..9267468 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,126 @@
All notable changes to the Backfill project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project will adhere to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) after reaching version 1.0.0.
+## [1.2.0] — Production-readiness audit
+
+A focused bug-fix release driven by a top-to-bottom production-readiness
+review of the priority queue, retry, and DLQ subsystems. Several silent
+failure modes are corrected, misleading APIs are deprecated with
+migration paths, and two long-tail correctness bugs in the DLQ subsystem
+are fixed. Every fix is covered by a regression test that fails on the
+prior code.
+
+### Critical fixes
+
+- **DLQ no longer loses jobs on worker restart.** `WorkerRunner` startup
+  used to call `cleanup_permanently_failed_jobs()` (which `DELETE`s rows
+  with `attempts >= max_attempts`) before the DLQ processor's first tick
+  — any job that hit `max_attempts` while the worker was down was silently
+  deleted instead of captured. The startup sequence now runs
+  `process_failed_jobs()` synchronously before cleanup whenever the DLQ
+  is enabled.
+- **`WorkerError::is_retryable()` is now wired to actual retry behaviour.**
+  The classification was previously dead code; a `WorkerError::ValidationFailed`
+  would retry up to `max_attempts` instead of going straight to the DLQ.
+  A new `PermanentFailurePlugin` (auto-registered when DLQ is enabled)
+  hooks `JobFail`, classifies the error, and short-circuits retries for
+  non-retryable variants by setting `attempts = max_attempts`.
+- **`process_failed_jobs` now atomically moves rows from `_private_jobs` to
+  the DLQ.** Previously the UPSERT and DELETE were separate statements; a
+  crash between them could duplicate the job. They are now combined into a
+  single writable-CTE statement.
+
+### Breaking changes
+
+- `WorkerRunner::process_available_jobs` returns `Result<(), BackfillError>`
+  (was a job-count `Result` that always returned `Ok(0)`). For
+  job-count instrumentation, register a `JobComplete` / `JobFail` plugin
+  before building the worker.
+- `WorkerRunner::worker_count()` returns `1` truthfully (was
+  `queue_configs.len()`, which lied any time a multi-queue config was passed
+  — only one worker has ever actually been spawned).
+
+### Deprecations
+
+All marked `#[deprecated(since = "1.2.0")]` with migration notes.
+Scheduled for removal in `2.0.0`.
+
+- `RetryPolicy::new(4-arg)`, `with_jitter`, `calculate_delay`,
+  `calculate_retry_time`, `JobSpec::calculate_retry_time` — graphile_worker
+  uses a hard-coded `exp(min(attempts, 10))`-second SQL formula for retry
+  scheduling, so the timing fields on `RetryPolicy` and the math helpers
+  that operated on them never reached the worker. Only `max_attempts` was
+  ever honored. Use `RetryPolicy { max_attempts: n, ..Default::default() }`
+  or the `fast` / `aggressive` / `conservative` presets, which now
+  differ only in attempt count.
+- `WorkerConfig::with_queues`, `QueueConfig::named_queue`,
+  `QueueConfig::priority_queue` — graphile_worker doesn't expose
+  per-worker queue filtering; only the first config's `concurrency`
+  was ever used at runtime, and `priority_range` was never read.
Use + `WorkerConfig::with_concurrency(n)` and route jobs to named queues at + enqueue time via `Queue::serial(name)`. + +### Other fixes and improvements + +- `list_dlq_jobs.total` now respects filters (was returning unfiltered + count regardless of filter, breaking paginated admin UIs). +- `Queue::metric_label()` added — bounded `"parallel"`/`"serial"` label + for built-in metrics. Built-in metric emission no longer uses + unbounded queue names (e.g., `Queue::serial_for("user", id)`) as + Prometheus labels. +- DLQ-side metrics emit `"parallel"` instead of empty string for + parallel-origin jobs. +- `requeue_dlq_job` no longer fails the operation when the post-enqueue + bookkeeping `UPDATE` blips; the enqueue (the real intent) succeeded, + so a stale `requeued_count` is logged at `WARN` rather than propagated + as an error. +- DLQ processor uses exponential backoff on consecutive errors + (interval, 2x, 4x, … capped at 32x). Resets on success. +- `process_failed_jobs` switched from `id NOT IN (subquery)` to + `NOT EXISTS` for the DLQ-membership check — better planner behaviour + and no NULL-poisoning workaround needed. +- `failure_count` on DLQ rows is now a clean touch counter (1 on first + DLQ landing, +1 each subsequent UPSERT) instead of cumulative + handler-attempts. +- `cleanup_permanently_failed_jobs` now logs at `WARN` when DLQ is + disabled (those jobs cannot be recovered) and `INFO` when DLQ exists. +- `delete_dlq_job` halved its DB round-trips (single `DELETE … RETURNING`). +- `enqueue` records duration on every outcome, not just success. +- Stale-lock SQL parameterized instead of format-interpolating timeout. +- `enqueue_emergency` no longer redundantly sets `run_at = NOW()`. + +### Documentation + +- Truthful `RetryPolicy` rustdoc — explicit about which fields graphile_worker + honors and which are stored-but-ignored. +- `EnqueueOutcome::AlreadyInProgress` rustdoc rewritten to flag the + footgun of unconditionally `.unwrap()`ing. +- `docs/01-database-setup.md` clarifies that backfill itself uses runtime + SQLx queries; the offline-mode setup is for *user* queries. +- `docs/02-dlq.md` "queue_name shows as default" warning marked resolved + (was fixed in 1.1.1 / PR #9, docs were stale). +- `Cargo.toml` warns about the `_private_*` schema coupling so + `cargo update` doesn't silently break across graphile_worker patch + releases. +- `lib.rs` documents which `graphile_worker` types are re-exported and + the SemVer implications of upstream changes. + +### Internals + +- Removed orphan `src/client/queries.rs` (was never `mod`-declared). +- Removed empty `.sqlx/sqlx-data.json` (compile-time query macros aren't + used). + +### Tests + +- Test count: 107 → 115 (+8). New regression tests for the startup race, + the retryable / non-retryable plugin behaviour, the filtered-pagination + fix, the failure_count touch counter, the bounded metric label, an + end-to-end retry-to-DLQ path through the actual worker, and a + concurrent-enqueue stress test. +- All P0 and P1 fixes were verified to *fail* on the prior code via + `git stash` / re-run before being committed. + ## [Unreleased] ### Breaking Changes diff --git a/Cargo.lock b/Cargo.lock index 7135c02..ea0a9b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,18 +2,6 @@ # It is not intended for manual editing. 
version = 4 -[[package]] -name = "ahash" -version = "0.8.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" -dependencies = [ - "cfg-if", - "once_cell", - "version_check", - "zerocopy", -] - [[package]] name = "aho-corasick" version = "1.1.4" @@ -191,7 +179,7 @@ dependencies = [ [[package]] name = "backfill" -version = "1.1.1" +version = "1.2.0" dependencies = [ "axum", "chrono", @@ -582,6 +570,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "evmap" +version = "11.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b8874945f036109c72242964c1174cf99434e30cfa45bf45fedc983f50046f8" +dependencies = [ + "hashbag", + "left-right", + "smallvec", +] + [[package]] name = "fastrand" version = "2.4.1" @@ -737,6 +736,21 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52f04ae4152da20c76fe800fa48659201d5cf627c5149ca0b707b69d7eef6cf9" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows-link", + "windows-result", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -987,9 +1001,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +checksum = "171fefbc92fe4a4de27e0698d6a5b392d6a0e333506bc49133760b3bcf948733" dependencies = [ "atomic-waker", "bytes", @@ -1004,6 +1018,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbag" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7040a10f52cba493ddb09926e15d10a9d8a28043708a405931fe4c6f19fac064" + [[package]] name = "hashbrown" version = "0.15.5" @@ -1318,9 +1338,9 @@ dependencies = [ [[package]] name = "idna_adapter" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +checksum = "cb68373c0d6620ef8105e855e7745e18b0d00d3bdb07fb532e434244cdb9a714" dependencies = [ "icu_normalizer", "icu_properties", @@ -1401,10 +1421,12 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.95" +version = "0.3.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" +checksum = "a1840c94c045fbcf8ba2812c95db44499f7c64910a912551aaaa541decebcacf" dependencies = [ + "cfg-if", + "futures-util", "once_cell", "wasm-bindgen", ] @@ -1434,6 +1456,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" +[[package]] +name = "left-right" +version = "0.11.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f0c21e4c8ff95f487fb34e6f9182875f42c84cef966d29216bf115d9bba835a" +dependencies = [ + "crossbeam-utils", + "loom", + "slab", +] + [[package]] name = "libc" version = "0.2.186" @@ -1455,7 +1488,7 @@ dependencies = [ "bitflags", "libc", "plain", - "redox_syscall 0.7.4", + "redox_syscall 0.7.5", ] [[package]] @@ -1489,6 +1522,19 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] 
+name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "matchers" version = "0.2.0" @@ -1522,21 +1568,22 @@ checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "metrics" -version = "0.24.3" +version = "0.24.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" +checksum = "ff56c2e7dce6bd462e3b8919986a617027481b1dcc703175b58cf9dd98a2f071" dependencies = [ - "ahash", "portable-atomic", + "rapidhash", ] [[package]] name = "metrics-exporter-prometheus" -version = "0.18.1" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3589659543c04c7dc5526ec858591015b87cd8746583b51b48ef4353f99dbcda" +checksum = "1db0d8f1fc9e62caebd0319e11eaec5822b0186c171568f0480b46a0137f9108" dependencies = [ "base64", + "evmap", "http-body-util", "hyper", "hyper-rustls", @@ -1554,9 +1601,9 @@ dependencies = [ [[package]] name = "metrics-util" -version = "0.20.1" +version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4" +checksum = "9e56997f084e57b045edf17c3ed8ba7f9f779c670df8206dfd1c736f4c02dc4a" dependencies = [ "crossbeam-epoch", "crossbeam-utils", @@ -1565,6 +1612,7 @@ dependencies = [ "quanta", "rand 0.9.4", "rand_xoshiro", + "rapidhash", "sketches-ddsketch", ] @@ -1938,6 +1986,15 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rapidhash" +version = "4.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e48930979c155e2f33aa36ab3119b5ee81332beb6482199a8ecd6029b80b59" +dependencies = [ + "rustversion", +] + [[package]] name = "raw-cpuid" version = "11.6.0" @@ -1958,9 +2015,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f450ad9c3b1da563fb6948a8e0fb0fb9269711c9c73d9ea1de5058c79c8d643a" +checksum = "4666a1a60d8412eab19d94f6d13dcc9cea0a5ef4fdf6a5db306537413c661b1b" dependencies = [ "bitflags", ] @@ -2030,9 +2087,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.39" +version = "0.23.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c2c118cb077cca2822033836dfb1b975355dfb784b5e8da48f7b6c5db74e60e" +checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" dependencies = [ "aws-lc-rs", "once_cell", @@ -2097,6 +2154,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -2628,9 +2691,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.52.1" +version = "1.52.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6" +checksum = "110a78583f19d5cdb2c5ccf321d1290344e71313c6c37d43520d386027d18386" dependencies = [ "bytes", "libc", @@ -2706,9 +2769,9 @@ dependencies = [ 
[[package]] name = "tower-http" -version = "0.6.8" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +checksum = "68d6fdd9f81c2819c9a8b0e0cd91660e7746a8e6ea2ba7c6b2b057985f6bcb51" dependencies = [ "bitflags", "bytes", @@ -2959,9 +3022,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.118" +version = "0.2.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" +checksum = "df52b6d9b87e0c74c9edfa1eb2d9bf85e5d63515474513aa50fa181b3c4f5db1" dependencies = [ "cfg-if", "once_cell", @@ -2972,9 +3035,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.118" +version = "0.2.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" +checksum = "78b1041f495fb322e64aca85f5756b2172e35cd459376e67f2a6c9dffcedb103" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2982,9 +3045,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.118" +version = "0.2.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" +checksum = "9dcd0ff20416988a18ac686d4d4d0f6aae9ebf08a389ff5d29012b05af2a1b41" dependencies = [ "bumpalo", "proc-macro2", @@ -2995,9 +3058,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.118" +version = "0.2.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" +checksum = "49757b3c82ebf16c57d69365a142940b384176c24df52a087fb748e2085359ea" dependencies = [ "unicode-ident", ] @@ -3038,9 +3101,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.95" +version = "0.3.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f2dfbb17949fa2088e5d39408c48368947b86f7834484e87b73de55bc14d97d" +checksum = "2eadbac71025cd7b0834f20d1fe8472e8495821b4e9801eb0a60bd1f19827602" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 8e6542f..03b71b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "backfill" -version = "1.1.1" +version = "1.2.0" edition = "2024" description = "A boringly-named priority work queue system for doing async tasks." categories = ["concurrency", "data-structures"] @@ -16,6 +16,11 @@ axum = ["dep:axum"] [dependencies] chrono = { version = "0.4", features = ["serde"] } +# IMPORTANT: backfill queries graphile_worker's internal `_private_*` tables +# directly (cleanup, DLQ scanner, admin endpoints). The `_private_*` prefix +# signals these are internal API. Even within 0.11.x, a patch release that +# touches the internal schema can silently break backfill — re-run the +# integration tests against any new graphile_worker version before shipping. 
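+#
+# If you would rather freeze that risk until you can re-test, pin the exact
+# version — a `=` requirement stops `cargo update` from moving it. A sketch
+# (substitute whichever version you last verified):
+#
+#     graphile_worker = "=0.11.1"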
graphile_worker = "0.11.1"
graphile_worker_crontab_parser = "0.5.12"
log = "0.4.28"
diff --git a/README.md b/README.md
index 0f22321..72888c7 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,6 @@
# backfill

[![CI](https://github.com/ceejbot/backfill/workflows/CI/badge.svg)](https://github.com/ceejbot/backfill/actions)
-[![Coverage](https://img.shields.io/badge/coverage-64.67%25-yellow)](https://github.com/ceejbot/backfill/actions)
[![Security](https://github.com/ceejbot/backfill/actions/workflows/security.yml/badge.svg)](https://github.com/ceejbot/backfill/actions/workflows/security.yml)

A boringly-named priority queue system for doing async work. This library and work process wrap the [graphile_worker crate](https://lib.rs/crates/graphile_worker) to do things the way I want to do them. It's unlikely you'll want to do things exactly this way, but perhaps you can learn by reading the code, or get a jumpstart by borrowing open-source code, or heck, maybe this will do what you need.
@@ -10,37 +9,70 @@ A boringly-named priority queue system for doing async work. This library and wo
This is a postgres-backed async work queue library that is a set of conveniences and features on top of the rust port of Graphile Worker. It gives you a library you can integrate with your own project to handle background tasks.

-> **Status**: Core features are complete and tested (64.67%% test coverage, 55 tests). The library is suitable for production use for job enqueueing, worker processing, and DLQ management. The Admin API (feature-gated) is experimental. See [CHANGELOG.md](CHANGELOG.md) for details and [Known Limitations](docs/02-dlq.md#known-limitations).
+> **Status**: Core features are complete and covered by an integration test
+> suite. The library is suitable for production use for job enqueueing,
+> worker processing, and DLQ management. The Admin API (feature-gated) is
+> experimental. See [CHANGELOG.md](CHANGELOG.md) for details and
+> [Known Limitations](docs/02-dlq.md#known-limitations).

### What's New Over graphile_worker

-Built on top of `graphile_worker` (v0.8.6), backfill adds these production-ready features:
-
-- 🎯 **Priority System** - Six-level priority queue (EMERGENCY to BULK_LOWEST) with numeric priority values
-- 📦 **Named Queues** - Pre-configured Fast/Bulk queues plus custom queue support
-- 🔄 **Smart Retry Policies** - Exponential backoff with jitter (fast/aggressive/conservative presets)
-- 💀 **Dead Letter Queue (DLQ)** - Automatic failed job handling with query/requeue/deletion APIs
-- 📊 **Comprehensive Metrics** - Prometheus-compatible metrics for jobs, DLQ, and database operations
-- 🛠️ **High-Level Client API** - `BackfillClient` with ergonomic enqueueing helpers
-- 🏃 **Flexible Worker Patterns** - `WorkerRunner` supporting tokio::select!, background tasks, and one-shot processing
-- 🔧 **Admin API** - Optional Axum router for HTTP-based job management (experimental)
-- 📝 **Convenience Functions** - `enqueue_fast()`, `enqueue_bulk()`, `enqueue_critical()`, etc.
-- 🧹 **Stale Lock Cleanup** - Automatic cleanup of orphaned locks from crashed workers (startup + periodic)
+Built on top of `graphile_worker` (v0.11.x), backfill adds these production-ready features:
+
+- 🎯 **Priority System** — Six-level priority enum (EMERGENCY=-20 down through
+  BULK_LOWEST=10), mapped through to graphile_worker's `priority asc` fetch
+  ordering.
+- 📦 **Parallel + Serial queues** — `Queue::Parallel` (default, jobs run
+  concurrently across workers) or `Queue::Serial(name)` (one-job-at-a-time
+  per named queue, for rate limiting or per-entity ordering).
+- 🔄 **Retry policy presets** — `fast` / `aggressive` / `conservative`
+  presets that differ in `max_attempts`. Note: backoff *timing* is fixed by
+  graphile_worker (see [`docs/02-dlq.md`](docs/02-dlq.md)) — only the
+  attempt count is configurable.
+- 💀 **Dead Letter Queue (DLQ)** — Automatic failed-job handling with
+  query/requeue/deletion APIs. Includes a permanent-failure short-circuit
+  plugin: handlers that return non-retryable `WorkerError` variants land in
+  the DLQ on the first failure rather than waiting for `max_attempts` to
+  be exhausted.
+- 📊 **Comprehensive Metrics** — Prometheus-compatible metrics for jobs,
+  DLQ, and database operations.
+- 🛠️ **High-Level Client API** — `BackfillClient` with ergonomic enqueueing
+  helpers.
+- 🏃 **Flexible Worker Patterns** — `WorkerRunner` supporting
+  `tokio::select!`, background tasks, and one-shot processing.
+- 🔧 **Admin API** — Optional Axum router for HTTP-based job management
+  (experimental).
+- 📝 **Convenience Functions** — `enqueue_fast()`, `enqueue_bulk()`,
+  `enqueue_critical()`, etc.
+- 🧹 **Stale Lock Cleanup** — Automatic cleanup of orphaned locks from
+  crashed workers (startup + periodic). Ordered correctly with the DLQ
+  scanner so failed jobs aren't lost across restarts.

All built on graphile_worker's rock-solid foundation of PostgreSQL SKIP LOCKED and LISTEN/NOTIFY.

### Features

-- **Priority queues**: EMERGENCY, FAST_HIGH, FAST_DEFAULT, BULK_DEFAULT, BULK_LOW, BULK_LOWEST
-- **Named queues**: Fast, Bulk, DeadLetter, Custom(name)
-- **Scheduling**: Immediate or delayed execution with `run_at`
-- **Idempotency**: Use `job_key` for deduplication
-- **Exponential backoff**: Built-in retry policies with jitter to prevent thundering herds
-- **Dead letter queue**: Handling jobs that experience un-retryable failures or exceed their retry limits
-- **Error handling**: Automatic retry classification
-- **Metrics**: Comprehensive metrics via the `metrics` crate - bring your own exporter (Prometheus, StatsD, etc.)
-- **Monitoring**: Structured logging and tracing throughout
-- **Building blocks for an axum admin api**: via a router you can mount on your own axum api server
+- **Priority queues**: EMERGENCY (-20), FAST_HIGH (-10), FAST_DEFAULT (-5),
+  BULK_DEFAULT (0), BULK_LOW (5), BULK_LOWEST (10) — lower number = higher
+  priority.
+- **Queue types**: `Queue::Parallel` (default), `Queue::Serial(name)` — plus
+  `Queue::serial_for(entity, id)` for per-entity ordering.
+- **Scheduling**: Immediate or delayed execution with `run_at`.
+- **Idempotency**: Use `job_key` for deduplication.
+- **Retries**: Configurable `max_attempts` per job; graphile_worker handles
+  the exponential-backoff schedule (`exp(min(attempts, 10))` seconds, capped
+  at ~6h per retry).
+- **Dead letter queue**: Automatic capture of jobs that exceed their retry
+  limits or return non-retryable errors. Includes a synchronous startup
+  pre-move so DLQ doesn't lose jobs across worker restarts.
+- **Error classification**: `WorkerError` variants split into retryable and
+  non-retryable; non-retryable errors short-circuit retries to DLQ via an
+  auto-registered lifecycle plugin.
+- **Metrics**: Comprehensive metrics via the `metrics` crate — bring your
+  own exporter (Prometheus, StatsD, etc.).
+- **Monitoring**: Structured logging and tracing throughout.
+- **Building blocks for an axum admin api**: via a router you can mount on
+  your own axum api server.

Look at the `examples/` directory and the readme there for practical usage examples.
@@ -109,22 +141,18 @@ When workers crash without graceful shutdown, they can leave locks behind that p

**⚠️ Warning:** Setting `stale_job_lock_timeout` too short can cause duplicate job execution if jobs legitimately run longer than the timeout. This can lead to data corruption.

-### SQLx Compile-Time Query Verification
-
-This library uses SQLx's compile-time query verification for production safety. Set `DATABASE_URL` during compilation to enable type-safe, compile-time checked SQL queries:
+### SQLx usage

-```bash
-export DATABASE_URL="postgresql://localhost:5432/backfill"
-cargo build # Queries verified against actual database schema
-```
-
-Alternatively, use offline mode with pre-generated query metadata:
-```bash
-cargo sqlx prepare # Generates .sqlx/sqlx-data.json
-cargo build # Uses cached metadata, no database required
-```
+Backfill currently uses runtime SQLx queries (`sqlx::query()` /
+`sqlx::query_scalar()`) rather than the compile-time-checked
+`sqlx::query!()` / `query_as!()` macros. No `DATABASE_URL` is required at
+compile time, and there's no `.sqlx/` metadata cache to maintain. Schema
+errors surface at runtime (caught by the integration test suite).

-See [Database Setup](docs/01-database-setup.md#sqlx-compile-time-query-verification) for detailed setup instructions and best practices.
+If you write your own SQLx queries against backfill's tables in *your*
+application, the compile-time macros are a great fit — see
+[Database Setup](docs/01-database-setup.md) for `cargo sqlx prepare`
+guidance.

### Automatic Setup
diff --git a/docs/01-database-setup.md b/docs/01-database-setup.md
index 79ad872..5287c2b 100644
--- a/docs/01-database-setup.md
+++ b/docs/01-database-setup.md
@@ -218,7 +218,21 @@ let client = BackfillClient::new_with_schema(&url, "app1_jobs").await?;
let client = BackfillClient::new_with_schema(&url, "app2_jobs").await?;
```

-## SQLx Compile-Time Query Verification
+## SQLx usage
+
+> **Note on backfill itself**: backfill uses runtime SQLx queries
+> (`sqlx::query()` and friends) rather than the compile-time
+> `sqlx::query!()` macros. This is a deliberate choice — backfill's SQL
+> targets dynamic schema names (the user's chosen schema, not a fixed
+> identifier), which the compile-time macros don't support cleanly. So
+> `DATABASE_URL` is **not** required at compile time, there is no `.sqlx/`
+> metadata to maintain, and schema mismatches surface at runtime (the
+> integration test suite is the safety net).
+>
+> The setup below is a recommendation for **your own** queries against
+> backfill's tables, in code you write that uses SQLx. If you don't write
+> any `sqlx::query!()` / `sqlx::query_as!()` macros yourself, you can
+> skip this section.

This library uses SQLx for all database operations, which provides excellent compile-time verification of SQL queries. This is one of Rust's greatest strengths for database applications.
diff --git a/docs/02-dlq.md b/docs/02-dlq.md
index f4ee570..55f5511 100644
--- a/docs/02-dlq.md
+++ b/docs/02-dlq.md
@@ -880,16 +880,21 @@ The DLQ system is fully functional for production use, but has a few known limit

### 1. Queue Name Tracking

-**Issue**: DLQ entries may show `queue_name` as `"default"` even if the job ran in a different queue (e.g., "fast" or "bulk").
+**Status**: Fixed in v1.1.1 (PR #9).
-**Cause**: The GraphileWorker `Job` struct doesn't expose the queue name field, so when jobs are moved to the DLQ, the queue name defaults to `"default"`.
+DLQ entries now correctly preserve the queue type of the original job:

-**Workarounds**:
-- The `task_identifier` field is always accurate and can be used for filtering
-- Job priority is preserved, which often correlates with queue assignment
-- For critical workflows, track queue assignment in your application logs or metrics
+- Parallel jobs (`Queue::Parallel`) are stored with an empty `queue_name`.
+  When requeued, they go back to `Queue::Parallel` — concurrent execution
+  is preserved.
+- Serial jobs (`Queue::Serial(name)`) are stored with their queue name.
+  When requeued, they go back to `Queue::Serial(name)` — single-job-at-a-time
+  semantics are preserved.

-**Future**: This will be resolved when GraphileWorker exposes queue_name on the Job struct, or when we implement direct database querying.
+The DDL retains a `DEFAULT 'default'` clause for the `queue_name` column for
+schema compatibility, but it is never used by the production code path —
+`add_to_dlq` and `process_failed_jobs` always pass an explicit value
+(possibly the empty string for parallel jobs).

### 2. Payload Visibility

@@ -929,7 +934,7 @@ These limitations are minor and don't affect the core DLQ functionality:
- ✅ **Error message capture** - Fully functional
- ✅ **Requeuing workflows** - Production-ready
- ✅ **Statistics and monitoring** - Complete
-- ⚠️ **Queue name tracking** - Shows "default" for all queues
+- ✅ **Queue name tracking** - Fixed in v1.1.1 (parallel ↔ serial round-trip preserved)
- ⚠️ **Payload inspection** - Requires direct DB access
- ⚠️ **Job cancellation** - Not yet implemented

diff --git a/examples/basic_worker.rs b/examples/basic_worker.rs
index eea89d4..c6ed622 100644
--- a/examples/basic_worker.rs
+++ b/examples/basic_worker.rs
@@ -16,8 +16,7 @@ use std::num::ParseIntError;
use std::time::Duration;

use backfill::{
-    BackfillError, IntoTaskHandlerResult, QueueConfig, TaskHandler, WorkerConfig, WorkerContext, WorkerError,
-    WorkerRunner,
+    BackfillError, IntoTaskHandlerResult, TaskHandler, WorkerConfig, WorkerContext, WorkerError, WorkerRunner,
};
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
@@ -107,19 +106,22 @@ impl ExampleWorkerConfig {

impl From<ExampleWorkerConfig> for WorkerConfig {
    fn from(value: ExampleWorkerConfig) -> Self {
-        WorkerConfig {
-            database_url: value.database_url,
-            schema: value.schema,
-            queue_configs: vec![
-                QueueConfig::named_queue("fast", value.fast_concurrency),
-                QueueConfig::named_queue("bulk", value.bulk_concurrency),
-                QueueConfig::named_queue("dead_letter", value.dlq_concurrency),
-                QueueConfig::default_queue(5), // Default queue with moderate concurrency
-            ],
-            poll_interval: value.poll_interval,
-            dlq_processor_interval: Some(value.dlq_processor_interval),
-            ..Default::default()
-        }
+        // A single WorkerRunner spawns one Worker; pick the highest of the
+        // configured per-queue concurrencies so this worker can keep up with
+        // any of them. To run truly separate workers per queue, spawn
+        // multiple WorkerRunner instances yourself — graphile_worker doesn't
+        // expose per-worker queue filtering.
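+        //
+        // A sketch of that multi-runner pattern (illustrative only — the
+        // builder calls below are the real API; `db_url` is a stand-in and
+        // the task wiring is up to you):
+        //
+        //     let fast = WorkerConfig::new(db_url.clone()).with_concurrency(8);
+        //     let bulk = WorkerConfig::new(db_url).with_concurrency(2);
+        //     // build a WorkerRunner from each config and drive both,
+        //     // e.g. as spawned background tasks or under tokio::select!.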
+        let concurrency = value
+            .fast_concurrency
+            .max(value.bulk_concurrency)
+            .max(value.dlq_concurrency)
+            .max(5);
+
+        WorkerConfig::new(value.database_url)
+            .with_schema(value.schema)
+            .with_concurrency(concurrency)
+            .with_poll_interval(value.poll_interval)
+            .with_dlq_processor_interval(Some(value.dlq_processor_interval))
    }
}
diff --git a/examples/enqueue_jobs.rs b/examples/enqueue_jobs.rs
index 7bc310b..d6c9528 100644
--- a/examples/enqueue_jobs.rs
+++ b/examples/enqueue_jobs.rs
@@ -248,14 +248,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
        outcome.expect("outcome should contain a job").id()
    );

-    // Enqueue a job with custom retry policy
-    let custom_retry_policy = RetryPolicy::new(
-        6,                                     // 6 attempts
-        std::time::Duration::from_millis(500), // Start with 500ms
-        std::time::Duration::from_secs(60),    // Cap at 1 minute
-        1.8,                                   // 1.8x multiplier
-    )
-    .with_jitter(0.2); // 20% jitter
+    // Enqueue a job with custom retry settings.
+    // Note: only `max_attempts` affects runtime behaviour. graphile_worker
+    // schedules retries via a fixed `exp(min(attempts, 10))` second formula,
+    // so the timing fields on RetryPolicy are not honored. See the docs on
+    // `RetryPolicy` for the full story.
+    let custom_retry_policy = RetryPolicy {
+        max_attempts: 6,
+        ..Default::default()
+    };

    let custom_job = GenerateReportJob {
        report_type: "analytics_summary".to_string(),
diff --git a/src/client/cleanup.rs b/src/client/cleanup.rs
index e577a24..22dd5d0 100644
--- a/src/client/cleanup.rs
+++ b/src/client/cleanup.rs
@@ -57,18 +57,22 @@ pub async fn release_stale_queue_locks(&self, timeout: Duration) -> Result<u64, BackfillError> {
        let timeout_secs = timeout.as_secs();

+        // Bind timeout via parameter rather than format-interpolating it.
+        // u64 → i64 cast is safe: i64::MAX seconds is ~292 billion years.
        let query = format!(
            r#"
            UPDATE {schema}._private_job_queues
            SET locked_at = NULL, locked_by = NULL
            WHERE locked_at IS NOT NULL
-              AND locked_at < NOW() - INTERVAL '{timeout_secs} seconds'
+              AND locked_at < NOW() - ($1::bigint * interval '1 second')
            "#,
            schema = self.schema,
-            timeout_secs = timeout_secs
        );

-        let result = sqlx::query(&query).execute(&self.pool).await?;
+        let result = sqlx::query(&query)
+            .bind(timeout_secs as i64)
+            .execute(&self.pool)
+            .await?;
        let released = result.rows_affected();

        // Always emit metrics (counter increments even for 0)
@@ -105,13 +109,15 @@
            UPDATE {schema}._private_jobs
            SET locked_at = NULL, locked_by = NULL
            WHERE locked_at IS NOT NULL
-              AND locked_at < NOW() - INTERVAL '{timeout_secs} seconds'
+              AND locked_at < NOW() - ($1::bigint * interval '1 second')
            "#,
            schema = self.schema,
-            timeout_secs = timeout_secs
        );

-        let result = sqlx::query(&query).execute(&self.pool).await?;
+        let result = sqlx::query(&query)
+            .bind(timeout_secs as i64)
+            .execute(&self.pool)
+            .await?;
        let released = result.rows_affected();

        // Always emit metrics (counter increments even for 0)
@@ -134,9 +140,25 @@
    /// remain in the main queue with `is_available = false`. These jobs
    /// will never be processed again and should be cleaned up.
    ///
-    /// Note: These jobs should already be captured to the DLQ by the task
-    /// handler or DLQ processor before reaching this state. This function
-    /// removes the leftover rows from the main queue.
+    /// # Important: ordering with the DLQ
+    ///
+    /// **This function deletes the same rows the DLQ processor uses as input.**
+    /// If you call it before `process_failed_jobs()` has captured those rows
+    /// into the DLQ, those jobs are lost forever — they leave the main queue
+    /// without ever reaching the DLQ.
+    ///
+    /// Safe usage when DLQ is enabled:
+    /// 1. Call `process_failed_jobs()` first (moves rows into DLQ).
+    /// 2. Then call this function (cleans up anything the DLQ processor chose
+    ///    not to move — typically jobs with `max_attempts = 0`, which the DLQ
+    ///    processor explicitly skips).
+    ///
+    /// `WorkerRunner::run_until_cancelled` already enforces this ordering at
+    /// startup. Direct callers (ad-hoc maintenance scripts, etc.) must enforce
+    /// it themselves.
+    ///
+    /// If DLQ is disabled, this function is the only cleanup mechanism and will
+    /// silently delete failed jobs — that is by design.
    ///
    /// # Returns
    /// Number of permanently failed jobs that were deleted
@@ -157,10 +179,38 @@
        crate::metrics::record_cleanup_failed_jobs_deleted(deleted);

        if deleted > 0 {
-            log::info!(
-                "Cleaned up permanently failed jobs from main queue (count: {})",
-                deleted
-            );
+            // When the DLQ is enabled, these rows have already been captured
+            // (either by an earlier `process_failed_jobs()` tick or by the
+            // synchronous pre-cleanup move in `WorkerRunner` startup) so this
+            // delete is just garbage collection — INFO is fine. When the DLQ
+            // is *not* enabled, this delete is the only mechanism removing
+            // failed jobs from the main queue and they are gone forever:
+            // surface that loudly so an operator who didn't realize that's
+            // the consequence can see it in their logs.
+            //
+            // Detection uses `to_regclass`, which returns NULL if the table
+            // doesn't exist. Failure of the existence check itself doesn't
+            // matter — we default to the louder log on uncertainty.
+            let dlq_oid: Option<String> = sqlx::query_scalar("SELECT to_regclass($1)::text")
+                .bind(format!("{}.backfill_dlq", self.schema))
+                .fetch_one(&self.pool)
+                .await
+                .ok()
+                .flatten();
+
+            if dlq_oid.is_some() {
+                log::info!(
+                    "Cleaned up permanently failed jobs from main queue (count: {})",
+                    deleted
+                );
+            } else {
+                log::warn!(
+                    "Deleted permanently failed jobs from main queue WITHOUT DLQ capture \
+                     (count: {}); they cannot be recovered. Enable dlq_processor_interval \
+                     to retain failed jobs for inspection.",
+                    deleted
+                );
+            }
        }

        Ok(deleted)
@@ -186,6 +236,14 @@
    /// This allows configuring the stale lock thresholds for environments
    /// where the defaults aren't appropriate.
    ///
+    /// # DLQ ordering note
+    ///
+    /// This calls `cleanup_permanently_failed_jobs()`, which DELETEs rows from
+    /// `_private_jobs` where `attempts >= max_attempts`. If you run a DLQ,
+    /// **call `process_failed_jobs()` first** so those rows reach the DLQ
+    /// before they're deleted. `WorkerRunner::run_until_cancelled` does this
+    /// automatically; direct callers must do it themselves.
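+    ///
+    /// A minimal sketch of the safe order for a manual maintenance pass
+    /// (ignored doc-test; assumes a constructed `client` with the DLQ
+    /// enabled):
+    ///
+    /// ```ignore
+    /// client.process_failed_jobs().await?;              // 1. capture into the DLQ
+    /// client.cleanup_permanently_failed_jobs().await?;  // 2. then garbage-collect
+    /// ```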
+    ///
    /// # Arguments
    /// * `queue_lock_timeout` - Timeout for queue locks (normally held for ms)
    /// * `job_lock_timeout` - Timeout for job locks (held during job execution)
diff --git a/src/client/dlq.rs b/src/client/dlq.rs
index 2bf2688..5cd836b 100644
--- a/src/client/dlq.rs
+++ b/src/client/dlq.rs
@@ -32,7 +32,13 @@ pub struct DlqJob {
    pub max_attempts: Option<i32>,
    /// Human-readable failure reason
    pub failure_reason: String,
-    /// Number of times the job failed
+    /// How many times this logical job (by `job_key`) has reached the DLQ.
+    ///
+    /// For jobs with a `job_key`, the DLQ row is upserted: a job that fails,
+    /// gets requeued by an admin, and fails again touches the same DLQ row
+    /// twice — `failure_count` increments by 1 each time. For jobs without
+    /// a `job_key` every failure creates a fresh DLQ row, so this always
+    /// reads `1`.
    pub failure_count: i32,
    /// Last error details as JSON
    pub last_error: Option<serde_json::Value>,
@@ -248,9 +254,25 @@
            crate::metrics::record_dlq_age(&job.task_identifier, age_seconds);
        }

-        // Get total count for pagination (simplified - could be optimized)
-        let count_query = format!("SELECT COUNT(*) FROM {}.backfill_dlq", self.schema);
-        let total: i64 = sqlx::query_scalar(&count_query).fetch_one(&self.pool).await?;
+        // Get total count for pagination, applying the same filters as the list
+        // query. Without this, paging UIs that filter (e.g., by task) compute
+        // wrong page counts because `total` would reflect every DLQ row, not
+        // the filtered subset.
+        let mut count_builder = sqlx::QueryBuilder::new("SELECT COUNT(*) FROM ");
+        count_builder.push(&self.schema).push(".backfill_dlq WHERE 1=1");
+        if let Some(task) = &filter.task_identifier {
+            count_builder.push(" AND task_identifier = ").push_bind(task);
+        }
+        if let Some(queue) = &filter.queue_name {
+            count_builder.push(" AND queue_name = ").push_bind(queue);
+        }
+        if let Some(from) = filter.failed_after {
+            count_builder.push(" AND failed_at >= ").push_bind(from);
+        }
+        if let Some(to) = filter.failed_before {
+            count_builder.push(" AND failed_at <= ").push_bind(to);
+        }
+        let total: i64 = count_builder.build_query_scalar().fetch_one(&self.pool).await?;

        Ok(DlqJobList {
            jobs,
@@ -306,6 +328,18 @@
    }

    /// Requeue a job from the DLQ back to the main queue.
+    ///
+    /// Two SQL operations happen back-to-back:
+    /// 1. Enqueue the job (via `WorkerUtils::add_raw_job`).
+    /// 2. UPDATE the DLQ row's `requeued_count`, `last_requeued_at`, `notes`.
+    ///
+    /// `WorkerUtils::add_raw_job` hard-codes the pool, so we can't bundle
+    /// these into a single transaction. To prefer truthful return values,
+    /// step 2 failures are logged at WARN level rather than failing the
+    /// operation: a successful enqueue followed by a failed bookkeeping
+    /// UPDATE returns `Ok(job)` (the job IS in the queue) with a stale
+    /// `requeued_count` in the DLQ admin view. The next refresh of the row
+    /// will show the discrepancy if it matters.
    pub async fn requeue_dlq_job(&self, dlq_id: i64, notes: Option<String>) -> Result<Job, BackfillError> {
        // Get the DLQ job
        let dlq_job = self
@@ -347,7 +381,8 @@
        };

        // Record metrics
-        crate::metrics::record_dlq_job_requeued(&dlq_job.task_identifier, spec.queue.as_str());
+        // Use bounded metric_label() to keep label cardinality small.
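+        // (A serial name minted per entity, like `Queue::serial_for("user", id)`,
+        // would otherwise create one label value per user.)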
+        crate::metrics::record_dlq_job_requeued(&dlq_job.task_identifier, spec.queue.metric_label());

        log::info!(
            "Job requeued from DLQ (dlq_id: {}, job_id: {}, task: {})",
@@ -368,35 +403,48 @@
            self.schema
        );

-        sqlx::query(&update_query)
+        // Bookkeeping update — see function docstring. If this fails, the
+        // job has already been re-enqueued; we return success but log so an
+        // operator can see that the DLQ row's counters didn't advance.
+        if let Err(e) = sqlx::query(&update_query)
            .bind(&notes)
            .bind(dlq_id)
            .execute(&self.pool)
-            .await?;
+            .await
+        {
+            log::warn!(
+                "DLQ row bookkeeping update failed after successful requeue \
+                 (dlq_id: {}, job_id: {}, error: {}); requeued_count/last_requeued_at \
+                 may be stale",
+                dlq_id,
+                job.id(),
+                e
+            );
+        }

        Ok(*job)
    }

    /// Delete a job from the DLQ permanently.
    pub async fn delete_dlq_job(&self, dlq_id: i64) -> Result<bool, BackfillError> {
-        // Get the job first to record task identifier in metrics
-        let task_identifier = if let Some(job) = self.get_dlq_job(dlq_id).await? {
-            Some(job.task_identifier.clone())
-        } else {
-            None
-        };
-
-        let query = format!("DELETE FROM {}.backfill_dlq WHERE id = $1", self.schema);
-        let result = sqlx::query(&query).bind(dlq_id).execute(&self.pool).await?;
-
-        let deleted = result.rows_affected() > 0;
+        // Single round-trip: DELETE … RETURNING gives us the task_identifier
+        // for the metric in the same query. Returns None if the row didn't
+        // exist (no rows deleted).
+        let query = format!(
+            "DELETE FROM {}.backfill_dlq WHERE id = $1 RETURNING task_identifier",
+            self.schema
+        );
+        let task_identifier: Option<String> = sqlx::query_scalar(&query)
+            .bind(dlq_id)
+            .fetch_optional(&self.pool)
+            .await?;

-        if deleted && let Some(task) = task_identifier {
-            crate::metrics::record_dlq_job_deleted(&task);
+        if let Some(task) = &task_identifier {
+            crate::metrics::record_dlq_job_deleted(task);
            log::info!("Job deleted from DLQ (dlq_id: {}, task: {})", dlq_id, task);
        }

-        Ok(deleted)
+        Ok(task_identifier.is_some())
    }

    /// Delete DLQ entries by job_key.
@@ -512,26 +560,29 @@
            String::new()
        };

-        // Use UPSERT to handle the case where a requeued job fails again.
-        // If a DLQ entry with the same job_key already exists, update it
-        // instead of creating a duplicate. This ensures one DLQ entry per
-        // logical job and keeps failed_at current for cooldown calculations.
+        // UPSERT to handle the case where a requeued job fails again. If a
+        // DLQ entry with the same job_key already exists, update it instead
+        // of creating a duplicate — keeps `failed_at` current and counts the
+        // touch in `failure_count`.
+        //
+        // `failure_count` is a count of DLQ-touch events for this logical job
+        // (1 for first DLQ landing, +1 each subsequent requeue-then-fail).
+        // It is NOT a cumulative count of handler-failure invocations.
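+        // Example: a first failure inserts the row with failure_count = 1; an
+        // admin requeues it, the handler fails again, and the same row is
+        // upserted with failure_count = 2 — even if that second pass burned
+        // several handler attempts before exhausting max_attempts.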
        let upsert_query = format!(
            r#"
-            INSERT INTO {}.backfill_dlq (
+            INSERT INTO {schema}.backfill_dlq (
                original_job_id, task_identifier, payload, queue_name,
                priority, job_key, max_attempts, failure_reason,
                failure_count, last_error, original_created_at, original_run_at
-            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
+            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 1, $9, $10, $11)
            ON CONFLICT (job_key) WHERE job_key IS NOT NULL
            DO UPDATE SET
                failed_at = NOW(),
-                failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count,
+                failure_count = {schema}.backfill_dlq.failure_count + 1,
                failure_reason = EXCLUDED.failure_reason,
                last_error = EXCLUDED.last_error,
                original_job_id = EXCLUDED.original_job_id
            RETURNING *
            "#,
-            self.schema,
            schema = self.schema
        );
@@ -544,7 +595,6 @@
            .bind(original_job.key())
            .bind(original_job.max_attempts())
            .bind(failure_reason)
-            .bind(original_job.attempts())
            .bind(&last_error)
            .bind(original_job.created_at())
            .bind(original_job.run_at())
@@ -574,7 +624,13 @@
        // Record metrics
        crate::metrics::record_db_operation("dlq_add", "success");
        crate::metrics::record_db_operation_duration("dlq_add", start.elapsed().as_secs_f64());
-        crate::metrics::record_dlq_job_added(&dlq_job.queue_name, &dlq_job.task_identifier, &dlq_job.failure_reason);
+        // Bounded label: "parallel" / "serial" rather than the raw queue name
+        // (which may be "" for parallel-origin jobs and unbounded for serial).
+        crate::metrics::record_dlq_job_added(
+            crate::metrics::queue_metric_label_from_name(&dlq_job.queue_name),
+            &dlq_job.task_identifier,
+            &dlq_job.failure_reason,
+        );

        log::info!(
            "Job moved to DLQ (dlq_id: {}, task: {}, failure_reason: {})",
@@ -594,23 +650,33 @@
    /// Returns the number of jobs moved to the DLQ.
    pub async fn process_failed_jobs(&self) -> Result<u64, BackfillError> {
        // Find jobs that have failed permanently (attempts >= max_attempts)
-        // and haven't been processed yet
+        // and aren't already in the DLQ.
+        //
+        // NOT EXISTS is preferred over NOT IN here: it scales better at large
+        // DLQ sizes (PostgreSQL can hash-anti-join), and it handles NULL
+        // values in `backfill_dlq.original_job_id` cleanly without the
+        // COALESCE workaround that NOT IN requires (NULL in a NOT IN list
+        // makes the whole predicate UNKNOWN, which silently filters
+        // everything out).
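+        // Concretely, with NOT IN: `jobs.id NOT IN (SELECT original_job_id
+        // FROM backfill_dlq)` evaluates to UNKNOWN for every row as soon as
+        // any original_job_id is NULL, so the scan would match nothing and
+        // failed jobs would silently stop moving. NOT EXISTS has no such trap.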
let find_failed_jobs_query = format!( r#" SELECT jobs.id, tasks.identifier AS task_identifier, job_queues.queue_name, jobs.priority, jobs.key as job_key, jobs.max_attempts, jobs.attempts, jobs.last_error, jobs.created_at, jobs.run_at, jobs.updated_at, jobs.payload - FROM {}._private_jobs AS jobs - INNER JOIN {}._private_tasks AS tasks ON tasks.id = jobs.task_id - LEFT JOIN {}._private_job_queues AS job_queues ON job_queues.id = jobs.job_queue_id + FROM {schema}._private_jobs AS jobs + INNER JOIN {schema}._private_tasks AS tasks ON tasks.id = jobs.task_id + LEFT JOIN {schema}._private_job_queues AS job_queues ON job_queues.id = jobs.job_queue_id WHERE jobs.attempts >= jobs.max_attempts AND jobs.max_attempts > 0 - AND jobs.id NOT IN (SELECT COALESCE(original_job_id, -1) FROM {}.backfill_dlq) + AND NOT EXISTS ( + SELECT 1 FROM {schema}.backfill_dlq dlq + WHERE dlq.original_job_id = jobs.id + ) ORDER BY jobs.updated_at ASC LIMIT 100 "#, - self.schema, self.schema, self.schema, self.schema + schema = self.schema ); let failed_jobs = sqlx::query(&find_failed_jobs_query).fetch_all(&self.pool).await?; @@ -636,17 +702,32 @@ impl BackfillClient { // Convert last_error from TEXT to JSONB for DLQ table let last_error_json = last_error.map(serde_json::Value::String); - // Move to DLQ using UPSERT to handle requeued jobs that fail again - let upsert_dlq_query = format!( + // Atomic DELETE + UPSERT in a single statement using a writable + // CTE. Either the row is gone from `_private_jobs` AND in + // `backfill_dlq`, or neither happened — no partial-failure window + // where the job ends up in both tables. The INSERT's SELECT + // sources from `deleted`, so when DELETE finds nothing + // (e.g., another worker already moved the row) the INSERT runs + // zero times. + // failure_count semantics: count of DLQ-touch events for this job + // (1 on first move, +1 each subsequent requeue-fail). See + // DlqJob::failure_count docstring. 
+ let move_query = format!( r#" + WITH deleted AS ( + DELETE FROM {schema}._private_jobs WHERE id = $1 RETURNING 1 + ) INSERT INTO {schema}.backfill_dlq ( original_job_id, task_identifier, payload, queue_name, priority, job_key, max_attempts, failure_reason, failure_count, last_error, original_created_at, original_run_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + ) + SELECT $1::bigint, $2::text, $3::jsonb, $4::text, $5::int, $6::text, + $7::int, $8::text, 1, $9::jsonb, $10::timestamptz, $11::timestamptz + FROM deleted ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET failed_at = NOW(), - failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count, + failure_count = {schema}.backfill_dlq.failure_count + 1, failure_reason = EXCLUDED.failure_reason, last_error = EXCLUDED.last_error, original_job_id = EXCLUDED.original_job_id @@ -656,52 +737,45 @@ impl BackfillClient { let failure_reason = format!("Job exceeded maximum retry attempts ({}/{})", attempts, max_attempts); - let upsert_result = sqlx::query(&upsert_dlq_query) + let move_result = sqlx::query(&move_query) .bind(job_id) .bind(&task_identifier) .bind(&payload) .bind(&queue_name) - .bind(priority) + .bind(priority as i32) .bind(&job_key) .bind(max_attempts as i32) .bind(failure_reason) - .bind(attempts as i32) .bind(&last_error_json) .bind(created_at) .bind(run_at) .execute(&self.pool) .await; - match upsert_result { + match move_result { + Ok(result) if result.rows_affected() > 0 => { + moved_count += 1; + log::info!( + "Successfully moved failed job to DLQ (job_id: {}, task: {}, attempts: {}/{})", + job_id, + task_identifier, + attempts, + max_attempts + ); + } Ok(_) => { - // Successfully moved to DLQ, now remove from main jobs table - let delete_query = format!("DELETE FROM {}._private_jobs WHERE id = $1", self.schema); - match sqlx::query(&delete_query).bind(job_id).execute(&self.pool).await { - Ok(_) => { - moved_count += 1; - log::info!( - "Successfully moved failed job to DLQ (job_id: {}, task: {}, attempts: {}/{})", - job_id, - task_identifier, - attempts, - max_attempts - ); - } - Err(e) => { - log::error!( - "Failed to delete job from main table after DLQ insertion (job_id: {}, task: {}, error: {})", - job_id, - task_identifier, - e - ); - // Consider this a partial failure - job is in DLQ - // but also still in main table - } - } + // DELETE found no row — likely another worker beat us to + // it. Not a problem; the job either already reached DLQ + // via the other worker or is still progressing normally. + log::debug!( + "DLQ move skipped (job_id: {}, task: {}); row no longer present in _private_jobs", + job_id, + task_identifier + ); } Err(e) => { log::error!( - "Failed to insert job into DLQ (job_id: {}, task: {}, error: {})", + "Failed to move job to DLQ (job_id: {}, task: {}, error: {})", job_id, task_identifier, e @@ -719,14 +793,17 @@ impl BackfillClient { /// Process failed jobs continuously in a background task. /// - /// This spawns a background task that periodically scans for failed jobs - /// and moves them to the DLQ. The task runs until the provided cancellation - /// token is triggered. + /// Spawns a background task that periodically scans for failed jobs and + /// moves them to the DLQ. Runs until the cancellation token is triggered. + /// + /// On consecutive errors the wait between scans grows exponentially — + /// `interval`, `2*interval`, `4*interval`, … capped at `32*interval` — + /// so a transient outage doesn't produce log spam at fixed cadence. 
+ /// On any successful scan the wait resets to `interval`. /// /// # Arguments - /// * `interval` - How often to scan for failed jobs - /// * `cancellation_token` - Token to signal when to stop the background - /// task + /// * `interval` - How often to scan for failed jobs (steady state) + /// * `cancellation_token` - Signals when to stop the background task /// /// # Returns /// A JoinHandle for the background task @@ -735,36 +812,56 @@ impl BackfillClient { interval: std::time::Duration, cancellation_token: tokio_util::sync::CancellationToken, ) -> tokio::task::JoinHandle<()> { + // 2^5 = 32x interval cap. With the default 60s interval that's ~32min + // between attempts under sustained failure, which is far enough apart + // to be quiet but close enough to still recover quickly when the + // underlying issue clears. + const MAX_BACKOFF_SHIFT: u32 = 5; + let client = self.clone(); tokio::spawn(async move { - let mut interval_timer = tokio::time::interval(interval); - interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - log::info!( "Starting DLQ processor background task (interval_seconds: {})", interval.as_secs() ); + let mut consecutive_errors: u32 = 0; + loop { + // Run the scan first (immediate on first iteration, then after + // each subsequent wait — matches the prior tokio::interval + // behaviour where the first tick fires at t=0). + let mut next_wait = interval; + match client.process_failed_jobs().await { + Ok(count) => { + if count > 0 { + log::info!("DLQ processor moved failed jobs (moved_jobs: {})", count); + } + consecutive_errors = 0; + } + Err(e) => { + consecutive_errors = consecutive_errors.saturating_add(1); + let shift = consecutive_errors.min(MAX_BACKOFF_SHIFT); + let factor = 1u32 << shift; + next_wait = interval * factor; + log::error!( + "DLQ processor encountered error (consecutive: {}, next attempt in: {:?}): {}", + consecutive_errors, + next_wait, + e + ); + } + } + + // Wait or shut down. Cancellation always wins ties. tokio::select! { + biased; _ = cancellation_token.cancelled() => { log::info!("DLQ processor shutting down"); break; } - _ = interval_timer.tick() => { - match client.process_failed_jobs().await { - Ok(count) if count > 0 => { - log::info!("DLQ processor moved failed jobs (moved_jobs: {})", count); - } - Ok(_) => { - // No jobs moved, no need to log - } - Err(e) => { - log::error!("DLQ processor encountered error: {}", e); - } - } - } + _ = tokio::time::sleep(next_wait) => {} } } }) diff --git a/src/client/enqueue.rs b/src/client/enqueue.rs index 85a5864..dc83801 100644 --- a/src/client/enqueue.rs +++ b/src/client/enqueue.rs @@ -95,14 +95,18 @@ impl BackfillClient { .add_raw_job(task_identifier, serde_json::to_value(payload)?, spec.clone().into()) .await; + // Record duration once for every outcome — success, already-in-progress, + // and error — so the histogram isn't biased toward fast successes. + crate::metrics::record_db_operation_duration("enqueue", start.elapsed().as_secs_f64()); + match result { Ok(job) => { - // Record successful enqueue crate::metrics::record_db_operation("enqueue", "success"); - crate::metrics::record_db_operation_duration("enqueue", start.elapsed().as_secs_f64()); - // Record job enqueued metric - crate::metrics::record_job_enqueued(spec.queue.as_str(), task_identifier, spec.priority.0); + // Record job enqueued metric. Use the bounded `metric_label()` + // (either "parallel" or "serial") to avoid exploding the label + // cardinality when callers use `Queue::serial_for(entity, id)`. 
+            crate::metrics::record_job_enqueued(spec.queue.metric_label(), task_identifier, spec.priority.0);

                log::debug!(
                    "Job enqueued (job_id: {}, task: {}, queue: {}, priority: {})",
@@ -120,10 +124,9 @@
                if is_row_not_found(&e) && job_key.is_some() {
                    // Job with this key is already locked/in progress
                    crate::metrics::record_db_operation("enqueue", "already_in_progress");
-                    crate::metrics::record_db_operation_duration("enqueue", start.elapsed().as_secs_f64());

-                    // Record the already_in_progress metric
-                    crate::metrics::record_job_already_in_progress(spec.queue.as_str(), task_identifier);
+                    // Record the already_in_progress metric (bounded label)
+                    crate::metrics::record_job_already_in_progress(spec.queue.metric_label(), task_identifier);

                    let key = job_key.clone().unwrap_or_else(|| "".to_string());
                    log::debug!(
diff --git a/src/client/queries.rs b/src/client/queries.rs
deleted file mode 100644
index 8af0299..0000000
--- a/src/client/queries.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-//! Compile-time checked SQLx queries
-//!
-//! This module contains examples of how to use SQLx macros for compile-time
-//! verification of database queries. These provide type safety and guarantee
-//! that your queries will work at runtime.
-//!
-//! ## Setup Requirements
-//!
-//! To use compile-time query verification, you need either:
-//!
-//! ### Option 1: Online Mode (Recommended)
-//! Set the `DATABASE_URL` environment variable:
-//! ```bash
-//! export DATABASE_URL="postgresql://user:password@localhost:5432/database"
-//! ```
-//!
-//! ### Option 2: Offline Mode
-//! Generate query metadata using:
-//! ```bash
-//! cargo sqlx prepare
-//! ```
-//! This creates `.sqlx/sqlx-data.json` with query metadata.
-//!
-//! ## Usage Example
-//!
-//! ```rust,no_run
-//! use sqlx::PgPool;
-//! use crate::BackfillError;
-//!
-//! /// Example of compile-time checked query
-//! pub async fn get_job_count_safe(pool: &PgPool) -> Result<i64, BackfillError> {
-//!     let result = sqlx::query!(
-//!         "SELECT COUNT(*) as count FROM graphile_worker.jobs"
-//!     )
-//!     .fetch_one(pool)
-//!     .await?;
-//!
-//!     Ok(result.count.unwrap_or(0))
-//! }
-//!
-//! /// Example with parameters
-//! pub async fn get_jobs_by_task_safe(
-//!     pool: &PgPool,
-//!     task_name: &str
-//! ) -> Result<Vec<JobInfo>, BackfillError> {
-//!     let rows = sqlx::query!(
-//!         r#"
-//!         SELECT id, task_identifier, queue_name, priority, created_at
-//!         FROM graphile_worker.jobs
-//!         WHERE task_identifier = $1
-//!         ORDER BY created_at DESC
-//!         "#,
-//!         task_name
-//!     )
-//!     .fetch_all(pool)
-//!     .await?;
-//!
-//!     let jobs = rows.into_iter().map(|row| JobInfo {
-//!         id: row.id,
-//!         task_identifier: row.task_identifier,
-//!         queue_name: row.queue_name,
-//!         priority: row.priority,
-//!         created_at: row.created_at,
-//!     }).collect();
-//!
-//!     Ok(jobs)
-//! }
-//! ```
-//!
-//! ## Benefits
-//!
-//! - **Type Safety**: Column names and types are verified at compile time
-//! - **Runtime Safety**: Guaranteed that queries will execute successfully
-//! - **Refactoring Safety**: Schema changes cause compile errors, not runtime errors
-//! - **Performance**: No runtime query parsing overhead
-//!
-//! ## Migration Strategy
-//!
-//! For existing projects, migrate gradually:
-//! 1. Start with high-risk queries (production critical paths)
-//! 2. Use `sqlx::query!()` for SELECT queries first
-//! 3. Move to `sqlx::query_as!()` for complex mappings
-//! 4. Convert INSERT/UPDATE/DELETE queries last
-//!
-//! ## CI/CD Integration
-//!
-//! Add to your CI pipeline:
-//! ```yaml
# GitHub Actions example -//! - name: Check SQLx queries -//! run: cargo sqlx prepare --check -//! env: -//! DATABASE_URL: ${{ secrets.DATABASE_URL }} -//! ``` - -use chrono::{DateTime, Utc}; - -/// Example job information structure -#[derive(Debug)] -pub struct JobInfo { - pub id: i64, - pub task_identifier: String, - pub queue_name: Option, - pub priority: i32, - pub created_at: DateTime, -} - -/// Example DLQ information structure for compile-time queries -#[derive(Debug)] -pub struct DlqJobInfo { - pub id: i64, - pub task_identifier: String, - pub failed_at: DateTime, - pub failure_count: i32, -} - -// Note: Actual query implementations would go here if DATABASE_URL is available -// For now, this serves as documentation and examples for users to implement diff --git a/src/lib.rs b/src/lib.rs index e4aaf41..1dd3679 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,8 +9,14 @@ //! - **Parallel execution** by default - jobs run concurrently across all //! workers //! - **Serial queues** when you need ordering or rate limiting -//! - **Exponential backoff** with jitter to prevent thundering herds -//! - **Flexible retry policies** (fast, aggressive, conservative, or custom) +//! - **Exponential backoff retries** via graphile_worker (timing is +//! `exp(min(attempts, 10))` seconds; fixed, not per-job tunable — see +//! [`RetryPolicy`]) +//! - **Configurable max-attempts per job** with `fast`, `aggressive`, and +//! `conservative` presets +//! - **Permanent-failure short-circuit** — non-retryable `WorkerError` variants +//! land in the DLQ on the first failure instead of waiting for `max_attempts` +//! to exhaust //! - **Dead letter queue** handling for failed jobs //! - **Type-safe job handlers** using Rust's type system //! - **Low-latency execution** via PostgreSQL LISTEN/NOTIFY @@ -62,6 +68,23 @@ //! ``` use chrono::{DateTime, Utc}; +// === Re-exports from `graphile_worker` === +// +// The types below are part of backfill's *public* API surface — users +// importing `backfill::*` see them and write code against them. As a +// consequence, every graphile_worker minor or patch release that touches +// any of these types (renames, signature changes, removal) is a breaking +// change for backfill, even when graphile_worker itself doesn't intend +// it as one (graphile_worker is pre-1.0; minor bumps are allowed to be +// breaking under semver, and patch releases occasionally are too). +// +// When upgrading the graphile_worker dependency: +// 1. Re-run the integration test suite (`cargo nextest run -F axum`). +// 2. Audit this re-export list against the new graphile_worker for any rename/removal — those need a corresponding +// backfill major bump and migration note for downstream users. +// 3. Audit `_private_*` schema usage in src/client/dlq.rs, src/client/cleanup.rs, and src/admin.rs — +// graphile_worker reserves the right to change those tables across versions. 
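+//
+// A blunt but effective guard (version number purely illustrative) is an
+// exact pin in Cargo.toml, bumped deliberately instead of via `cargo update`:
+//
+//     [dependencies]
+//     graphile_worker = "=0.8.0"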
+//

 // Lifecycle hooks for plugins - new Plugin API with event registration
 pub use graphile_worker::{
 // Event types (for hooks.on() registration)
@@ -113,6 +136,7 @@ use serde::Serialize;
 mod client;
 mod dlq_cleanup_plugin;
 mod errors;
+mod permanent_failure_plugin;
 mod priorities;
 mod retries;
 mod worker;
@@ -128,6 +152,7 @@ pub use client::cleanup::{
 pub use client::*;
 pub use dlq_cleanup_plugin::DlqCleanupPlugin;
 pub use errors::{BackfillError, WorkerError};
+pub use permanent_failure_plugin::PermanentFailurePlugin;
 pub use priorities::*;
 pub use retries::*;
 pub use worker::*;
@@ -227,10 +252,13 @@ impl Queue {
 }
 }

- /// Returns a string representation for logging/metrics.
+ /// Returns a string representation for logging.
 ///
 /// Returns "parallel" for parallel queues, or the queue name for serial
- /// queues.
+ /// queues. **Do not use this for metric labels** — serial queue names can
+ /// have unbounded cardinality (e.g., `Queue::serial_for("user", id)`
+ /// produces a different name per user, which would explode Prometheus
+ /// time-series storage). Use [`Queue::metric_label`] for metrics.
 pub fn as_str(&self) -> &str {
 match self {
 Queue::Parallel => "parallel",
@@ -238,6 +266,20 @@ impl Queue {
 }

+ /// Bounded label suitable for metrics: returns either `"parallel"` or
+ /// `"serial"`.
+ ///
+ /// This drops the queue name. The library uses this for all built-in
+ /// metric emission (`backfill_jobs_enqueued`, `backfill_dlq_*`, etc.) to
+ /// keep label cardinality bounded. If you need per-queue metrics for a
+ /// fixed, small set of named queues, emit them yourself via a plugin.
+ pub fn metric_label(&self) -> &'static str {
+ match self {
+ Queue::Parallel => "parallel",
+ Queue::Serial(_) => "serial",
+ }
+ }
+
 /// Returns true if this is a parallel (non-serialized) queue.
 pub fn is_parallel(&self) -> bool {
 matches!(self, Queue::Parallel)
@@ -256,17 +298,45 @@ impl Queue {

 /// Outcome of an enqueue operation.
 ///
-/// When enqueueing a job, the result can either be:
-/// - `Enqueued(Job)`: The job was successfully created or updated
-/// - `AlreadyInProgress { job_key }`: A job with this key is currently locked
-/// by a worker
+/// `enqueue` returns `Result<EnqueueOutcome, BackfillError>`. This enum
+/// distinguishes the two non-error outcomes:
+///
+/// - [`EnqueueOutcome::Enqueued`] — the job was created (or, with
+/// `JobKeyMode::Replace`, updated). The boxed `Job` carries the job's
+/// id/queue_id/etc.
+/// - [`EnqueueOutcome::AlreadyInProgress`] — a job with the same `job_key` was
+/// **currently locked by a worker** when we tried to add this one. This is
+/// *not* a duplicate-key collision (that's handled by `job_key_mode`); it's a
+/// race where the worker grabbed the existing job before our update could
+/// land. The new payload was discarded.
+///
+/// # ⚠️ Footgun warning
+///
+/// Treating every `Ok(_)` return as "the job is enqueued" is wrong. A
+/// caller who writes:
+///
+/// ```ignore
+/// // BUG: silently drops AlreadyInProgress as if it were success
+/// let job_id = client.enqueue(...).await?.unwrap().id();
+/// ```
+///
+/// will panic at runtime any time the race fires. Always pattern-match,
+/// or use [`EnqueueOutcome::is_already_in_progress`] / [`EnqueueOutcome::job`]
+/// to handle the case explicitly. `.unwrap()` and `.expect()` panic on
+/// `AlreadyInProgress` by design — they're only safe when you're certain
+/// no worker is holding the key.
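+///
+/// A safe calling pattern, sketched against the variants below:
+///
+/// ```ignore
+/// match client.enqueue(task, &payload, spec).await? {
+///     EnqueueOutcome::Enqueued(job) => log::debug!("enqueued job {}", job.id()),
+///     EnqueueOutcome::AlreadyInProgress { job_key } => {
+///         // The payload was dropped; decide here whether to re-enqueue later.
+///         log::info!("job_key {} is locked by a worker; update skipped", job_key);
+///     }
+/// }
+/// ```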
#[derive(Debug, Clone)]
 pub enum EnqueueOutcome {
- /// Job was successfully enqueued (either created or updated)
+ /// Job was successfully enqueued (either created or updated).
 Enqueued(Box<Job>),
 /// A job with this key is already in progress (locked by a worker).
- /// Contains the job_key that was in conflict.
- AlreadyInProgress { job_key: String },
+ /// The new payload was **not** stored — your update is lost. Decide in
+ /// the caller whether to retry, queue a different job, or accept the
+ /// drop.
+ AlreadyInProgress {
+ /// The job_key that conflicted.
+ job_key: String,
+ },
 }

 impl EnqueueOutcome {
@@ -360,14 +430,22 @@ impl Default for JobSpec {
 }

 impl JobSpec {
- /// Create a JobSpec with exponential backoff retry policy
+ /// Attach a [`RetryPolicy`] to this JobSpec.
+ ///
+ /// Sets `max_attempts` from the policy. Note that backoff timing fields
+ /// on the policy are stored but not honored at runtime — see
+ /// [`RetryPolicy`].
 pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
 self.max_attempts = Some(retry_policy.max_attempts);
 self.retry_policy = Some(retry_policy);
 self
 }

- /// Create a JobSpec optimized for fast retries
+ /// Configure for the `fast` preset: `max_attempts = 3`.
+ ///
+ /// In practice this differs from [`with_aggressive_retries`] and
+ /// [`with_conservative_retries`] only in the attempt count — see
+ /// [`RetryPolicy`].
 pub fn with_fast_retries(mut self) -> Self {
 let policy = RetryPolicy::fast();
 self.max_attempts = Some(policy.max_attempts);
@@ -375,7 +453,10 @@ impl JobSpec {
 self
 }

- /// Create a JobSpec optimized for aggressive retries
+ /// Configure for the `aggressive` preset: `max_attempts = 12`.
+ ///
+ /// At graphile_worker's fixed exp-backoff schedule, 12 attempts gives
+ /// roughly half a day of cumulative retry coverage before DLQ.
 pub fn with_aggressive_retries(mut self) -> Self {
 let policy = RetryPolicy::aggressive();
 self.max_attempts = Some(policy.max_attempts);
@@ -383,7 +464,7 @@ impl JobSpec {
 self
 }

- /// Create a JobSpec optimized for conservative retries
+ /// Configure for the `conservative` preset: `max_attempts = 5`.
 pub fn with_conservative_retries(mut self) -> Self {
 let policy = RetryPolicy::conservative();
 self.max_attempts = Some(policy.max_attempts);
@@ -391,15 +472,28 @@ impl JobSpec {
 self
 }

- /// Get the effective retry policy (returns default if none specified)
+ /// Get the effective retry policy (returns default if none specified).
+ ///
+ /// **Note:** Only `max_attempts` from the returned policy reaches
+ /// graphile_worker. See [`RetryPolicy`] for details.
 pub fn effective_retry_policy(&self) -> RetryPolicy {
 self.retry_policy.clone().unwrap_or_default()
 }

- /// Calculate the next retry time for a failed job
+ /// Calculate what the next retry time *would* be under this spec's
+ /// policy.
+ ///
+ /// **Not used at runtime.** graphile_worker schedules retries via a
+ /// fixed SQL formula. This method is preserved as a utility but has no
+ /// effect on actual job behaviour.
+ #[deprecated(
+ since = "1.2.0",
+ note = "graphile_worker computes retry timing in SQL and ignores this method. Returns a value but has no runtime effect."
+ )]
 pub fn calculate_retry_time(&self, attempt: i32, failed_at: DateTime<Utc>) -> Option<DateTime<Utc>> {
 let policy = self.effective_retry_policy();
 if policy.should_retry(attempt) {
+ #[allow(deprecated)]
 Some(policy.calculate_retry_time(attempt, failed_at))
 } else {
 None // No more retries
@@ -492,9 +586,11 @@ pub async fn enqueue_emergency<T>(
 where
 T: Serialize,
 {
+ // run_at defaults to None, which graphile_worker resolves to NOW() at the
+ // SQL layer — equivalent to "execute immediately" without the extra Rust-
+ // side clock read.
 let spec = JobSpec {
 priority: Priority::EMERGENCY,
- run_at: Some(Utc::now()), // Execute immediately
 job_key,
 ..Default::default()
 };
@@ -502,10 +598,12 @@ where
 client.enqueue(task_identifier, payload, spec).await
 }

-/// Enqueue a high-priority job with fast exponential backoff retries.
+/// Enqueue a high-priority job configured for a low retry count (3 attempts).
 ///
-/// Best for high-priority jobs that need quick retries (3 attempts, 100ms-30s
-/// delays).
+/// Use for jobs where rapid failure-to-DLQ is preferred over many retries.
+/// graphile_worker's retry timing is fixed at `exp(min(attempts, 10))` seconds
+/// regardless of policy — see [`RetryPolicy`] — so the only difference between
+/// this and other `_with_retries` helpers is the attempt cap.
 pub async fn enqueue_fast_with_retries<T>(
 client: &BackfillClient,
 task_identifier: &str,
@@ -525,9 +623,12 @@ where
 client.enqueue(task_identifier, payload, spec).await
 }

-/// Enqueue a critical job with aggressive exponential backoff retries.
+/// Enqueue a critical job with a high retry count (12 attempts).
 ///
-/// Best for critical jobs that must succeed (12 attempts, up to 4 hour delays).
+/// Use for jobs that must eventually succeed if at all possible.
+/// graphile_worker retries on a fixed `exp(min(attempts, 10))` second schedule,
+/// capping at ~6h per retry — so 12 attempts gives roughly half a day of total
+/// retry coverage. See [`RetryPolicy`] for the full timing.
 pub async fn enqueue_critical<T>(
 client: &BackfillClient,
 task_identifier: &str,
@@ -547,10 +648,13 @@ where
 client.enqueue(task_identifier, payload, spec).await
 }

-/// Enqueue a bulk job with conservative exponential backoff retries.
+/// Enqueue a bulk job with a moderate retry count (5 attempts via the
+/// `conservative` preset).
 ///
-/// Best for background jobs where consistency matters more than speed
-/// (8 attempts, 1 min - 8 hour delays).
+/// Use for background jobs that should be retried but where you don't want
+/// many attempts. graphile_worker's retry timing is `exp(min(attempts, 10))`
+/// seconds — see [`RetryPolicy`] — so the five attempts are separated by
+/// waits of roughly 1s, 3s, 7s, and 20s before the job lands in DLQ.
 pub async fn enqueue_bulk_with_retries<T>(
 client: &BackfillClient,
 task_identifier: &str,
@@ -640,6 +744,18 @@ mod tests {
 assert_eq!(Queue::Parallel.as_str(), "parallel");
 }

+ #[test]
+ fn queue_metric_label_is_bounded() {
+ // The whole point of metric_label vs as_str: per-entity serial queues
+ // (e.g., "user:123") collapse to "serial" instead of carrying the
+ // unbounded entity id into Prometheus labels.
+ assert_eq!(Queue::Parallel.metric_label(), "parallel"); + assert_eq!(Queue::serial("anything").metric_label(), "serial"); + assert_eq!(Queue::serial_for("user", 12345).metric_label(), "serial"); + assert_eq!(Queue::serial_for("user", 99999).metric_label(), "serial"); + assert_eq!(Queue::dead_letter().metric_label(), "serial"); + } + #[test] fn queue_serial_creation() { let queue = Queue::serial("test"); diff --git a/src/metrics.rs b/src/metrics.rs index 694cd9d..54a60c2 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -89,6 +89,15 @@ pub(crate) fn record_job_already_in_progress(queue: &str, task: &str) { .increment(1); } +/// Convert a stored DLQ `queue_name` (which may be "" for parallel-origin +/// jobs) into a bounded metric label of either `"parallel"` or `"serial"`. +/// +/// Mirrors [`crate::Queue::metric_label`] but operates on the post-storage +/// string form used by `DlqJob.queue_name`. +pub(crate) fn queue_metric_label_from_name(queue_name: &str) -> &'static str { + if queue_name.is_empty() { "parallel" } else { "serial" } +} + /// Record a job being added to DLQ pub(crate) fn record_dlq_job_added(queue: &str, task: &str, reason: &str) { metrics::counter!( diff --git a/src/permanent_failure_plugin.rs b/src/permanent_failure_plugin.rs new file mode 100644 index 0000000..5fd86cc --- /dev/null +++ b/src/permanent_failure_plugin.rs @@ -0,0 +1,129 @@ +//! Built-in plugin that short-circuits retries for non-retryable errors. +//! +//! `WorkerError` distinguishes retryable failures (`ConnectionTimeout`, +//! `RateLimitExceeded`, …) from non-retryable failures (`ValidationFailed`, +//! `Unauthorized`, …). Without this plugin, graphile_worker treats every +//! `Err` identically and keeps retrying until `max_attempts` is exhausted — +//! which can mean six hours of pointless retries for a malformed payload. +//! +//! With this plugin registered, when a handler returns an error that +//! `WorkerError::classify_from_message` flags as non-retryable, the job's +//! `attempts` is set to `max_attempts` so it lands in the DLQ on the next +//! processor tick instead of retrying. +//! +//! The plugin is auto-registered when the DLQ processor is enabled +//! (`WorkerConfig::dlq_processor_interval = Some(_)`). Without DLQ enabled +//! it would set `attempts = max_attempts` and the row would just sit in +//! `_private_jobs` forever — pointless and confusing — so the gating is +//! intentional. +//! +//! # Implementation note: why we can't use `permanently_fail_jobs` +//! +//! graphile_worker exposes a `permanently_fail_jobs(job_ids, reason)` SQL +//! function that does what we want, but it has a `WHERE locked_at IS NULL OR +//! locked_at < NOW() - 4h` guard to prevent racing with an active worker. +//! Inside the `JobFail` hook the worker (us) still owns the lock — +//! graphile_worker doesn't clear `locked_at` until `fail_job` runs *after* +//! our hook returns. So `permanently_fail_jobs` would silently no-op. +//! +//! We work around this with a direct UPDATE that filters by `locked_by = +//! ctx.worker_id` instead of the `locked_at IS NULL` guard. Same safety +//! property (only this worker can update its own row) but compatible with +//! the in-flight lock state. + +use graphile_worker::{HookRegistry, JobFail, JobFailContext, Plugin}; + +use crate::{BackfillClient, WorkerError}; + +/// Plugin that converts non-retryable handler errors into immediate DLQ +/// candidates by short-circuiting retry attempts. +/// +/// Auto-registered by `WorkerRunner` when DLQ is enabled. 
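+///
+/// If you assemble `WorkerOptions` yourself rather than going through
+/// `WorkerRunner`, registration is a single `add_plugin` call (sketch):
+///
+/// ```ignore
+/// let worker_options = worker_options.add_plugin(PermanentFailurePlugin::new(client.clone()));
+/// ```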
+#[derive(Clone)] +pub struct PermanentFailurePlugin { + client: BackfillClient, +} + +impl PermanentFailurePlugin { + /// Create a new permanent-failure plugin. + pub fn new(client: BackfillClient) -> Self { + Self { client } + } +} + +impl Plugin for PermanentFailurePlugin { + fn register(self, hooks: &mut HookRegistry) { + let client = self.client; + hooks.on(JobFail, move |ctx: JobFailContext| { + let client = client.clone(); + async move { + let classified = WorkerError::classify_from_message(ctx.error.clone()); + if classified.is_retryable() { + // Retryable error — let graphile_worker reschedule normally. + return; + } + + let job_id = *ctx.job.id(); + let task = ctx.job.task_identifier().to_string(); + + // Set attempts = max_attempts so the next `get_job()` doesn't + // pick this up (its predicate is `is_available`, which is a + // generated column = `locked_at IS NULL AND attempts < + // max_attempts`). The next DLQ processor tick will scan this + // row and move it to the DLQ. + // + // We filter by `locked_by = $3` to ensure we only mutate the + // row while we still own its lock. graphile_worker's + // `fail_job` will run immediately after this hook returns and + // will clear locked_at; the `attempts = max_attempts` we + // wrote here will persist (fail_job doesn't touch attempts). + let sql = format!( + "UPDATE {schema}._private_jobs \ + SET attempts = max_attempts, last_error = $2, updated_at = now() \ + WHERE id = $1 AND locked_by = $3", + schema = client.schema() + ); + let reason = format!("Permanent failure (non-retryable error): {}", ctx.error); + + match sqlx::query(&sql) + .bind(job_id) + .bind(&reason) + .bind(&ctx.worker_id) + .execute(client.pool()) + .await + { + Ok(result) if result.rows_affected() > 0 => { + log::info!( + "Short-circuited retries for non-retryable error (job_id: {}, task: {}, error: {})", + job_id, + task, + ctx.error + ); + } + Ok(_) => { + // No row updated — likely the lock changed hands or + // the row was deleted between fetch and hook. Rare + // but not catastrophic; fall back to normal retry. + log::warn!( + "Permanent-failure short-circuit affected zero rows (job_id: {}, task: {})", + job_id, + task + ); + } + Err(e) => { + // DB error — log and continue. graphile_worker's + // fail_job will run next and the job will retry once + // more (and likely fail again, hitting this hook on + // the next attempt). + log::warn!( + "Permanent-failure short-circuit failed; job will retry instead (job_id: {}, task: {}, error: {})", + job_id, + task, + e + ); + } + } + } + }); + } +} diff --git a/src/retries.rs b/src/retries.rs index d75f94d..609f4fa 100644 --- a/src/retries.rs +++ b/src/retries.rs @@ -1,19 +1,53 @@ -//! Retries: policy, implementation, and tests. +//! Retry policy. +//! +//! # ⚠️ Important: backoff timing is fixed, not configurable +//! +//! `RetryPolicy` exposes `initial_delay`, `max_delay`, `backoff_multiplier`, +//! and `jitter_factor` fields, but **graphile_worker does not honor them**. +//! graphile_worker schedules every retry on a hard-coded SQL formula: +//! +//! ```text +//! run_at = greatest(now(), run_at) + (exp(least(attempts, 10)) * interval '1 second') +//! ``` +//! +//! Concretely: ~1s after the 1st failure, ~2.7s after the 2nd, ~7.4s after +//! the 3rd, ~20s, ~55s, ~2.5min, ~6.7min, ~18min, ~49min, ~2.2h, ~6h (capped +//! at attempts ≥ 10). This formula applies to every job regardless of which +//! `RetryPolicy` the caller configured. +//! +//! 
**The only `RetryPolicy` field that actually affects job behaviour is +//! `max_attempts`**, which is forwarded to graphile_worker's `JobSpec`. +//! Methods that compute delays (`calculate_delay`, `calculate_retry_time`, +//! `with_jitter`) are deprecated — they were dead code masquerading as +//! configuration. The `fast()`, `aggressive()`, and `conservative()` presets +//! are kept for compatibility but in practice differ only in attempt count. +//! +//! Per-job backoff customisation requires upstream support in +//! `graphile_worker` and is not currently available. use std::time::Duration; -/// Configuration for exponential backoff retry policy. +/// Configuration for retry behaviour. +/// +/// **Only `max_attempts` affects runtime behaviour.** The other fields are +/// retained for source-compatibility but graphile_worker computes retry +/// timing from a fixed SQL formula (`exp(min(attempts, 10))` seconds) — see +/// the module-level docs for details. #[derive(Debug, Clone)] pub struct RetryPolicy { - /// Maximum number of retry attempts + /// Maximum number of retry attempts. **Honored at runtime.** pub max_attempts: i32, - /// Initial delay between retries + /// Initial delay between retries. **NOT honored** — graphile_worker uses + /// `exp(min(attempts, 10))` seconds regardless. pub initial_delay: Duration, - /// Maximum delay between retries + /// Maximum delay between retries. **NOT honored** — graphile_worker's + /// formula caps naturally at attempts ≥ 10 (~6 hours). pub max_delay: Duration, - /// Backoff multiplier (typically 2.0 for exponential backoff) + /// Backoff multiplier. **NOT honored** — graphile_worker's formula uses + /// `exp()`, not a configurable multiplier. pub backoff_multiplier: f64, - /// Add jitter to prevent thundering herd (0.0 to 1.0, default 0.1) + /// Jitter factor. **NOT honored** — graphile_worker's formula adds no + /// jitter. pub jitter_factor: f64, } @@ -30,7 +64,17 @@ impl Default for RetryPolicy { } impl RetryPolicy { - /// Create a new RetryPolicy with custom settings + /// Create a new RetryPolicy with custom settings. + /// + /// **Note:** Only `max_attempts` affects runtime behaviour. The other + /// arguments are stored on the returned policy but never reach + /// graphile_worker. See the module-level docs. + #[deprecated( + since = "1.2.0", + note = "graphile_worker honors only max_attempts; the timing arguments are dead config. \ + Construct directly with RetryPolicy { max_attempts: n, ..Default::default() } \ + or use a preset like RetryPolicy::fast()." + )] pub fn new(max_attempts: i32, initial_delay: Duration, max_delay: Duration, backoff_multiplier: f64) -> Self { Self { max_attempts, @@ -41,13 +85,26 @@ impl RetryPolicy { } } - /// Create a RetryPolicy with jitter configuration + /// Configure jitter on this policy. **Not applied at runtime.** + #[deprecated( + since = "1.2.0", + note = "jitter_factor is not honored by graphile_worker; this method has no runtime effect." + )] pub fn with_jitter(mut self, jitter_factor: f64) -> Self { self.jitter_factor = jitter_factor.clamp(0.0, 1.0); self } - /// Calculate the delay for a specific attempt number (0-based) + /// Calculate what the delay *would* be for a given attempt number under + /// this policy. + /// + /// **Not used at runtime.** graphile_worker schedules retries via a + /// fixed SQL formula and ignores this calculation. Retained as a + /// utility for tests and future use. 
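+ ///
+ /// Calling it remains side-effect-free (sketch; note the opt-in):
+ ///
+ /// ```ignore
+ /// #[allow(deprecated)]
+ /// let d = RetryPolicy::default().calculate_delay(2); // pure math, never scheduled
+ /// ```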
+ #[deprecated(
+ since = "1.2.0",
+ note = "graphile_worker computes retry timing in SQL and ignores this method. Returns a value but has no runtime effect."
+ )]
 pub fn calculate_delay(&self, attempt: i32) -> Duration {
 if attempt >= self.max_attempts {
 return Duration::ZERO;
@@ -81,12 +138,20 @@ impl RetryPolicy {
 Duration::from_millis(final_delay_ms as u64)
 }

- /// Calculate the run_at time for a retry
+ /// Calculate what the run_at time *would* be for a retry under this
+ /// policy.
+ ///
+ /// **Not used at runtime.** See [`RetryPolicy::calculate_delay`].
+ #[deprecated(
+ since = "1.2.0",
+ note = "graphile_worker computes retry timing in SQL and ignores this method. Returns a value but has no runtime effect."
+ )]
 pub fn calculate_retry_time(
 &self,
 attempt: i32,
 base_time: chrono::DateTime<chrono::Utc>,
 ) -> chrono::DateTime<chrono::Utc> {
+ #[allow(deprecated)]
 let delay = self.calculate_delay(attempt);
 base_time + chrono::Duration::from_std(delay).unwrap_or(chrono::Duration::MAX)
 }
@@ -101,7 +166,11 @@ impl RetryPolicy {
 self.max_attempts + 1 // +1 for the initial attempt
 }

- /// Create a fast retry policy for high-priority jobs
+ /// Preset for high-priority jobs that need quick turnaround.
+ ///
+ /// `max_attempts = 3`. Other fields are stored but not honored — see the
+ /// module-level docs. In practice this preset differs from `aggressive()`
+ /// and `conservative()` only in attempt count.
 pub fn fast() -> Self {
 Self {
 max_attempts: 3,
@@ -112,23 +181,29 @@ impl RetryPolicy {
 }
 }

- /// Create an aggressive retry policy for critical jobs
+ /// Preset for critical jobs that should keep retrying for a long time.
+ ///
+ /// `max_attempts = 12` — graphile_worker's exponential backoff caps at
+ /// ~6h per retry once attempts ≥ 10, so this gives roughly half a day of
+ /// retry coverage. Other fields are stored but not honored.
 pub fn aggressive() -> Self {
 Self {
 max_attempts: 12,
 initial_delay: Duration::from_millis(500),
- max_delay: Duration::from_secs(600), // 10 minutes
+ max_delay: Duration::from_secs(600),
 backoff_multiplier: 1.5,
 jitter_factor: 0.15,
 }
 }

- /// Create a conservative retry policy for bulk jobs
+ /// Preset for bulk jobs where consistency matters more than latency.
+ ///
+ /// `max_attempts = 5`. Other fields are stored but not honored.
 pub fn conservative() -> Self {
 Self {
 max_attempts: 5,
 initial_delay: Duration::from_secs(5),
- max_delay: Duration::from_secs(1800), // 30 minutes
+ max_delay: Duration::from_secs(1800),
 backoff_multiplier: 2.5,
 jitter_factor: 0.2,
 }
@@ -136,6 +211,7 @@ impl RetryPolicy {
 }

 #[cfg(test)]
+#[allow(deprecated)] // tests still exercise the deprecated math helpers
 mod tests {
 use super::*;
 use crate::JobSpec;
diff --git a/src/worker.rs b/src/worker.rs
index ce2b1a8..91b5b68 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -101,19 +101,31 @@ use tokio_util::sync::CancellationToken;
 use crate::{BackfillClient, BackfillError, Plugin, TaskHandler, WorkerOptions};
 // Re-export for use in wrapper

-/// Configuration for a worker queue
+/// Configuration for a worker queue.
+///
+/// **Currently only `concurrency` is honored.** `name` and `priority_range`
+/// were intended for multi-queue worker setups, but graphile_worker's
+/// `WorkerOptions` doesn't expose per-worker queue filtering, so a single
+/// `WorkerRunner` runs exactly one worker and consumes only the first
+/// `QueueConfig` in `WorkerConfig::queue_configs`. To run multiple workers
+/// with different queue specializations, spawn multiple `WorkerRunner`s.
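+///
+/// A sketch of that multi-runner pattern (job types and `url` hypothetical):
+///
+/// ```ignore
+/// let token = CancellationToken::new();
+/// let fast = WorkerRunner::builder(WorkerConfig::new(url.clone()).with_concurrency(16))
+///     .await?
+///     .define_job::<FastJob>()
+///     .build()
+///     .await?;
+/// let bulk = WorkerRunner::builder(WorkerConfig::new(url).with_concurrency(2))
+///     .await?
+///     .define_job::<BulkJob>()
+///     .build()
+///     .await?;
+/// let fast_handle = fast.spawn_background(token.clone());
+/// let bulk_handle = bulk.spawn_background(token.clone());
+/// ```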
#[derive(Debug, Clone)]
 pub struct QueueConfig {
- /// Queue name (None for default queue)
+ /// Queue name (None for default queue). **Not honored** — the worker
+ /// processes jobs from any queue regardless of this field.
 pub name: Option<String>,
- /// Number of concurrent jobs to process in this queue
+ /// Number of concurrent jobs to process. **Honored.**
 pub concurrency: usize,
- /// Priority range for jobs in this queue (inclusive)
+ /// Priority range for jobs in this queue. **Not honored** — the worker
+ /// fetches by priority order with no range filter.
 pub priority_range: Option<(i32, i32)>,
 }

 impl QueueConfig {
- /// Create configuration for the default queue
+ /// Create configuration for the default queue with the given concurrency.
+ ///
+ /// This is the only `QueueConfig` constructor where every field is
+ /// actually honored at runtime.
 pub fn default_queue(concurrency: usize) -> Self {
 Self {
 name: None,
@@ -122,7 +134,17 @@ impl QueueConfig {
 }
 }

- /// Create configuration for a named queue
+ /// Create configuration for a named queue.
+ ///
+ /// **Deprecated**: the `name` field is not honored at worker startup —
+ /// graphile_worker's `WorkerOptions` doesn't filter jobs by queue. Use
+ /// [`WorkerConfig::with_concurrency`] instead, and route jobs to
+ /// specific named queues at enqueue time via `Queue::serial(name)`.
+ #[deprecated(
+ since = "1.2.0",
+ note = "queue name is not honored by graphile_worker's WorkerOptions; use WorkerConfig::with_concurrency \
+ and route jobs to named queues at enqueue time via Queue::serial(name)"
+ )]
 pub fn named_queue(name: impl Into<String>, concurrency: usize) -> Self {
 Self {
 name: Some(name.into()),
@@ -131,7 +153,16 @@ impl QueueConfig {
 }
 }

- /// Create configuration for a priority-based queue
+ /// Create configuration for a priority-based queue.
+ ///
+ /// **Deprecated**: neither `name` nor `priority_range` is honored at
+ /// runtime. The worker fetches jobs by priority order (lower number
+ /// first) regardless of any range configured here. Use
+ /// [`WorkerConfig::with_concurrency`] instead.
+ #[deprecated(
+ since = "1.2.0",
+ note = "priority_range is never honored by the worker fetch loop; this constructor stores values that are dead. Use WorkerConfig::with_concurrency."
+ )]
 pub fn priority_queue(name: impl Into<String>, concurrency: usize, min_priority: i32, max_priority: i32) -> Self {
 Self {
 name: Some(name.into()),
@@ -203,7 +234,28 @@ impl WorkerConfig {
 self
 }

- /// Set queue configurations
+ /// Set the worker's concurrency.
+ ///
+ /// `WorkerRunner` runs a single graphile_worker `Worker` whose
+ /// `concurrency` setting determines how many jobs it can execute in
+ /// parallel. To run multiple specialized workers, spawn multiple
+ /// `WorkerRunner` instances yourself.
+ pub fn with_concurrency(mut self, concurrency: usize) -> Self {
+ self.queue_configs = vec![QueueConfig::default_queue(concurrency)];
+ self
+ }
+
+ /// Set queue configurations.
+ ///
+ /// **Deprecated**: only the first `QueueConfig`'s `concurrency` is
+ /// honored — graphile_worker's `WorkerOptions` doesn't expose
+ /// per-worker queue filtering, so subsequent configs are ignored. Use
+ /// [`Self::with_concurrency`] instead. Route jobs to named queues at
+ /// enqueue time via `Queue::serial(name)`.
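+ ///
+ /// Migration sketch:
+ ///
+ /// ```ignore
+ /// // Before (deprecated): extra configs were silently ignored.
+ /// let config = config.with_queues(vec![QueueConfig::default_queue(5)]);
+ /// // After: states what actually happens at runtime.
+ /// let config = config.with_concurrency(5);
+ /// ```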
+ #[deprecated(
+ since = "1.2.0",
+ note = "only the first QueueConfig's concurrency is honored; use with_concurrency() and route via Queue::serial(name) at enqueue time"
+ )]
 pub fn with_queues(mut self, queues: Vec<QueueConfig>) -> Self {
 self.queue_configs = queues;
 self
 }
@@ -518,11 +570,20 @@ impl WorkerRunner {
 worker_options = applier(worker_options);
 }

- // If DLQ is enabled, add the cleanup plugin that removes DLQ entries
- // when jobs with matching job_keys complete successfully
+ // If DLQ is enabled, register the built-in plugins that wire backfill's
+ // error semantics through to graphile_worker:
+ //
+ // - DlqCleanupPlugin: when a requeued DLQ job completes successfully, removes
+ // the matching DLQ entry so it isn't requeued again.
+ // - PermanentFailurePlugin: when a handler returns a non-retryable
+ // `WorkerError` variant (ValidationFailed, Unauthorized, etc.),
+ // short-circuits remaining retry attempts so the job lands in the DLQ on the
+ // next processor tick instead of waiting hours.
 if self.config.dlq_processor_interval.is_some() {
 let cleanup_plugin = crate::DlqCleanupPlugin::new(self.client.clone());
 worker_options = worker_options.add_plugin(cleanup_plugin);
+ let permanent_failure_plugin = crate::PermanentFailurePlugin::new(self.client.clone());
+ worker_options = worker_options.add_plugin(permanent_failure_plugin);
 }

 worker_options
@@ -667,6 +728,39 @@ impl WorkerRunner {
 self.config.stale_lock_cleanup_interval.is_some()
 );

+ // Move permanently-failed jobs to the DLQ BEFORE startup cleanup runs.
+ //
+ // `startup_cleanup_with_timeouts()` calls `cleanup_permanently_failed_jobs()`
+ // which deletes rows from `_private_jobs` where `attempts >= max_attempts`.
+ // The DLQ processor uses the SAME predicate to find candidates to move into
+ // the DLQ — but it runs as a periodic background task that hasn't ticked yet
+ // at startup. Without this pre-move, any job that hit `max_attempts` while
+ // the worker was down (or in the gap between final failure and the next
+ // DLQ-processor tick on the previous run) would be silently deleted by
+ // cleanup before it reached the DLQ.
+ //
+ // We only run this when DLQ is enabled. If the user opted out of DLQ via
+ // `dlq_processor_interval = None`, they've explicitly chosen the
+ // delete-failed-jobs behaviour and we leave it intact.
+ if self.config.dlq_processor_interval.is_some() {
+ match self.client.process_failed_jobs().await {
+ Ok(moved) if moved > 0 => {
+ log::info!(
+ "Pre-cleanup DLQ move captured permanently-failed jobs (moved: {})",
+ moved
+ );
+ }
+ Ok(_) => {
+ // No failed jobs waiting; nothing to capture.
+ }
+ Err(e) => {
+ // If this fails, the next call to startup_cleanup may delete the
+ // affected jobs. We log loudly so an operator can investigate.
+ log::error!("Pre-cleanup DLQ move failed; jobs may be lost when cleanup runs: {}", e);
+ }
+ }
+ }
+
 // Run startup cleanup with configured timeouts
 if let Err(e) = self
 .client
@@ -783,33 +877,29 @@ impl WorkerRunner {
 tokio::spawn(async move { runner.run_until_cancelled(cancellation_token).await })
 }

- /// Process all currently available jobs and return
+ /// Process all currently available jobs and return.
 ///
 /// This method is designed for batch processing or testing scenarios where
 /// you want to process the current job queue without running a persistent
 /// worker.
 ///
- /// This method processes all jobs that are currently available (where
- /// `run_at <= now()`), respecting the configured concurrency limit.
- /// Jobs are processed in priority order (lower priority number = higher
- /// priority), then by `run_at` timestamp.
-///
- /// The method returns when:
- /// - All available jobs have been processed
- /// - No more jobs are available to process
+ /// Processes all jobs where `run_at <= now()`, respecting the configured
+ /// concurrency limit. Jobs are processed in priority order (lower number =
+ /// higher priority), then by `run_at`.
 ///
- /// Note: This method currently returns 0 as an accurate job count would
- /// require additional instrumentation. The jobs are still processed
- /// correctly.
+ /// Returns when all available jobs have been processed.
 ///
- /// # Returns
+ /// # Counting jobs
 ///
- /// Returns `Ok(0)` on success (job count tracking not yet implemented).
+ /// If you need to know how many jobs ran, register a `JobComplete` /
+ /// `JobFail` plugin via `add_plugin()` before building the worker — that's
+ /// the supported path for runtime job-count instrumentation. This method
+ /// no longer pretends to count for you.
 ///
 /// # Errors
 ///
 /// Returns an error if worker initialization or job processing fails.
- pub async fn process_available_jobs(&self) -> Result<usize, BackfillError> {
+ pub async fn process_available_jobs(&self) -> Result<(), BackfillError> {
 log::info!("Processing available jobs (one-shot mode)");

 // Create worker instance
@@ -823,10 +913,7 @@ impl WorkerRunner {
 .map_err(|e| BackfillError::WorkerRuntime(e.to_string()))?;

 log::info!("Finished processing available jobs");
-
- // Note: Returning 0 for now as accurate counting would require additional
- // instrumentation. Consider using metrics or hooks to track job counts.
- Ok(0)
+ Ok(())
 }

 /// Get access to the underlying BackfillClient for job enqueueing and
@@ -835,9 +922,14 @@ impl WorkerRunner {
 &self.client
 }

- /// Get the number of worker instances configured
+ /// Get the number of worker instances actually running.
+ ///
+ /// Always returns 1: `WorkerRunner` spawns exactly one graphile_worker
+ /// `Worker`. The historical `Vec<QueueConfig>` API allowed callers to
+ /// pass multiple configs, but only the first was ever used at runtime —
+ /// this method now reports the truth instead of `queue_configs.len()`.
 pub fn worker_count(&self) -> usize {
- self.config.queue_configs.len()
+ 1
 }

 /// Check if DLQ processor is enabled
@@ -851,6 +943,7 @@ mod tests {
 use super::*;

 #[test]
+ #[allow(deprecated)] // pins the deprecated constructors' storage shape
 fn test_queue_config_builders() {
 let default = QueueConfig::default_queue(5);
 assert_eq!(default.name, None);
diff --git a/tests/dlq_tests.rs b/tests/dlq_tests.rs
index 2e3fd11..8c026f2 100644
--- a/tests/dlq_tests.rs
+++ b/tests/dlq_tests.rs
@@ -1,8 +1,11 @@
 //! Dead Letter Queue integration tests

-use backfill::{BackfillClient, BackfillError, DlqFilter, JobSpec, Priority, Queue};
+use std::time::Duration;
+
+use backfill::{BackfillClient, BackfillError, DlqFilter, JobSpec, Priority, Queue, WorkerConfig, WorkerRunner};
 use chrono::Utc;
 use serde::{Deserialize, Serialize};
+use tokio_util::sync::CancellationToken;
 use uuid::Uuid;

 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -191,7 +194,9 @@ async fn test_dlq_add_job_and_retrieve() {
 assert_eq!(dlq_job.task_identifier, "test_job");
 assert_eq!(dlq_job.failure_reason, "Test failure reason");

- assert_eq!(dlq_job.failure_count, *job.attempts() as i32);
+ // failure_count counts DLQ-touch events for this job_key, not handler
+ // attempts. First touch = 1 regardless of how many retries the job had.
+ assert_eq!(dlq_job.failure_count, 1); // Retrieve it let retrieved = client.get_dlq_job(dlq_job.id).await.expect("should retrieve"); @@ -700,3 +705,254 @@ async fn test_dlq_with_different_priorities() { assert!(priorities.contains(&-10)); // FAST_HIGH assert!(priorities.contains(&5)); // BULK_LOW } + +/// Regression test for P2-7: `failure_count` is a DLQ-touch counter, not a +/// cumulative handler-attempts counter. +/// +/// First DLQ touch for a given `job_key` should set failure_count = 1. +/// A subsequent UPSERT (same job_key, different `_private_jobs` id — +/// simulating a requeue-then-fail-again cycle) should increment to 2. +#[tokio::test] +async fn test_dlq_failure_count_is_touch_count() { + let client = setup_test_client("dlq_failure_count").await; + client.init_dlq().await.expect("DLQ init"); + + let job_data = TestJob { + message: "touch counter test".to_string(), + number: 7, + }; + + // First failure cycle: enqueue with a job_key, add to DLQ. + let outcome = client + .enqueue( + "touch_test", + &job_data, + JobSpec { + job_key: Some("touch_count_key".to_string()), + ..Default::default() + }, + ) + .await + .expect("first enqueue"); + let first = outcome.unwrap(); + let dlq_first = client + .add_to_dlq(&first, "first cycle failure", None) + .await + .expect("first add_to_dlq"); + assert_eq!(dlq_first.failure_count, 1, "first DLQ touch should be 1"); + + // Simulate a requeue-then-fail-again. Re-enqueueing with the same key + // (job_key_mode Replace) creates a fresh _private_jobs row; adding that + // to DLQ should UPSERT into the existing DLQ row by job_key. + let outcome2 = client + .enqueue( + "touch_test", + &job_data, + JobSpec { + job_key: Some("touch_count_key".to_string()), + ..Default::default() + }, + ) + .await + .expect("second enqueue"); + let second = outcome2.unwrap(); + let dlq_second = client + .add_to_dlq(&second, "second cycle failure", None) + .await + .expect("second add_to_dlq"); + + assert_eq!( + dlq_first.id, dlq_second.id, + "same job_key should map to the same DLQ row via UPSERT" + ); + assert_eq!(dlq_second.failure_count, 2, "second DLQ touch should increment to 2"); +} + +/// Regression test for P1-1: `list_dlq_jobs.total` must reflect the filtered +/// row count, not the unfiltered table size. +/// +/// Before the fix, the count query was an unconditional `SELECT COUNT(*) FROM +/// backfill_dlq` so any paginated admin UI that filtered (by task/queue/time) +/// computed wrong page counts. The list query honored filters; only the total +/// didn't. +#[tokio::test] +async fn test_list_dlq_jobs_total_respects_filters() { + let client = setup_test_client("dlq_total_filtered").await; + client.init_dlq().await.expect("DLQ init"); + + let test_job = TestJob { + message: "filter total test".to_string(), + number: 1, + }; + + // Stage 5 jobs with task "task_a" and 3 with task "task_b" — 8 total. + for _ in 0..5 { + let outcome = client + .enqueue("task_a", &test_job, JobSpec::default()) + .await + .expect("enqueue task_a"); + client + .add_to_dlq(&outcome.unwrap(), "task_a failure", None) + .await + .expect("add task_a to DLQ"); + } + for _ in 0..3 { + let outcome = client + .enqueue("task_b", &test_job, JobSpec::default()) + .await + .expect("enqueue task_b"); + client + .add_to_dlq(&outcome.unwrap(), "task_b failure", None) + .await + .expect("add task_b to DLQ"); + } + + // Unfiltered: total = 8. + let all = client.list_dlq_jobs(DlqFilter::default()).await.expect("list all"); + assert_eq!(all.total, 8); + + // Filter by task_a — must report 5, not 8. 
+ let only_a = client + .list_dlq_jobs(DlqFilter { + task_identifier: Some("task_a".to_string()), + ..Default::default() + }) + .await + .expect("list task_a"); + assert_eq!( + only_a.total, 5, + "filtered total must reflect the filter, not the whole table" + ); + assert_eq!(only_a.jobs.len(), 5); + + // Filter by task_b — must report 3, not 8. + let only_b = client + .list_dlq_jobs(DlqFilter { + task_identifier: Some("task_b".to_string()), + ..Default::default() + }) + .await + .expect("list task_b"); + assert_eq!(only_b.total, 3); + assert_eq!(only_b.jobs.len(), 3); + + // Combine filter + pagination: total reflects filter, jobs reflect page. + let page_a = client + .list_dlq_jobs(DlqFilter { + task_identifier: Some("task_a".to_string()), + limit: Some(2), + offset: Some(0), + ..Default::default() + }) + .await + .expect("list task_a paginated"); + assert_eq!(page_a.total, 5, "total must ignore LIMIT/OFFSET"); + assert_eq!(page_a.jobs.len(), 2); +} + +/// Regression test for the DLQ-vs-cleanup startup race (P0-1). +/// +/// Before the fix, `WorkerRunner::run_until_cancelled` called +/// `startup_cleanup_with_timeouts` first, which DELETEs rows from +/// `_private_jobs` where `attempts >= max_attempts`. The DLQ processor was +/// supposed to capture those same rows, but it ran as a periodic background +/// task that hadn't ticked yet at startup. Net effect: jobs that hit +/// max_attempts while the worker was down (or in the gap before the next DLQ +/// tick) were silently deleted. +/// +/// The fix runs `process_failed_jobs()` synchronously before cleanup when DLQ +/// is enabled. This test stages a permanently-failed job, runs the actual +/// worker startup path (`run_until_cancelled` via `spawn_background`), and +/// asserts the job lands in DLQ instead of being deleted. +#[tokio::test] +async fn test_worker_startup_moves_failed_jobs_to_dlq_before_cleanup() { + let database_url = + std::env::var("DATABASE_URL").unwrap_or_else(|_| "postgresql://localhost/backfill_test".to_string()); + + let pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&database_url) + .await + .expect("connect to test database"); + + let schema = format!("test_dlq_startup_capture_{}", Uuid::new_v4().simple()); + let client = BackfillClient::with_pool_and_schema(pool.clone(), schema.clone()) + .await + .expect("create client"); + client.init_dlq().await.expect("init DLQ"); + + // Stage a job that will look "permanently failed" to the DLQ scanner. + let test_job = TestJob { + message: "must reach DLQ on worker startup".to_string(), + number: 4242, + }; + let outcome = client + .enqueue( + "test_job", + &test_job, + JobSpec { + max_attempts: Some(3), + ..Default::default() + }, + ) + .await + .expect("enqueue"); + let job = outcome.unwrap(); + let job_id = *job.id(); + + // Force `attempts = max_attempts` to simulate a job that exhausted retries + // before the previous worker shut down. + sqlx::query(&format!( + "UPDATE {}._private_jobs SET attempts = max_attempts WHERE id = $1", + schema + )) + .bind(job_id) + .execute(&pool) + .await + .expect("stage permanent failure"); + + // Sanity check: not in DLQ yet, still in main table. + let dlq_pre = client.list_dlq_jobs(DlqFilter::default()).await.expect("list DLQ pre"); + assert_eq!(dlq_pre.jobs.len(), 0, "DLQ should be empty before worker startup"); + + // Build and run a real WorkerRunner. 
The DLQ processor's tick interval is
+ // long enough that we know the only thing that can move the job to DLQ
+ // during this test window is the synchronous startup pre-move.
+ let config = WorkerConfig::new(database_url)
+ .with_schema(schema.clone())
+ .with_poll_interval(Duration::from_millis(50))
+ .with_dlq_processor_interval(Some(Duration::from_secs(60)));
+ let worker = WorkerRunner::builder(config)
+ .await
+ .expect("build runner")
+ .build()
+ .await
+ .expect("build worker");
+
+ let token = CancellationToken::new();
+ let handle = worker.spawn_background(token.clone());
+
+ // Give startup enough time to run the pre-move + cleanup sequence.
+ // (No background DLQ tick will fire — we set the interval to 60s.)
+ tokio::time::sleep(Duration::from_millis(750)).await;
+ token.cancel();
+ let _ = handle.await;
+
+ // Verify: the job is in DLQ, not deleted.
+ let dlq_post = client.list_dlq_jobs(DlqFilter::default()).await.expect("list DLQ post");
+ assert_eq!(
+ dlq_post.jobs.len(),
+ 1,
+ "permanently-failed job must reach DLQ during worker startup, not be deleted by cleanup"
+ );
+ let dlq_job = &dlq_post.jobs[0];
+ assert_eq!(dlq_job.original_job_id, Some(job_id));
+ assert_eq!(dlq_job.task_identifier, "test_job");
+
+ // Cleanup
+ sqlx::query(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema))
+ .execute(&pool)
+ .await
+ .ok();
+ pool.close().await;
+}
diff --git a/tests/integration_tests_clean.rs b/tests/integration_tests_clean.rs
index 563bb05..fe6d531 100644
--- a/tests/integration_tests_clean.rs
+++ b/tests/integration_tests_clean.rs
@@ -1208,3 +1208,69 @@ async fn test_mixed_parallel_and_serial_jobs() -> Result<()> {
 })
 .await
 }
+
+/// Concurrent enqueue stress test (test gap from §6 of the review).
+///
+/// Spawns multiple tokio tasks each enqueueing a batch of jobs in parallel.
+/// Verifies that:
+/// - Every enqueue returns success (no deadlocks, no SQL errors).
+/// - All jobs land in `_private_jobs` (no lost rows).
+/// - Multiple workers/clients can hammer the enqueue path simultaneously
+/// without serializing on database locks.
+///
+/// This is the kind of regression test that would catch race conditions
+/// or contention bugs introduced by future changes to the enqueue path
+/// or to graphile_worker's add_job SQL.
+#[tokio::test]
+async fn test_concurrent_enqueue_under_load() -> Result<()> {
+ with_isolated_schema(|client| async move {
+ const TASKS: usize = 10;
+ const JOBS_PER_TASK: usize = 50;
+ const TOTAL: usize = TASKS * JOBS_PER_TASK;
+
+ // Fan out: each task enqueues JOBS_PER_TASK jobs serially within
+ // itself, but TASKS tasks run concurrently against the same pool.
+ let mut handles = Vec::with_capacity(TASKS);
+ for task_id in 0..TASKS {
+ let client = client.clone();
+ let handle = tokio::spawn(async move {
+ let mut enqueued = 0usize;
+ for job_id in 0..JOBS_PER_TASK {
+ let payload = TestJob {
+ message: format!("stress-{task_id}-{job_id}"),
+ number: (task_id * JOBS_PER_TASK + job_id) as i32,
+ };
+ let outcome = client.enqueue("stress_job", &payload, JobSpec::default()).await?;
+ if outcome.is_enqueued() {
+ enqueued += 1;
+ }
+ }
+ Ok::<usize, BackfillError>(enqueued)
+ });
+ handles.push(handle);
+ }
+
+ // Collect results — any task panic or task error fails the test.
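+ // (`handle.await` yields Result<Result<usize, BackfillError>, JoinError>;
+ // map_err converts the JoinError, then the second `?` unwraps the inner
+ // enqueue result.)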
+ let mut total_enqueued = 0usize;
+ for handle in handles {
+ let count = handle
+ .await
+ .map_err(|e| BackfillError::WorkerRuntime(format!("task join failed: {e}")))??;
+ total_enqueued += count;
+ }
+ assert_eq!(total_enqueued, TOTAL, "every concurrent enqueue must succeed");
+
+ // Verify every row landed in _private_jobs.
+ let row_count: (i64,) = sqlx::query_as(&format!("SELECT COUNT(*) FROM {}._private_jobs", client.schema()))
+ .fetch_one(client.pool())
+ .await?;
+ assert_eq!(
+ row_count.0 as usize, TOTAL,
+ "all {} jobs must be persisted in _private_jobs",
+ TOTAL
+ );
+
+ Ok(())
+ })
+ .await
+}
diff --git a/tests/plugin_tests.rs b/tests/plugin_tests.rs
index fdd2355..3c886fc 100644
--- a/tests/plugin_tests.rs
+++ b/tests/plugin_tests.rs
@@ -447,6 +447,247 @@ async fn test_multiple_plugins_called_in_order() -> Result<()> {
 .await
 }

+/// End-to-end retry-to-exhaustion → DLQ test exercising the actual worker
+/// path (handler runs, graphile_worker reschedules, repeat until
+/// max_attempts hits, DLQ scanner moves the row).
+///
+/// Existing DLQ tests SQL-fake exhaustion via
+/// `UPDATE _private_jobs SET attempts = max_attempts`. This test instead
+/// runs the real worker + real handler + real `fail_job` SQL multiple
+/// times, fast-forwarding `run_at` between iterations to skip the
+/// `exp(attempts)` backoff sleeps that would otherwise slow the test to
+/// minutes. The retry-then-DLQ path through every layer (enqueue, get_job,
+/// handler, fail_job, run_at scheduling, eventual DLQ capture) is what
+/// gets validated.
+#[tokio::test]
+async fn test_retry_to_exhaustion_then_dlq_via_worker() -> Result<()> {
+ with_isolated_schema(|client| async move {
+ // FailJob with should_retry=true returns TemporaryUnavailable
+ // (retryable) on attempts < 3 and ValidationFailed (non-retryable)
+ // on the final attempt. PermanentFailurePlugin only acts on the
+ // JobFail event (will_retry=true), so the retryable failures just
+ // get rescheduled; the final failure fires JobPermanentlyFail
+ // because attempts has reached max_attempts naturally. Net effect:
+ // the job exhausts retries through graphile_worker's normal path.
+ client
+ .enqueue(
+ "fail_job",
+ &FailJob { should_retry: true },
+ JobSpec {
+ max_attempts: Some(3),
+ job_key: Some("retry_to_exhaustion_e2e".to_string()),
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ let config = WorkerConfig::new(get_test_database_url())
+ .with_schema(client.schema().to_string())
+ .with_poll_interval(Duration::from_millis(50));
+ let worker = WorkerRunner::builder(config)
+ .await?
+ .define_job::<FailJob>()
+ .build()
+ .await?;
+
+ // Run the worker repeatedly. After each iteration the failed job's
+ // `run_at` has been pushed forward by `exp(attempts)` seconds (1s,
+ // ~3s, ~7s, …). Force it back to NOW() so the next process_available_jobs()
+ // can pick it up immediately. Bounded loop so a regression that
+ // breaks the retry path doesn't hang the test forever.
+ const MAX_ITERATIONS: usize = 10;
+ let mut exhausted = false;
+ for _ in 0..MAX_ITERATIONS {
+ sqlx::query(&format!(
+ "UPDATE {}._private_jobs \
+ SET run_at = NOW() \
+ WHERE locked_at IS NULL AND attempts < max_attempts",
+ client.schema()
+ ))
+ .execute(client.pool())
+ .await?;
+
+ worker.process_available_jobs().await?;
+
+ // Check if attempts hit max_attempts.
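+ // (Read as i16: sqlx maps PostgreSQL smallint to i16, and that is the
+ // column type attempts/max_attempts use in the jobs table.)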
+ let row: Option<(i16, i16)> = sqlx::query_as(&format!(
+ "SELECT attempts, max_attempts FROM {}._private_jobs LIMIT 1",
+ client.schema()
+ ))
+ .fetch_optional(client.pool())
+ .await?;
+ if let Some((attempts, max_attempts)) = row
+ && attempts >= max_attempts
+ && max_attempts > 0
+ {
+ exhausted = true;
+ break;
+ }
+ }
+ assert!(
+ exhausted,
+ "Worker should have exhausted retries within {} iterations",
+ MAX_ITERATIONS
+ );
+
+ // Now run the DLQ processor. The job should be captured.
+ let moved = client.process_failed_jobs().await?;
+ assert_eq!(moved, 1, "exhausted job should move to DLQ on next process_failed_jobs");
+
+ let dlq = client.list_dlq_jobs(DlqFilter::default()).await?;
+ assert_eq!(dlq.jobs.len(), 1);
+ assert_eq!(dlq.jobs[0].task_identifier, "fail_job");
+ assert_eq!(dlq.jobs[0].job_key.as_deref(), Some("retry_to_exhaustion_e2e"));
+ // `failure_count` is the touch counter (P2-7) — one DLQ touch.
+ assert_eq!(dlq.jobs[0].failure_count, 1);
+
+ // The original job row should be gone from _private_jobs (the DLQ
+ // move's atomic CTE deleted it — P1-2).
+ let count: (i64,) = sqlx::query_as(&format!("SELECT COUNT(*) FROM {}._private_jobs", client.schema()))
+ .fetch_one(client.pool())
+ .await?;
+ assert_eq!(count.0, 0, "_private_jobs should be empty after DLQ move");
+
+ Ok(())
+ })
+ .await
+}
+
+/// Regression test for P0-3: non-retryable `WorkerError` variants must
+/// short-circuit retries instead of running all the way up to `max_attempts`.
+///
+/// `WorkerError::is_retryable()` and `classify_from_message()` exist as the
+/// public contract, but until this test landed nothing actually consulted them
+/// at runtime. graphile_worker treats every `Err` identically and reschedules
+/// for `e^min(attempts,10)` seconds regardless of error type. The fix is the
+/// auto-registered `PermanentFailurePlugin` (gated on `dlq_processor_interval =
+/// Some(_)`), which on a non-retryable error rewrites the row's `attempts` to
+/// `max_attempts` so the next get_job() ignores it and the next DLQ tick
+/// captures it.
+///
+/// This test enqueues a job whose handler always returns
+/// `WorkerError::ValidationFailed` (non-retryable), runs the worker once, and
+/// asserts the row's attempts immediately reached `max_attempts` after a
+/// single execution.
+#[tokio::test]
+async fn test_non_retryable_error_short_circuits_retries() -> Result<()> {
+ with_isolated_schema(|client| async move {
+ // Enqueue a job that will always return ValidationFailed (non-retryable
+ // because FailJob with should_retry=false hits the else-branch).
+ client
+ .enqueue(
+ "fail_job",
+ &FailJob { should_retry: false },
+ JobSpec {
+ max_attempts: Some(5),
+ job_key: Some("non_retryable_short_circuit".to_string()),
+ ..Default::default()
+ },
+ )
+ .await?;
+
+ let config = WorkerConfig::new(get_test_database_url())
+ .with_schema(client.schema().to_string())
+ .with_poll_interval(Duration::from_millis(50));
+ // dlq_processor_interval defaults to Some(60s), which is what triggers
+ // PermanentFailurePlugin auto-registration. Don't override it.
+
+ let worker = WorkerRunner::builder(config)
+ .await?
+ .define_job::<FailJob>()
+ .build()
+ .await?;
+
+ // Run once. The handler runs, returns ValidationFailed, JobFail hook
+ // fires, our plugin classifies the error as non-retryable, and rewrites
+ // attempts to max_attempts.
+ worker.process_available_jobs().await?;
+
+ // Verify: attempts hit max_attempts after a single execution rather
+ // than incrementing by one per retry.
+ let row: (i16, i16) = sqlx::query_as(&format!( + "SELECT attempts, max_attempts FROM {}._private_jobs LIMIT 1", + client.schema() + )) + .fetch_one(client.pool()) + .await?; + let (attempts, max_attempts) = row; + assert_eq!( + attempts, max_attempts, + "Non-retryable error should short-circuit retries: attempts = max_attempts \ + (got attempts={}, max_attempts={})", + attempts, max_attempts + ); + + // The DLQ processor would normally pick this up on its next tick. We + // can call it directly to verify the end-to-end behaviour: the job + // ends up in the DLQ within a single retry-cycle's worth of time. + let moved = client.process_failed_jobs().await?; + assert_eq!( + moved, 1, + "permanent-failure short-circuit should make the job DLQ-eligible" + ); + + let dlq = client.list_dlq_jobs(DlqFilter::default()).await?; + assert_eq!(dlq.jobs.len(), 1); + assert_eq!(dlq.jobs[0].task_identifier, "fail_job"); + + Ok(()) + }) + .await +} + +#[tokio::test] +async fn test_retryable_error_does_not_short_circuit() -> Result<()> { + // Mirror of the previous test: confirms the plugin only fires for + // non-retryable errors, leaving the normal exponential-backoff path + // intact for everything else. + with_isolated_schema(|client| async move { + client + .enqueue( + "fail_job", + // should_retry=true with attempt<3 returns TemporaryUnavailable + // (retryable); on later attempts it would return ValidationFailed, + // but with max_attempts=2 we'll only see one execution. + &FailJob { should_retry: true }, + JobSpec { + max_attempts: Some(2), + job_key: Some("retryable_no_short_circuit".to_string()), + ..Default::default() + }, + ) + .await?; + + let config = WorkerConfig::new(get_test_database_url()) + .with_schema(client.schema().to_string()) + .with_poll_interval(Duration::from_millis(50)); + + let worker = WorkerRunner::builder(config) + .await? 
+ .define_job::<FailJob>()
+ .build()
+ .await?;
+
+ worker.process_available_jobs().await?;
+
+ let (attempts, max_attempts): (i16, i16) = sqlx::query_as(&format!(
+ "SELECT attempts, max_attempts FROM {}._private_jobs LIMIT 1",
+ client.schema()
+ ))
+ .fetch_one(client.pool())
+ .await?;
+ assert_eq!(
+ attempts, 1,
+ "Retryable error must NOT short-circuit; expected attempts=1 after one execution \
+ (got attempts={}, max_attempts={})",
+ attempts, max_attempts
+ );
+
+ Ok(())
+ })
+ .await
+}
+
 #[tokio::test]
 async fn test_plugin_worker_lifecycle_hooks() -> Result<()> {
 with_isolated_schema(|client| async move {
diff --git a/tests/worker_tests.rs b/tests/worker_tests.rs
index a79cf23..43875ae 100644
--- a/tests/worker_tests.rs
+++ b/tests/worker_tests.rs
@@ -45,16 +45,14 @@ async fn test_worker_config_builder() {
 .with_schema("custom_schema")
 .with_poll_interval(Duration::from_millis(100))
 .with_dlq_processor_interval(Some(Duration::from_secs(30)))
- .with_queues(vec![
- QueueConfig::default_queue(5),
- QueueConfig::named_queue("fast", 10),
- ]);
+ .with_concurrency(10);

 assert_eq!(config.database_url, "postgresql://localhost/test");
 assert_eq!(config.schema, "custom_schema");
 assert_eq!(config.poll_interval, Duration::from_millis(100));
 assert_eq!(config.dlq_processor_interval, Some(Duration::from_secs(30)));
- assert_eq!(config.queue_configs.len(), 2);
+ assert_eq!(config.queue_configs.len(), 1);
+ assert_eq!(config.queue_configs[0].concurrency, 10);
 }

 #[tokio::test]
@@ -74,6 +72,7 @@ async fn test_queue_config_default_queue() {
 }

 #[tokio::test]
+#[allow(deprecated)] // documents the deprecated constructor's storage shape
 async fn test_queue_config_named_queue() {
 let config = QueueConfig::named_queue("bulk", 20);

@@ -83,6 +82,7 @@
+#[allow(deprecated)] // documents the deprecated constructor's storage shape
 async fn test_queue_config_priority_queue() {
 let config = QueueConfig::priority_queue("urgent", 5, -100, 100);

@@ -151,7 +151,12 @@ async fn test_worker_runner_multiple_job_types() -> Result<(), BackfillError> {
 }

 #[tokio::test]
-async fn test_worker_runner_with_multiple_queues() -> Result<(), BackfillError> {
+#[allow(deprecated)] // covers the deprecated multi-queue API while it still exists
+async fn test_worker_runner_with_multiple_queues_only_first_honored() -> Result<(), BackfillError> {
+ // The Vec<QueueConfig> API is deprecated because graphile_worker doesn't
+ // expose per-worker queue filtering; only one Worker is ever spawned and
+ // only the first config's concurrency is used. This test pins that
+ // behaviour: passing 3 configs results in worker_count == 1.
let config = WorkerConfig::new(get_test_database_url()) .with_schema("test_worker_queues") .with_queues(vec![ @@ -167,7 +172,11 @@ async fn test_worker_runner_with_multiple_queues() -> Result<(), BackfillError> .build() .await?; - assert_eq!(worker.worker_count(), 3); + assert_eq!( + worker.worker_count(), + 1, + "WorkerRunner only ever spawns one Worker, regardless of queue_configs.len()" + ); Ok(()) } @@ -320,12 +329,9 @@ async fn test_worker_runner_process_available_jobs() -> Result<(), BackfillError enqueue_fast(client, SimpleTestJob::IDENTIFIER, &job2, None).await?; enqueue_fast(client, SimpleTestJob::IDENTIFIER, &job3, None).await?; - // Process all available jobs - let processed = worker.process_available_jobs().await?; - - // Note: Returns 0 because job counting isn't implemented yet - // But the jobs should still be processed successfully - assert_eq!(processed, 0); + // Process all available jobs. Returns () — for job counts, plug in a + // JobComplete hook before building the worker. + worker.process_available_jobs().await?; // Verify jobs were actually processed by checking the database // Jobs should be completed and removed from the queue @@ -361,7 +367,7 @@ async fn test_worker_runner_invalid_database_url() { async fn test_worker_options_builder_concurrency() -> Result<(), BackfillError> { let config = WorkerConfig::new(get_test_database_url()) .with_schema("test_concurrency") - .with_queues(vec![QueueConfig::default_queue(15)]) + .with_concurrency(15) .with_dlq_processor_interval(None); let worker = WorkerRunner::builder(config)