diff --git a/tests/integration_tests_clean.rs b/tests/integration_tests_clean.rs
index b839358..7bed18d 100644
--- a/tests/integration_tests_clean.rs
+++ b/tests/integration_tests_clean.rs
@@ -2,6 +2,7 @@ type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
 use backfill::*;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
+use tokio_util::sync::CancellationToken;
 use uuid::Uuid;
 
 /// Test job payload for integration tests
@@ -482,6 +483,18 @@ impl TaskHandler for CronTestHandler {
     }
 }
 
+// Trivial success handler for tests that need a real job to run to completion.
+#[derive(Clone, Serialize, Deserialize)]
+struct OkHandler;
+
+impl TaskHandler for OkHandler {
+    const IDENTIFIER: &'static str = "ok_handler";
+
+    async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
+        Ok::<(), std::io::Error>(())
+    }
+}
+
 #[tokio::test]
 async fn test_cron_schedule_registration() -> Result<()> {
     ensure_test_database().await?;
@@ -852,6 +865,109 @@ async fn test_startup_cleanup_releases_both_lock_types() -> Result<()> {
     .await
 }
 
+/// End-to-end worker crash-recovery test.
+///
+/// Existing stale-lock tests exercise the SQL helpers (`release_stale_*`)
+/// directly. This test instead validates the full crash-recovery loop
+/// through the `WorkerRunner` startup path:
+///
+/// 1. Enqueue a job, then forcibly stamp `_private_jobs.locked_at` far in the
+///    past with a fake `locked_by` to simulate a worker that crashed
+///    mid-execution.
+/// 2. Spin up a fresh `WorkerRunner`. Its startup_cleanup step should release
+///    the stale lock.
+/// 3. The newly started worker should then pick up the unlocked job and run it
+///    to completion.
+/// 4. After cancellation, the row should be gone, meaning the job ran
+///    successfully.
+///
+/// If startup_cleanup were skipped, broken, or ordered after the worker's
+/// first poll, the job would stay locked and the row would still be in the
+/// table at the end. This test pins the recovery contract end-to-end.
+#[tokio::test]
+async fn test_worker_crash_recovery_through_startup() -> Result<()> {
+    with_isolated_schema(|client| async move {
+        let schema = client.schema().to_string();
+
+        // Enqueue using `enqueue_task` so the payload shape matches the
+        // handler; a previous version of this test enqueued a TestJob
+        // payload against the OkHandler identifier, which deserialized
+        // incorrectly inside the worker and quietly failed instead of
+        // running.
+        let outcome = client.enqueue_task(OkHandler, JobSpec::default()).await?;
+        let job = outcome.expect("enqueue");
+        let job_id = *job.id();
+
+        // Simulate a crashed worker holding the lock for an hour.
+        sqlx::query(&format!(
+            "UPDATE {schema}._private_jobs \
+             SET locked_at = NOW() - INTERVAL '1 hour', locked_by = 'crashed_worker' \
+             WHERE id = $1"
+        ))
+        .bind(job_id)
+        .execute(client.pool())
+        .await?;
+
+        // Sanity check: the job is locked.
+        let locked: (i64,) = sqlx::query_as(&format!(
+            "SELECT COUNT(*) FROM {schema}._private_jobs WHERE locked_at IS NOT NULL"
+        ))
+        .fetch_one(client.pool())
+        .await?;
+        assert_eq!(locked.0, 1, "job should be locked before recovery starts");
+
+        // Build a worker. dlq_processor_interval is set to 60s so the
+        // background DLQ tick can't fire during the test window; only the
+        // synchronous startup_cleanup gets to act on the locked row.
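+        //
+        // Expected sequence from here, restating the contract documented
+        // above rather than asserting any new API behavior: startup_cleanup
+        // releases the stale lock, the first poll claims the now-unlocked
+        // job, OkHandler returns Ok, and the completed row is deleted from
+        // `_private_jobs`. The polling loop below watches for that deletion.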
+        let config = WorkerConfig::new(get_test_database_url())
+            .with_schema(&schema)
+            .with_poll_interval(std::time::Duration::from_millis(50))
+            .with_dlq_processor_interval(Some(std::time::Duration::from_secs(60)));
+        let worker = WorkerRunner::builder(config)
+            .await?
+            .define_job::<OkHandler>()
+            .build()
+            .await?;
+
+        let token = CancellationToken::new();
+        let handle = worker.spawn_background(token.clone());
+
+        // Poll for the row to disappear. Worker init + startup_cleanup +
+        // first poll + handler + delete should all complete within a few
+        // seconds; bound the wait so a regression doesn't hang the test.
+        let mut recovered = false;
+        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
+        while std::time::Instant::now() < deadline {
+            let remaining: (i64,) =
+                sqlx::query_as(&format!("SELECT COUNT(*) FROM {schema}._private_jobs"))
+                    .fetch_one(client.pool())
+                    .await?;
+            if remaining.0 == 0 {
+                recovered = true;
+                break;
+            }
+            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+        }
+
+        // Pull diagnostic state if we didn't recover, then cancel.
+        // (The timestamp column is decoded via sqlx's chrono support here.)
+        type JobDiagRow = (i16, i16, Option<chrono::DateTime<chrono::Utc>>, Option<String>);
+        let final_state: Option<JobDiagRow> = sqlx::query_as(&format!(
+            "SELECT attempts, max_attempts, locked_at, locked_by \
+             FROM {schema}._private_jobs LIMIT 1"
+        ))
+        .fetch_optional(client.pool())
+        .await?;
+
+        token.cancel();
+        let _ = handle.await;
+
+        assert!(
+            recovered,
+            "the previously stale-locked job should have been recovered, run, and removed within 5s. \
+             Final row state: {final_state:?}"
+        );
+
+        Ok(())
+    })
+    .await
+}
+
 // =============================================================================
 // Queue Parallel vs Serial Behavior Tests
 // =============================================================================
diff --git a/tests/plugin_tests.rs b/tests/plugin_tests.rs
index 3c886fc..85a2868 100644
--- a/tests/plugin_tests.rs
+++ b/tests/plugin_tests.rs
@@ -95,6 +95,32 @@ impl TaskHandler for FailJob {
     }
 }
 
+// Test job that fails on the first `succeed_at - 1` attempts, then succeeds.
+//
+// `attempts` here is the value graphile_worker increments at fetch time, so
+// the very first call sees attempt=1, the second sees attempt=2, etc. With
+// `succeed_at = 3` the handler returns Err on attempts 1 and 2, then Ok on
+// attempt 3.
+#[derive(Clone, Serialize, Deserialize)]
+struct EventuallySucceedJob {
+    succeed_at: i16,
+}
+
+impl TaskHandler for EventuallySucceedJob {
+    const IDENTIFIER: &'static str = "eventually_succeed_job";
+
+    async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
+        let attempt = *ctx.job().attempts();
+        if attempt < self.succeed_at {
+            Err(WorkerError::TemporaryUnavailable {
+                message: format!("not yet: attempt {attempt} < {}", self.succeed_at),
+            })
+        } else {
+            Ok::<(), WorkerError>(())
+        }
+    }
+}
+
 /// Test plugin that counts hook invocations
 #[derive(Clone)]
 struct CountingPlugin {
@@ -553,6 +579,193 @@ async fn test_retry_to_exhaustion_then_dlq_via_worker() -> Result<()> {
     .await
 }
 
+/// End-to-end DLQ requeue + successful re-execution test.
+///
+/// `test_dlq_requeue_job` (in dlq_tests.rs) verifies the requeue *operation*:
+/// that requeued_count is incremented, last_requeued_at is set, and so on. It
+/// does NOT verify that the requeued job actually runs to completion or
+/// that the DLQ entry gets cleaned up afterward.
+///
+/// This test does the full lifecycle:
+/// 1. Enqueue a job with a job_key.
+/// 2. Force it into permanently-failed state via SQL.
+/// 3. `process_failed_jobs()` moves it to DLQ.
+/// 4. `requeue_dlq_job()` creates a fresh `_private_jobs` row.
+/// 5. Run the worker; the handler succeeds.
+/// 6. Assert: the row is gone from `_private_jobs` (success).
+/// 7. Assert: the DLQ is empty too; `DlqCleanupPlugin` (auto-registered when
+///    the DLQ is enabled) hooks `JobComplete` and deletes DLQ entries by
+///    matching job_key.
+///
+/// If `DlqCleanupPlugin` regressed (or wasn't auto-registered), the DLQ
+/// entry would persist after the requeued job succeeded, and an admin
+/// could be tricked into re-requeueing the same job indefinitely.
+#[tokio::test]
+async fn test_dlq_requeue_runs_to_completion_and_cleans_dlq() -> Result<()> {
+    with_isolated_schema(|client| async move {
+        client.init_dlq().await?;
+
+        let outcome = client
+            .enqueue(
+                "success_job",
+                &SuccessJob {
+                    message: "originally failed, will succeed on requeue".to_string(),
+                },
+                JobSpec {
+                    max_attempts: Some(3),
+                    job_key: Some("requeue_run_test".to_string()),
+                    ..Default::default()
+                },
+            )
+            .await?;
+        let job = outcome.unwrap();
+        let job_id = *job.id();
+
+        // Force the job into permanently-failed state.
+        sqlx::query(&format!(
+            "UPDATE {}._private_jobs SET attempts = max_attempts WHERE id = $1",
+            client.schema()
+        ))
+        .bind(job_id)
+        .execute(client.pool())
+        .await?;
+
+        // Move to DLQ.
+        let moved = client.process_failed_jobs().await?;
+        assert_eq!(moved, 1);
+
+        let dlq_pre = client.list_dlq_jobs(DlqFilter::default()).await?;
+        assert_eq!(dlq_pre.jobs.len(), 1, "DLQ should have the failed job");
+        let dlq_id = dlq_pre.jobs[0].id;
+
+        // Admin requeues the job.
+        let _requeued = client
+            .requeue_dlq_job(dlq_id, Some("retry after fix".to_string()))
+            .await?;
+
+        // After requeue, the DLQ entry persists but with bumped counters;
+        // it'll be cleaned up only when the requeued job *succeeds*.
+        let dlq_after_requeue = client.list_dlq_jobs(DlqFilter::default()).await?;
+        assert_eq!(dlq_after_requeue.jobs.len(), 1);
+        assert_eq!(dlq_after_requeue.jobs[0].requeued_count, 1);
+
+        // Run the worker. SuccessJob always returns Ok, so the requeued
+        // row should run to completion.
+        let config = WorkerConfig::new(get_test_database_url())
+            .with_schema(client.schema().to_string())
+            .with_poll_interval(Duration::from_millis(50));
+        let worker = WorkerRunner::builder(config)
+            .await?
+            .define_job::<SuccessJob>()
+            .build()
+            .await?;
+
+        worker.process_available_jobs().await?;
+
+        // Main queue empty: the job ran.
+        let main: (i64,) = sqlx::query_as(&format!(
+            "SELECT COUNT(*) FROM {}._private_jobs",
+            client.schema()
+        ))
+        .fetch_one(client.pool())
+        .await?;
+        assert_eq!(main.0, 0, "requeued job should have run to completion");
+
+        // DLQ empty: DlqCleanupPlugin removed the entry on JobComplete.
+        let dlq_post = client.list_dlq_jobs(DlqFilter::default()).await?;
+        assert_eq!(
+            dlq_post.jobs.len(),
+            0,
+            "DlqCleanupPlugin must remove DLQ entries when their job_key completes successfully"
+        );
+
+        Ok(())
+    })
+    .await
+}
+
+/// Coverage for the retry-then-eventual-success path: a handler that
+/// returns `Err` on its first few attempts and `Ok` on a later attempt should
+/// run to completion just like a never-failed job. This validates that the
+/// retry mechanism returns cleanly to the success path: no stuck rows, no
+/// DLQ leakage, no leftover state in `_private_jobs`.
+///
+/// The retry-then-DLQ path is covered by
+/// `test_retry_to_exhaustion_then_dlq_via_worker`. This is the symmetric
+/// "succeeded eventually" counterpart.
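+///
+/// Concretely, with `succeed_at = 3` and `max_attempts = 5` (restating the
+/// handler's documented semantics, not new behavior): attempt 1 returns
+/// `Err` and a retry is scheduled, attempt 2 returns `Err` again, attempt 3
+/// returns `Ok` and the row is deleted; attempts 4 and 5 are never reached.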
+#[tokio::test]
+async fn test_retry_then_eventual_success() -> Result<()> {
+    with_isolated_schema(|client| async move {
+        // The handler fails on attempts 1 and 2 (with a retryable error, so
+        // PermanentFailurePlugin doesn't short-circuit) and succeeds on
+        // attempt 3.
+        client
+            .enqueue(
+                "eventually_succeed_job",
+                &EventuallySucceedJob { succeed_at: 3 },
+                JobSpec {
+                    max_attempts: Some(5),
+                    job_key: Some("retry_then_success".to_string()),
+                    ..Default::default()
+                },
+            )
+            .await?;
+
+        let config = WorkerConfig::new(get_test_database_url())
+            .with_schema(client.schema().to_string())
+            .with_poll_interval(Duration::from_millis(50));
+        let worker = WorkerRunner::builder(config)
+            .await?
+            .define_job::<EventuallySucceedJob>()
+            .build()
+            .await?;
+
+        // Iteratively run + fast-forward run_at, the same trick as the
+        // exhaustion test. The loop is bounded so a regression that breaks
+        // the success path doesn't hang the test.
+        const MAX_ITERATIONS: usize = 10;
+        let mut succeeded = false;
+        for _ in 0..MAX_ITERATIONS {
+            sqlx::query(&format!(
+                "UPDATE {}._private_jobs \
+                 SET run_at = NOW() \
+                 WHERE locked_at IS NULL AND attempts < max_attempts",
+                client.schema()
+            ))
+            .execute(client.pool())
+            .await?;
+
+            worker.process_available_jobs().await?;
+
+            // Successful completion deletes the row.
+            let remaining: (i64,) = sqlx::query_as(&format!(
+                "SELECT COUNT(*) FROM {}._private_jobs",
+                client.schema()
+            ))
+            .fetch_one(client.pool())
+            .await?;
+            if remaining.0 == 0 {
+                succeeded = true;
+                break;
+            }
+        }
+        assert!(
+            succeeded,
+            "job should have succeeded within {} iterations",
+            MAX_ITERATIONS
+        );
+
+        // Belt-and-suspenders: also confirm it didn't leak into the DLQ on
+        // the way through. (The DLQ may not have been initialized, and
+        // list_dlq_jobs would error if not; initialize it first so this
+        // assertion is unconditional.)
+        client.init_dlq().await?;
+        let dlq = client.list_dlq_jobs(DlqFilter::default()).await?;
+        assert_eq!(
+            dlq.jobs.len(),
+            0,
+            "successful job must not appear in DLQ, even one that failed transiently"
+        );
+
+        Ok(())
+    })
+    .await
+}
+
 /// Regression test for P0-3: non-retryable `WorkerError` variants must
 /// short-circuit retries instead of running all the way up to `max_attempts`.
 ///