Skip to content

Commit 58d41a9

Browse files
CCIP-9463 Durable queue for storage writer (#723)
* Adding durable job queue * Fixes * Fixes * Fixes * Fixes * Removing redundant code * Fixes * Fixes * Tests for the postgres_queue * Bench for db * Bench for db * Bench for db * Fixes * Switch attempts from number to duration * Switch attempts from number to duration * Adjustments * Update verifier/jobqueue/postgres_queue.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update verifier/verification_coordinator_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Post review fixes * Post review fixes * Post review fixes * Post review fixes * Post review fixes * Post review fixes * Post review fixes * Post review fixes --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent fc541c9 commit 58d41a9

23 files changed

+3081
-13
lines changed

build/devenv/fakes/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ require (
6767
github.com/klauspost/compress v1.18.2 // indirect
6868
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
6969
github.com/leodido/go-urn v1.4.0 // indirect
70+
github.com/lib/pq v1.10.9 // indirect
7071
github.com/lufia/plan9stats v0.0.0-20251013123823-9fd1530e3ec3 // indirect
7172
github.com/magiconair/properties v1.8.10 // indirect
7273
github.com/mailru/easyjson v0.9.0 // indirect

build/devenv/fakes/go.sum

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
204204
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
205205
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
206206
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
207+
github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY=
208+
github.com/mfridman/interpolate v0.0.2/go.mod h1:p+7uk6oE07mpE/Ik1b8EckO0O4ZXiGAfshKBWLUM9Xg=
207209
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
208210
github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
209211
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 h1:BpfhmLKZf+SjVanKKhCgf3bg+511DmU9eDQTen7LLbY=
@@ -252,6 +254,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
252254
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
253255
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU=
254256
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
257+
github.com/pressly/goose/v3 v3.26.0 h1:KJakav68jdH0WDvoAcj8+n61WqOIaPGgH0bJWS6jpmM=
258+
github.com/pressly/goose/v3 v3.26.0/go.mod h1:4hC1KrritdCxtuFsqgs1R4AU5bWtTAf+cnWvfhf2DNY=
255259
github.com/prometheus/client_golang v1.23.0 h1:ust4zpdl9r4trLY/gSjlm07PuiBq2ynaXXlptpfy8Uc=
256260
github.com/prometheus/client_golang v1.23.0/go.mod h1:i/o0R9ByOnHX0McrTMTyhYvKE4haaf2mW08I+jGAjEE=
257261
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
@@ -271,6 +275,8 @@ github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6Ng
271275
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY=
272276
github.com/scylladb/go-reflectx v1.0.1 h1:b917wZM7189pZdlND9PbIJ6NQxfDPfBvUaQ7cjj1iZQ=
273277
github.com/scylladb/go-reflectx v1.0.1/go.mod h1:rWnOfDIRWBGN0miMLIcoPt/Dhi2doCMZqwMCJ3KupFc=
278+
github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE=
279+
github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas=
274280
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
275281
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
276282
github.com/shirou/gopsutil/v4 v4.25.9 h1:JImNpf6gCVhKgZhtaAHJ0serfFGtlfIlSC08eaKdTrU=
@@ -323,6 +329,8 @@ github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe h1:nbdqkIGOG
323329
github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
324330
github.com/testcontainers/testcontainers-go v0.39.0 h1:uCUJ5tA+fcxbFAB0uP3pIK3EJ2IjjDUHFSZ1H1UxAts=
325331
github.com/testcontainers/testcontainers-go v0.39.0/go.mod h1:qmHpkG7H5uPf/EvOORKvS6EuDkBUPE3zpVGaH9NL7f8=
332+
github.com/testcontainers/testcontainers-go/modules/postgres v0.39.0 h1:REJz+XwNpGC/dCgTfYvM4SKqobNqDBfvhq74s2oHTUM=
333+
github.com/testcontainers/testcontainers-go/modules/postgres v0.39.0/go.mod h1:4K2OhtHEeT+JSIFX4V8DkGKsyLa96Y2vLdd3xsxD5HE=
326334
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
327335
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
328336
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=

cmd/verifier/servicefactory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ func (f *factory) Start(ctx context.Context, spec string, deps bootstrap.Service
248248
verifierMonitoring,
249249
chainStatusManager,
250250
observedHeartbeatClient,
251+
chainStatusDB.DB,
251252
)
252253
if err != nil {
253254
coordinatorCancel()

cmd/verifier/token/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"database/sql"
56
"fmt"
67
"net/http"
78
"os"
@@ -132,6 +133,7 @@ func main() {
132133
messageTracker,
133134
verifierMonitoring,
134135
monitoredChainStatusManager,
136+
sqlDB.DB,
135137
)
136138
} else if verifierConfig.IsCCTP() {
137139
coordinator = createCCTPCoordinator(
@@ -149,6 +151,7 @@ func main() {
149151
messageTracker,
150152
verifierMonitoring,
151153
monitoredChainStatusManager,
154+
sqlDB.DB,
152155
)
153156
} else {
154157
lggr.Fatalw("Unknown verifier type", "type", verifierConfig.Type)
@@ -217,6 +220,7 @@ func createCCTPCoordinator(
217220
messageTracker verifier.MessageLatencyTracker,
218221
verifierMonitoring verifier.Monitoring,
219222
chainStatusManager protocol.ChainStatusManager,
223+
db *sql.DB,
220224
) *verifier.Coordinator {
221225
cctpSourceConfigs := createSourceConfigs(cctpConfig.ParsedVerifierResolvers, rmnRemoteAddresses)
222226

@@ -245,6 +249,7 @@ func createCCTPCoordinator(
245249
verifierMonitoring,
246250
chainStatusManager,
247251
heartbeatclient.NewNoopHeartbeatClient(),
252+
db,
248253
)
249254
if err != nil {
250255
lggr.Errorw("Failed to create verification coordinator for cctp", "error", err)
@@ -264,6 +269,7 @@ func createLombardCoordinator(
264269
messageTracker verifier.MessageLatencyTracker,
265270
verifierMonitoring verifier.Monitoring,
266271
chainStatusManager protocol.ChainStatusManager,
272+
db *sql.DB,
267273
) *verifier.Coordinator {
268274
sourceConfigs := createSourceConfigs(lombardConfig.ParsedVerifierResolvers, rmnRemoteAddresses)
269275

@@ -298,6 +304,7 @@ func createLombardCoordinator(
298304
verifierMonitoring,
299305
chainStatusManager,
300306
heartbeatclient.NewNoopHeartbeatClient(),
307+
db,
301308
)
302309
if err != nil {
303310
lggr.Errorw("Failed to create verification coordinator for lombard", "error", err)

executor/pkg/adapter/adapter.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,7 @@ func (ira *IndexerReaderAdapter) setActiveClientIdx(idx int) {
7777
}
7878

7979
// queryWithFailover implements the common failover logic for all query methods.
80-
//
81-
//nolint:gofumpt
82-
func queryWithFailover[TInput any, TResponse any](
80+
func queryWithFailover[TInput, TResponse any](
8381
ctx context.Context,
8482
ira *IndexerReaderAdapter,
8583
input TInput,

integration/pkg/constructors/committee_verifier.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ func NewVerificationCoordinator(
183183
verifierMonitoring,
184184
chainStatusManager,
185185
observedHeartbeatClient,
186+
nil, // Don't use it until db migration is done in the Chainlink Node
186187
)
187188
if err != nil {
188189
lggr.Errorw("Failed to create verification coordinator", "error", err)

protocol/message_types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,11 @@ type VerifierNodeResult struct {
587587
Signature ByteSlice `json:"signature"`
588588
}
589589

590+
// JobKey implements jobqueue.Jobable interface.
591+
func (vr VerifierNodeResult) JobKey() (chainSelector, messageID string) {
592+
return vr.Message.SourceChainSelector.String(), vr.MessageID.String()
593+
}
594+
590595
func (vr *VerifierResult) ValidateFieldsConsistent() error {
591596
err := vr.Message.ValidateCCVAndExecutorHash(vr.MessageCCVAddresses, vr.MessageExecutorAddress)
592597
if err != nil {

verifier/jobqueue/interface.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package jobqueue
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// JobStatus represents the current state of a job in the queue.
9+
type JobStatus string
10+
11+
const (
12+
JobStatusPending JobStatus = "pending"
13+
JobStatusProcessing JobStatus = "processing"
14+
JobStatusCompleted JobStatus = "completed"
15+
JobStatusFailed JobStatus = "failed"
16+
)
17+
18+
// Jobable is the interface that job payloads must implement to be stored in the queue.
19+
// It provides chain selector and message ID for database indexing and querying.
20+
type Jobable interface {
21+
// JobKey returns the chain selector and message ID for this job.
22+
// These are used for database indexing, querying, and job routing.
23+
JobKey() (chainSelector, messageID string)
24+
}
25+
26+
// Job wraps a payload with queue metadata.
27+
type Job[T Jobable] struct {
28+
// Unique job identifier
29+
ID string
30+
// The actual payload to process
31+
Payload T
32+
// Number of times this job has been attempted
33+
AttemptCount int
34+
// Deadline after which retries are no longer allowed
35+
RetryDeadline time.Time
36+
// When the job was created
37+
CreatedAt time.Time
38+
// When processing started (nil if not started)
39+
StartedAt *time.Time
40+
// Chain selector for routing and monitoring
41+
ChainSelector string
42+
// Message ID for deduplication and tracking
43+
MessageID string
44+
}
45+
46+
// JobQueue defines a generic durable queue interface backed by persistent storage.
47+
// The queue supports delayed retry, dead letter handling, and concurrent processing.
48+
// Type T must implement Jobable to provide chain selector and message ID.
49+
type JobQueue[T Jobable] interface {
50+
// Publish adds one or more jobs to the queue.
51+
// Jobs are immediately available for consumption unless a delay is specified.
52+
Publish(ctx context.Context, jobs ...T) error
53+
// PublishWithDelay adds jobs that become available after the specified delay.
54+
// Useful for implementing retry backoff strategies.
55+
PublishWithDelay(ctx context.Context, delay time.Duration, jobs ...T) error
56+
// Consume retrieves and locks up to batchSize jobs for processing.
57+
// Jobs in 'pending' or 'failed' status that are past their available_at time are eligible.
58+
// Additionally, jobs stuck in 'processing' for longer than the configured LockDuration
59+
// are considered stale (e.g. from a crashed worker) and are automatically reclaimed.
60+
// Returns empty slice if no jobs are available.
61+
//
62+
// The implementation should use SELECT FOR UPDATE SKIP LOCKED to ensure
63+
// concurrent consumers don't compete for the same jobs.
64+
Consume(ctx context.Context, batchSize int) ([]Job[T], error)
65+
// Complete marks jobs as successfully processed and removes them from active queue.
66+
// Completed jobs may be moved to an archive table for audit purposes.
67+
Complete(ctx context.Context, jobIDs ...string) error
68+
// Retry schedules jobs for retry after the specified delay.
69+
// Increments attempt count and records the error message.
70+
// If max attempts is exceeded, jobs are moved to failed status.
71+
Retry(ctx context.Context, delay time.Duration, errors map[string]error, jobIDs ...string) error
72+
// Fail marks jobs as permanently failed.
73+
// These jobs will not be retried and should be investigated.
74+
Fail(ctx context.Context, errors map[string]error, jobIDs ...string) error
75+
// Cleanup archives or deletes jobs older than the retention period.
76+
// Should be called periodically to prevent unbounded table growth.
77+
Cleanup(ctx context.Context, retentionPeriod time.Duration) (int, error)
78+
// Name returns the queue name for logging and monitoring
79+
Name() string
80+
}
81+
82+
// QueueConfig contains configuration for queue behavior.
83+
type QueueConfig struct {
84+
// Queue name for logging and table naming
85+
Name string
86+
// OwnerID scopes jobs so multiple verifiers sharing the same table
87+
// only consume their own jobs (e.g. "CCTPVerifier", "LombardVerifier").
88+
OwnerID string
89+
// RetryDuration is how long from creation a job is eligible for retry.
90+
// After this duration elapses, a failed retry marks the job as permanently failed.
91+
RetryDuration time.Duration
92+
// LockDuration is how long a job can remain in 'processing' before it is
93+
// considered stale and automatically reclaimed by the next Consume call.
94+
LockDuration time.Duration
95+
}

0 commit comments

Comments
 (0)