116 changes: 116 additions & 0 deletions tests/integration_tests_clean.rs
@@ -2,6 +2,7 @@ type Result<T> = std::result::Result<T, BackfillError>;
use backfill::*;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

/// Test job payload for integration tests
@@ -482,6 +483,18 @@ impl TaskHandler for CronTestHandler {
}
}

// Trivial success handler for tests that need a real job to run to completion.
#[derive(Clone, Serialize, Deserialize)]
struct OkHandler;

impl TaskHandler for OkHandler {
const IDENTIFIER: &'static str = "ok_handler";

async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
Ok::<(), std::io::Error>(())
}
}

#[tokio::test]
async fn test_cron_schedule_registration() -> Result<()> {
ensure_test_database().await?;
@@ -852,6 +865,109 @@ async fn test_startup_cleanup_releases_both_lock_types() -> Result<()> {
.await
}

/// End-to-end worker crash-recovery test.
///
/// Existing stale-lock tests exercise the SQL helpers (`release_stale_*`)
/// directly. This test instead validates the full crash-recovery loop
/// through the `WorkerRunner` startup path:
///
/// 1. Enqueue a job, then forcibly stamp `_private_jobs.locked_at` far in the
/// past with a fake `locked_by` to simulate a worker that crashed
/// mid-execution.
/// 2. Spin up a fresh `WorkerRunner`. Its startup_cleanup step should release
/// the stale lock.
/// 3. The newly-started worker should then pick up the unlocked job and run it
/// to completion.
/// 4. After cancellation, the row should be gone — the job ran successfully.
///
/// If startup_cleanup were skipped, broken, or ordered after the worker's
/// first poll, the job would stay locked and the row would still be in the
/// table at the end. This test pins the recovery contract end-to-end.
#[tokio::test]
async fn test_worker_crash_recovery_through_startup() -> Result<()> {
with_isolated_schema(|client| async move {
let schema = client.schema().to_string();

// Enqueue via `enqueue_task` so the payload shape matches the
// handler. An earlier version of this test enqueued a TestJob
// payload against the OkHandler identifier; that payload
// deserialized incorrectly inside the worker, so the job quietly
// failed instead of running.
let outcome = client.enqueue_task(OkHandler, JobSpec::default()).await?;
let job = outcome.expect("enqueue");
let job_id = *job.id();

// Simulate a crashed worker holding the lock for an hour.
sqlx::query(&format!(
"UPDATE {schema}._private_jobs \
SET locked_at = NOW() - INTERVAL '1 hour', locked_by = 'crashed_worker' \
WHERE id = $1"
))
.bind(job_id)
.execute(client.pool())
.await?;

// Sanity check — the job is locked.
let locked: (i64,) = sqlx::query_as(&format!(
"SELECT COUNT(*) FROM {schema}._private_jobs WHERE locked_at IS NOT NULL"
))
.fetch_one(client.pool())
.await?;
assert_eq!(locked.0, 1, "job should be locked before recovery starts");

// Build a worker. dlq_processor_interval is set to 60s so the
// background DLQ tick can't fire during the test window — only the
// synchronous startup_cleanup gets to act on the locked row.
let config = WorkerConfig::new(get_test_database_url())
.with_schema(&schema)
.with_poll_interval(std::time::Duration::from_millis(50))
.with_dlq_processor_interval(Some(std::time::Duration::from_secs(60)));
let worker = WorkerRunner::builder(config)
.await?
.define_job::<OkHandler>()
.build()
.await?;

let token = CancellationToken::new();
let handle = worker.spawn_background(token.clone());

// Poll for the row to disappear. Worker init + startup_cleanup +
// first poll + handler + delete should all complete within a few
// seconds; bound the wait so a regression doesn't hang the test.
let mut recovered = false;
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
while std::time::Instant::now() < deadline {
let remaining: (i64,) = sqlx::query_as(&format!("SELECT COUNT(*) FROM {schema}._private_jobs"))
.fetch_one(client.pool())
.await?;
if remaining.0 == 0 {
recovered = true;
break;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}

// Pull diagnostic state if we didn't recover, then cancel.
type JobDiagRow = (i16, i16, Option<chrono::DateTime<chrono::Utc>>, Option<String>);
let final_state: Option<JobDiagRow> = sqlx::query_as(&format!(
"SELECT attempts, max_attempts, locked_at, locked_by FROM {schema}._private_jobs LIMIT 1"
))
.fetch_optional(client.pool())
.await?;

token.cancel();
let _ = handle.await;

assert!(
recovered,
"the previously-stale-locked job should have been recovered, run, and removed within 5s. \
Final row state: {final_state:?}"
);

Ok(())
})
.await
}
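
// Illustrative only, not part of this PR: the recovery contract pinned by the
// test above assumes startup_cleanup releases stale locks with a query shaped
// roughly like the sketch below. The column names mirror what the test
// manipulates; the helper name and the 1-hour threshold are hypothetical
// placeholders, not the crate's actual implementation.
#[allow(dead_code)]
async fn release_stale_locks_sketch(pool: &sqlx::PgPool, schema: &str) -> sqlx::Result<u64> {
    // Clear locks held longer than the threshold so a fresh worker can pick
    // the jobs back up on its next poll.
    let released = sqlx::query(&format!(
        "UPDATE {schema}._private_jobs \
         SET locked_at = NULL, locked_by = NULL \
         WHERE locked_at < NOW() - INTERVAL '1 hour'"
    ))
    .execute(pool)
    .await?;
    Ok(released.rows_affected())
}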

// =============================================================================
// Queue Parallel vs Serial Behavior Tests
// =============================================================================
213 changes: 213 additions & 0 deletions tests/plugin_tests.rs
@@ -95,6 +95,32 @@ impl TaskHandler for FailJob {
}
}

// Test job that fails on the first `succeed_at - 1` attempts then succeeds.
//
// `attempts` here is the value graphile_worker increments at fetch time, so
// the very first call sees attempt=1, the second sees attempt=2, etc. With
// `succeed_at = 3` the handler returns Err on attempts 1 and 2, then Ok on
// attempt 3.
#[derive(Clone, Serialize, Deserialize)]
struct EventuallySucceedJob {
succeed_at: i16,
}

impl TaskHandler for EventuallySucceedJob {
const IDENTIFIER: &'static str = "eventually_succeed_job";

async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
let attempt = *ctx.job().attempts();
if attempt < self.succeed_at {
Err(WorkerError::TemporaryUnavailable {
message: format!("not yet — attempt {attempt} < {}", self.succeed_at),
})
} else {
Ok::<(), WorkerError>(())
}
}
}

/// Test plugin that counts hook invocations
#[derive(Clone)]
struct CountingPlugin {
@@ -553,6 +579,193 @@ async fn test_retry_to_exhaustion_then_dlq_via_worker() -> Result<()> {
.await
}

/// End-to-end DLQ requeue + successful re-execution test.
///
/// `test_dlq_requeue_job` (in dlq_tests.rs) verifies the requeue *operation*
/// — that requeued_count is incremented, last_requeued_at is set, etc. It
/// does NOT verify that the requeued job actually runs to completion or
/// that the DLQ entry gets cleaned up afterward.
///
/// This test does the full lifecycle:
/// 1. Enqueue a job with a job_key.
/// 2. Force it into permanently-failed state via SQL.
/// 3. `process_failed_jobs()` moves it to DLQ.
/// 4. `requeue_dlq_job()` creates a fresh `_private_jobs` row.
/// 5. Run the worker — the handler succeeds.
/// 6. Assert: row is gone from `_private_jobs` (success).
/// 7. Assert: DLQ is empty too — `DlqCleanupPlugin` (auto-registered when DLQ
/// is enabled) hooks `JobComplete` and deletes DLQ entries by matching
/// job_key.
///
/// If `DlqCleanupPlugin` regressed (or wasn't auto-registered) the DLQ
/// entry would persist after the requeued job succeeded, and an admin
/// could be tricked into re-requeueing the same job indefinitely.
#[tokio::test]
async fn test_dlq_requeue_runs_to_completion_and_cleans_dlq() -> Result<()> {
with_isolated_schema(|client| async move {
client.init_dlq().await?;

let outcome = client
.enqueue(
"success_job",
&SuccessJob {
message: "originally failed, will succeed on requeue".to_string(),
},
JobSpec {
max_attempts: Some(3),
job_key: Some("requeue_run_test".to_string()),
..Default::default()
},
)
.await?;
let job = outcome.unwrap();
let job_id = *job.id();

// Force the job into permanently-failed state.
sqlx::query(&format!(
"UPDATE {}._private_jobs SET attempts = max_attempts WHERE id = $1",
client.schema()
))
.bind(job_id)
.execute(client.pool())
.await?;

// Move to DLQ.
let moved = client.process_failed_jobs().await?;
assert_eq!(moved, 1);

let dlq_pre = client.list_dlq_jobs(DlqFilter::default()).await?;
assert_eq!(dlq_pre.jobs.len(), 1, "DLQ should have the failed job");
let dlq_id = dlq_pre.jobs[0].id;

// Admin requeues the job.
let _requeued = client
.requeue_dlq_job(dlq_id, Some("retry after fix".to_string()))
.await?;

// After requeue, the DLQ entry persists but with bumped counters
// — it'll be cleaned up only when the requeued job *succeeds*.
let dlq_after_requeue = client.list_dlq_jobs(DlqFilter::default()).await?;
assert_eq!(dlq_after_requeue.jobs.len(), 1);
assert_eq!(dlq_after_requeue.jobs[0].requeued_count, 1);

// Run the worker. SuccessJob always returns Ok, so the requeued
// row should run to completion.
let config = WorkerConfig::new(get_test_database_url())
.with_schema(client.schema().to_string())
.with_poll_interval(Duration::from_millis(50));
let worker = WorkerRunner::builder(config)
.await?
.define_job::<SuccessJob>()
.build()
.await?;

worker.process_available_jobs().await?;

// Main queue empty — job ran.
let main: (i64,) = sqlx::query_as(&format!("SELECT COUNT(*) FROM {}._private_jobs", client.schema()))
.fetch_one(client.pool())
.await?;
assert_eq!(main.0, 0, "requeued job should have run to completion");

// DLQ empty — DlqCleanupPlugin removed the entry on JobComplete.
let dlq_post = client.list_dlq_jobs(DlqFilter::default()).await?;
assert_eq!(
dlq_post.jobs.len(),
0,
"DlqCleanupPlugin must remove DLQ entries when their job_key completes successfully"
);

Ok(())
})
.await
}
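
// Illustrative only, not part of this PR: the cleanup asserted above is
// assumed to reduce to a delete keyed on job_key once the requeued job
// completes. The `_private_dlq_jobs` table name and the helper below are
// hypothetical placeholders; the test only pins the observable behaviour
// (the DLQ entry disappears after success), not this exact SQL.
#[allow(dead_code)]
async fn dlq_cleanup_on_complete_sketch(
    pool: &sqlx::PgPool,
    schema: &str,
    job_key: &str,
) -> sqlx::Result<u64> {
    // Drop any DLQ entry whose job_key matches the job that just completed.
    let deleted = sqlx::query(&format!(
        "DELETE FROM {schema}._private_dlq_jobs WHERE job_key = $1"
    ))
    .bind(job_key)
    .execute(pool)
    .await?;
    Ok(deleted.rows_affected())
}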

/// Coverage for the retry-then-eventual-success path: a handler that
/// returns Err on its first few attempts and `Ok` on a later attempt should
/// run to completion just like a never-failed job. This validates that the
/// retry mechanism returns cleanly to the success path — no stuck rows, no
/// DLQ leakage, no leftover state in `_private_jobs`.
///
/// The retry-then-DLQ path is covered by
/// `test_retry_to_exhaustion_then_dlq_via_worker`. This is the symmetric
/// "succeeded eventually" counterpart.
#[tokio::test]
async fn test_retry_then_eventual_success() -> Result<()> {
with_isolated_schema(|client| async move {
// Handler fails on attempts 1, 2 (with a retryable error so
// PermanentFailurePlugin doesn't short-circuit) and succeeds on
// attempt 3.
client
.enqueue(
"eventually_succeed_job",
&EventuallySucceedJob { succeed_at: 3 },
JobSpec {
max_attempts: Some(5),
job_key: Some("retry_then_success".to_string()),
..Default::default()
},
)
.await?;

let config = WorkerConfig::new(get_test_database_url())
.with_schema(client.schema().to_string())
.with_poll_interval(Duration::from_millis(50));
let worker = WorkerRunner::builder(config)
.await?
.define_job::<EventuallySucceedJob>()
.build()
.await?;

// Iteratively run + fast-forward run_at, same trick as the exhaustion
// test. Bounded loop so a regression that breaks the success path
// doesn't hang the test.
const MAX_ITERATIONS: usize = 10;
let mut succeeded = false;
for _ in 0..MAX_ITERATIONS {
sqlx::query(&format!(
"UPDATE {}._private_jobs \
SET run_at = NOW() \
WHERE locked_at IS NULL AND attempts < max_attempts",
client.schema()
))
.execute(client.pool())
.await?;

worker.process_available_jobs().await?;

// Successful completion deletes the row.
let remaining: (i64,) = sqlx::query_as(&format!("SELECT COUNT(*) FROM {}._private_jobs", client.schema()))
.fetch_one(client.pool())
.await?;
if remaining.0 == 0 {
succeeded = true;
break;
}
}
assert!(
succeeded,
"job should have succeeded within {} iterations",
MAX_ITERATIONS
);

// Belt-and-suspenders: also confirm it didn't leak into the DLQ on
// the way through. (DLQ may not have been initialized — list_dlq_jobs
// would error if not. Initialize it first so this assertion is
// unconditional.)
client.init_dlq().await?;
let dlq = client.list_dlq_jobs(DlqFilter::default()).await?;
assert_eq!(
dlq.jobs.len(),
0,
"successful job must not appear in DLQ — even one that failed transiently"
);

Ok(())
})
.await
}

/// Regression test for P0-3: non-retryable `WorkerError` variants must
/// short-circuit retries instead of running all the way up to `max_attempts`.
///