Skip to content

Commit c8f1223

Browse files
committed
fix: unprocessed jobs from unwatched queues block watched queues
There could be a condition where jobs were added to a queue that is not longer being watched, yet the jobs in those queues would continue appearing in the pending job list because the query to get pending jobs did not filter by queues being watched.
1 parent 08ccf0f commit c8f1223

File tree

4 files changed

+58
-33
lines changed

4 files changed

+58
-33
lines changed

backends/postgres/postgres_backend.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ import (
55
"embed"
66
"errors"
77
"fmt"
8+
"log/slog"
9+
"maps"
810
"net/url"
911
"os"
12+
"slices"
13+
"strings"
1014
"sync"
1115
"time"
1216

@@ -26,8 +30,6 @@ import (
2630
"github.com/jackc/pgx/v5/pgxpool"
2731
"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor"
2832
"github.com/robfig/cron"
29-
"golang.org/x/exp/slices"
30-
"golang.org/x/exp/slog"
3133
)
3234

3335
//go:embed migrations/*.sql
@@ -46,6 +48,7 @@ const (
4648
PendingJobsQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
4749
FROM neoq_jobs
4850
WHERE status NOT IN ('processed')
51+
AND queue IN ($1)
4952
AND run_after <= NOW()
5053
ORDER BY created_at ASC
5154
FOR UPDATE SKIP LOCKED
@@ -220,16 +223,13 @@ func (p *PgBackend) listenerManager(ctx context.Context) {
220223
p.listenerConnMu.Lock()
221224
p.listenerConn = lc
222225
p.mu.Lock()
223-
handlers := p.handlers
224-
p.mu.Unlock()
225-
226-
for queue := range handlers {
226+
for queue := range p.handlers {
227227
_, err = p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, queue))
228228
if err != nil {
229229
p.logger.Error("unable to listen on queue", slog.Any("error", err), slog.String("queue", queue))
230230
}
231231
}
232-
232+
p.mu.Unlock()
233233
p.listenerConnMu.Unlock()
234234

235235
p.logger.Debug("worker database connection established")
@@ -835,14 +835,22 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
835835

836836
// processPendingJobs starts a goroutine that periodically fetches pendings jobs and announces them to workers.
837837
//
838+
// The interval for this task does not need to be particularly short unless an application suffers frequent database
839+
// disconnects, as most pending jobs are picked up by LISTEN after being announced with NOTIFY.
840+
//
838841
// Past due jobs are fetched on the interval [neoq.DefaultPendingJobFetchInterval]
839842
// nolint: cyclop
840843
func (p *PgBackend) processPendingJobs(ctx context.Context) {
841844
go func(ctx context.Context) {
842845
var err error
843846
var conn *pgxpool.Conn
844847
var pendingJobs []*jobs.Job
845-
ticker := time.NewTicker(neoq.DefaultPendingJobFetchInterval)
848+
var ticker *time.Ticker
849+
if p.config.PendingJobCheckInterval > 0 {
850+
ticker = time.NewTicker(p.config.PendingJobCheckInterval)
851+
} else {
852+
ticker = time.NewTicker(neoq.DefaultPendingJobFetchInterval)
853+
}
846854

847855
// check for pending jobs on an interval until the context is canceled
848856
for {
@@ -1044,7 +1052,11 @@ func (p *PgBackend) getJob(ctx context.Context, tx pgx.Tx, jobID string) (job *j
10441052
}
10451053

10461054
func (p *PgBackend) getPendingJobs(ctx context.Context, conn *pgxpool.Conn) (pendingJobs []*jobs.Job, err error) {
1047-
rows, err := conn.Query(ctx, PendingJobsQuery)
1055+
p.mu.Lock()
1056+
// convert watched queue map to string: "'queue1', 'queue2', ..." for use in Postgres IN statement
1057+
activeQueues := slices.Collect(maps.Keys(p.handlers))
1058+
p.mu.Unlock()
1059+
rows, err := conn.Query(ctx, PendingJobsQuery, strings.Join(activeQueues, ","))
10481060
if err != nil {
10491061
return
10501062
}

backends/postgres/postgres_backend_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,26 +1144,27 @@ func TestProcessPendingJobs(t *testing.T) {
11441144

11451145
ctx := context.Background()
11461146

1147-
// INSERTing jobs into the job queue before noeq is listening on any queues ensures that the new job is not announced, and when
1147+
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString), neoq.WithPendingJobCheckInterval(time.Duration(50*time.Millisecond)))
1148+
if err != nil {
1149+
t.Fatal(err)
1150+
}
1151+
defer nq.Shutdown(ctx)
1152+
1153+
// Adding jobs into the job queue before noeq is listening on any queues ensures that the new job is not announced, and when
11481154
// neoq _is_ started, that there is a pending jobs waiting to be processed
11491155
payload := map[string]interface{}{
11501156
"message": "hello world",
11511157
}
11521158
var pendingJobID string
1153-
err := conn.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
1154-
VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
1155-
queue, "dummy", payload, time.Now().UTC(), nil, 1).Scan(&pendingJobID)
1159+
pendingJobID, err = nq.Enqueue(ctx, &jobs.Job{
1160+
Queue: queue,
1161+
Payload: payload,
1162+
})
11561163
if err != nil {
11571164
t.Error(fmt.Errorf("unable to add job to queue: %w", err))
11581165
return
11591166
}
11601167

1161-
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
1162-
if err != nil {
1163-
t.Fatal(err)
1164-
}
1165-
defer nq.Shutdown(ctx)
1166-
11671168
h := handler.New(queue, func(_ context.Context) (err error) {
11681169
return
11691170
})

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/acaloiaro/neoq
22

3-
go 1.21
3+
go 1.24.0
44

55
require (
66
github.com/golang-migrate/migrate/v4 v4.16.2

neoq.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const (
1818
DefaultPendingJobFetchInterval = 60 * time.Second
1919
// the window of time between time.Now() and when a job's RunAfter comes due that neoq will schedule a goroutine to
2020
// schdule the job for execution.
21-
// E.g. right now is 16:00 and a job's RunAfter is 16:30 of the same date. This job will get a dedicated goroutine
21+
// E.g. right now is 16:00:00 and a job's RunAfter is 16:00:30 of the same date. This job will get a dedicated goroutine
2222
// to wait until the job's RunAfter, scheduling the job to be run exactly at RunAfter
2323
DefaultFutureJobWindow = 30 * time.Second
2424
DefaultJobCheckInterval = 1 * time.Second
@@ -32,18 +32,19 @@ var ErrBackendNotSpecified = errors.New("a backend must be specified")
3232
// backends. [BackendConcurrency], for example, is only used by the redis backend. Other backends manage concurrency on a
3333
// per-handler basis.
3434
type Config struct {
35-
BackendInitializer BackendInitializer
36-
BackendAuthPassword string // password with which to authenticate to the backend
37-
BackendConcurrency int // total number of backend processes available to process jobs
38-
ConnectionString string // a string containing connection details for the backend
39-
JobCheckInterval time.Duration // the interval of time between checking for new future/past-due jobs
40-
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that future jobs get scheduled
41-
IdleTransactionTimeout int // number of milliseconds PgBackend transaction may idle before the connection is killed
42-
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown
43-
SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance)
44-
LogLevel logging.LogLevel // the log level of the default logger
45-
PGConnectionTimeout time.Duration // the amount of time to wait for a connection to become available before timing out
46-
RecoveryCallback handler.RecoveryCallback // the recovery handler applied to all Handlers excuted by the associated Neoq instance
35+
BackendInitializer BackendInitializer
36+
BackendAuthPassword string // password with which to authenticate to the backend
37+
BackendConcurrency int // total number of backend processes available to process jobs
38+
ConnectionString string // a string containing connection details for the backend
39+
JobCheckInterval time.Duration // the interval of time between checking for new future jobs
40+
PendingJobCheckInterval time.Duration // duration of time between checking for unprocessed jobs due to downtime/db disconnects
41+
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that future jobs get scheduled
42+
IdleTransactionTimeout int // number of milliseconds PgBackend transaction may idle before the connection is killed
43+
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown
44+
SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance)
45+
LogLevel logging.LogLevel // the log level of the default logger
46+
PGConnectionTimeout time.Duration // the amount of time to wait for a connection to become available before timing out
47+
RecoveryCallback handler.RecoveryCallback // the recovery handler applied to all Handlers excuted by the associated Neoq instance
4748
}
4849

4950
// ConfigOption is a function that sets optional backend configuration
@@ -139,6 +140,17 @@ func WithJobCheckInterval(interval time.Duration) ConfigOption {
139140
}
140141
}
141142

143+
// WithPendingJobCheckInterval configures the duration of time between checking for unprocessed jobs on watched queues.
144+
//
145+
// Pending jobs are jobs that were queued while neoq wasn't listening or while a disconnect occurred. Most jobs are
146+
// processed via Postgres' LISTEN/NOTIFY facilities. This check interval is a last resort and does not need to be short.
147+
// Be careful not to hammer your database by setting this too low.
148+
func WithPendingJobCheckInterval(interval time.Duration) ConfigOption {
149+
return func(c *Config) {
150+
c.PendingJobCheckInterval = interval
151+
}
152+
}
153+
142154
// WithLogLevel configures the log level for neoq's default logger. By default, log level is "INFO".
143155
// if SetLogger is used, WithLogLevel has no effect on the set logger
144156
func WithLogLevel(level logging.LogLevel) ConfigOption {

0 commit comments

Comments
 (0)