CCIP-9463 Durable queue for storage writer #723
Conversation
Force-pushed bfccfa1 to 348ce9a
Pull request overview
This pull request implements a durable, database-backed queue for the storage writer component of the verifier system. The key innovation is replacing the in-memory batcher with a PostgreSQL-backed job queue to provide durability and crash recovery. The implementation maintains backward compatibility by falling back to the in-memory implementation when no database connection is provided.
Changes:
- Implements a generic PostgreSQL job queue interface with full CRUD operations, retry logic, and archival capabilities
- Adds StorageWriterProcessorDB as a database-backed alternative to the existing StorageWriterProcessor
- Introduces queue-batcher adapter to integrate the existing batcher pattern with the new persistent queue
- Adds comprehensive database migrations for job queue tables
- Updates all coordinator constructors to accept an optional database connection parameter
Reviewed changes
Copilot reviewed 21 out of 22 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| verifier/jobqueue/interface.go | Defines generic job queue interface with Jobable trait for queue items |
| verifier/jobqueue/postgres_queue.go | PostgreSQL implementation of job queue with row-level locking and SKIP LOCKED |
| verifier/jobqueue/postgres_queue_test.go | Comprehensive unit tests covering all queue operations and concurrent scenarios |
| verifier/jobqueue/postgres_queue_bench_test.go | Benchmark tests demonstrating throughput under load |
| verifier/storage_writer_db.go | Database-backed storage writer processor that polls the queue |
| verifier/queue_batcher_adapter.go | Adapter that forwards batcher output to the persistent queue |
| verifier/verification_coordinator.go | Factory function to create appropriate storage writer based on DB availability |
| verifier/types.go | Adds JobKey method to VerificationTask for queue integration |
| protocol/message_types.go | Adds JobKey method to VerifierNodeResult for queue integration |
| verifier/pkg/db/migrations/postgres/00004_create_job_queues.sql | Database schema for both verification_tasks and verification_results queues |
| verifier/testutil/test_db.go | Test utility for creating PostgreSQL testcontainers with migrations |
| verification_coordinator_*_test.go | Updates test setup to use database-backed queue |
| cmd/verifier/token/main.go | Passes database connection to coordinators |
| cmd/verifier/servicefactory.go | Passes database connection to coordinator |
| integration/pkg/constructors/committee_verifier.go | Explicitly passes nil for DB (backward compatibility) |
| executor/pkg/adapter/adapter.go | Minor code formatting improvement for type parameters |
| build/devenv/fakes/go.mod | Adds lib/pq dependency |
| build/devenv/fakes/go.sum | Adds dependencies for goose migrations and testcontainers |
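The row-level locking noted for `verifier/jobqueue/postgres_queue.go` follows the standard Postgres job-queue pattern built on `FOR UPDATE SKIP LOCKED`. A minimal sketch of such a claim query (table, column, and status names here are illustrative assumptions, not this PR's actual schema):

```sql
-- Hypothetical claim query: each worker atomically claims up to N pending jobs.
-- SKIP LOCKED makes concurrent workers skip rows already locked by another
-- transaction instead of blocking on them, so consumers never fight over a batch.
UPDATE verification_tasks
SET status = 'processing', started_at = NOW()
WHERE job_id IN (
    SELECT job_id
    FROM verification_tasks
    WHERE status = 'pending' AND run_at <= NOW()
    ORDER BY priority, run_at
    LIMIT 10
    FOR UPDATE SKIP LOCKED
)
RETURNING job_id, payload;
```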
```go
go func() {
	<-ctx.Done()
	wg.Wait()
}()
```
The goroutine spawned on lines 33-36 will continue running until ctx.Done() is signaled, but there's no guarantee it will complete before the caller continues. This could lead to a goroutine leak if the function returns before the cleanup goroutine exits. Additionally, this cleanup goroutine waits for wg.Wait() which will only complete after forwardToQueue exits, creating a dependency where the cleanup goroutine waits for the forwarding goroutine. Consider using a more structured approach where the caller is responsible for cleanup, or ensure the cleanup goroutine signals its completion.
```go
// Publish items to queue
if len(batch.Items) > 0 {
	publishCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	if err := queue.Publish(publishCtx, batch.Items...); err != nil {
		lggr.Errorw("Failed to publish to queue", "error", err, "count", len(batch.Items))
	} else {
		lggr.Debugw("Published results to queue", "count", len(batch.Items))
	}
	cancel()
```
When queue.Publish fails on line 66, the error is logged but the batch items are silently dropped. This could lead to data loss if the queue is temporarily unavailable or if there's a transient database issue. Since this is replacing the in-memory batcher that had retry capabilities, consider implementing a retry mechanism here or at least making the data loss more visible through metrics or critical logging.
Suggested change — retry publishes with bounded backoff instead of logging and dropping:

Current:
```go
// Publish items to queue
if len(batch.Items) > 0 {
	publishCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	if err := queue.Publish(publishCtx, batch.Items...); err != nil {
		lggr.Errorw("Failed to publish to queue", "error", err, "count", len(batch.Items))
	} else {
		lggr.Debugw("Published results to queue", "count", len(batch.Items))
	}
	cancel()
```

Suggested:
```go
// Publish items to queue with bounded retries to reduce data loss on transient failures
if len(batch.Items) > 0 {
	const maxPublishRetries = 3
	for attempt := 1; attempt <= maxPublishRetries; attempt++ {
		publishCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
		err := queue.Publish(publishCtx, batch.Items...)
		cancel()
		if err == nil {
			lggr.Debugw("Published results to queue", "count", len(batch.Items))
			break
		}
		if attempt < maxPublishRetries {
			lggr.Errorw(
				"Failed to publish to queue, will retry",
				"error", err,
				"count", len(batch.Items),
				"attempt", attempt,
				"maxAttempts", maxPublishRetries,
			)
			// Linearly increasing backoff between retries, but abort if context is done
			backoff := time.Duration(attempt) * time.Second
			select {
			case <-ctx.Done():
				return
			case <-time.After(backoff):
			}
		} else {
			lggr.Errorw(
				"Failed to publish to queue after max retries, dropping batch",
				"error", err,
				"count", len(batch.Items),
				"maxAttempts", maxPublishRetries,
			)
		}
	}
```
verifier/jobqueue/postgres_queue.go (outdated)
```go
if time.Now().After(retryDeadline) || time.Now().Equal(retryDeadline) {
	failed = append(failed, resultJobID)
} else {
	retried = append(retried, resultJobID)
}
```
There's a potential race condition when comparing the current time with retry_deadline. The comparison on line 330 uses time.Now() which could be different from the NOW() used in the SQL query on line 276. This means a job could be marked as 'pending' in the database but then classified as 'failed' in the logging counters due to the time difference between SQL execution and Go code execution. Consider using the retry_deadline returned from the query to make the decision, comparing it against the time when the query was executed (captured before the query), not a new time.Now() call.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
```go
// When the job was created
CreatedAt time.Time
// When processing started (nil if not started)
StartedAt *time.Time
```
could this be a NPE if we're not careful?
In Go code we only write to that field; it's only read in the produce/consume SQL queries, so it won't result in an NPE.
Pull request overview
Copilot reviewed 21 out of 22 changed files in this pull request and generated 19 comments.
```go
if err := s.resultQueue.Complete(ctx, jobIDs...); err != nil {
	s.lggr.Errorw("Failed to complete jobs in queue",
		"error", err,
		"batchSize", len(jobIDs),
	)
	// Continue anyway - data is written, tracking will catch up
}

// Update checkpoints
s.updateCheckpoints(ctx, affectedChains)

// Track message latencies
s.messageTracker.TrackMessageLatencies(ctx, results)
```
When resultQueue.Complete() fails (lines 189-195), the code continues with checkpoint updates and message latency tracking. However, the jobs remain in 'processing' status in the database and will eventually be reclaimed and reprocessed, potentially leading to duplicate writes. Consider failing the entire operation or implementing compensating logic to handle this edge case.
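If reclaim-and-reprocess cannot be ruled out, a common complement is making the downstream write idempotent so a replayed job's duplicate write becomes a no-op. An illustrative sketch (table, column, and constraint names are assumed, not taken from this PR):

```sql
-- Assumes a unique key on the message identifier; a reprocessed job that
-- re-inserts the same result then simply does nothing.
INSERT INTO verification_results (message_id, result)
VALUES ($1, $2)
ON CONFLICT (message_id) DO NOTHING;
```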
```sql
-- Index for archive cleanup
CREATE INDEX IF NOT EXISTS idx_verification_tasks_archive_completed
    ON verification_tasks_archive (completed_at DESC);
```
The archive tables have indexes on completed_at for cleanup queries, but the Complete operation that inserts into the archive uses job_id for deletion from the main table. If there are queries that look up archived jobs by job_id (e.g., for debugging or audit), consider adding an index on job_id in the archive tables as well.
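Such a follow-up index might look like this (table name taken from the migration quoted above; the index name is illustrative):

```sql
-- Supports point lookups of archived jobs by job_id for debugging/audit.
CREATE INDEX IF NOT EXISTS idx_verification_tasks_archive_job_id
    ON verification_tasks_archive (job_id);
```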
```go
}

// Publish items to queue
if len(batch.Items) > 0 {
```
The forwardToQueue function uses the parent ctx for the timeout context on line 65, but if the parent context is already canceled, context.WithTimeout will immediately return a canceled context. The error from queue.Publish could then be misleading (context canceled vs actual publish error). Consider checking if ctx.Err() is non-nil before attempting to publish, or use context.Background() with timeout for the publish operation.
Suggested change:

Current:
```go
if len(batch.Items) > 0 {
```

Suggested:
```go
if len(batch.Items) > 0 {
	// If the parent context is already canceled, avoid creating a timed context
	// that is immediately canceled and producing a misleading publish error.
	if err := ctx.Err(); err != nil {
		lggr.Errorw("Skipping publish to queue due to context error", "error", err, "count", len(batch.Items))
		return
	}
```
```sql
INSERT INTO %s
SELECT *, NOW() as completed_at
```
The SELECT *, NOW() as completed_at query assumes the archive table has the exact same columns as the main table plus completed_at. This is fragile because if the schema changes (e.g., new columns added), this INSERT will fail. Consider explicitly listing all columns to make schema evolution easier and failures more obvious.
Suggested change:

Current:
```sql
INSERT INTO %s
SELECT *, NOW() as completed_at
```

Suggested:
```sql
INSERT INTO %s (
    job_id,
    owner_id,
    payload,
    priority,
    run_at,
    attempts,
    max_attempts,
    created_at,
    updated_at,
    task_job_id,
    completed_at
)
SELECT
    job_id,
    owner_id,
    payload,
    priority,
    run_at,
    attempts,
    max_attempts,
    created_at,
    updated_at,
    task_job_id,
    NOW() AS completed_at
```
```go
wg.Go(func() {
	forwardToQueue(ctx, b.OutChannel(), queue, lggr)
})
```
sync.WaitGroup from the standard library only gained a Go method in Go 1.25, so on earlier toolchains this code will fail to compile. Based on other files in the codebase that use wg.Go(), it appears you need to use a different WaitGroup type (likely from the chainlink-common services package or a custom type). Check other files like storage_writer_db.go line 84 or storage_writer.go line 96 to see the correct import/type being used.
Suggested change:

Current:
```go
wg.Go(func() {
	forwardToQueue(ctx, b.OutChannel(), queue, lggr)
})
```

Suggested:
```go
wg.Add(1)
go func() {
	defer wg.Done()
	forwardToQueue(ctx, b.OutChannel(), queue, lggr)
}()
```
```go
go func() {
	<-ctx.Done()
	wg.Wait()
}()
```
The goroutine spawned on line 33 only waits for the WaitGroup but never properly shuts down. If ctx is canceled but the forwardToQueue goroutine is blocked (e.g., waiting on a channel), this cleanup goroutine will wait indefinitely on wg.Wait(). Consider adding proper shutdown coordination or a timeout.
Current:
```go
//
//nolint:gofumpt
func queryWithFailover[TInput any, TResponse any](
```

Changed to:
```go
func queryWithFailover[TInput, TResponse any](
```
This formatting change to the generic type parameters appears unrelated to the PR's stated purpose of adding a durable queue for storage writer. Consider moving unrelated formatting/style changes to a separate PR to keep this PR focused on its main objective.
```go
		s.lggr.Errorw("Error processing batch", "error", err)
	}

case <-cleanupTicker.C:
```
nit: Do we want to stall the consumption while we do the cleanup? Wondering if that could introduce some delays
|
|
||
| // Publish items to queue | ||
| if len(batch.Items) > 0 { | ||
| publishCtx, cancel := context.WithTimeout(ctx, 10*time.Second) |
nit: Maybe we can move the 10 seconds to a constant or make it configurable
Using a DB-backed queue for processing in storageWriter. It's backward compatible: if no db is configured, we fall back to the previous in-memory implementation, mainly because the db migrations also need to be added in the chainlink repository.