# jqueue

A lightweight, storage-agnostic job queue for Python that runs on top of ordinary object
storage (S3, GCS, a local file, or an in-memory buffer). No message broker, no database,
no sidecar process required.

Inspired by the [turbopuffer object-storage queue pattern](https://turbopuffer.com/blog/object-storage-queue).

## Contents

- [Why object storage?](#why-object-storage)
- [Use cases](#use-cases)
- [How it works](#how-it-works)
  - [Compare-and-set writes](#compare-and-set-writes)
  - [DirectQueue: one CAS write per operation](#directqueue-one-cas-write-per-operation)
  - [BrokerQueue: group commit](#brokerqueue-group-commit)
  - [Heartbeats and stale-job recovery](#heartbeats-and-stale-job-recovery)
  - [Wire format](#wire-format)
- [Storage adapters](#storage-adapters)
- [Architecture](#architecture)
- [Limitations and trade-offs](#limitations-and-trade-offs)

## Why object storage?

Traditional job queues add a dependency: Redis, RabbitMQ, SQS, a Postgres table. Each comes
with operational overhead (provisioning, monitoring, capacity planning) and another failure
domain to manage.

For workloads that don't need sub-millisecond latency or thousands of operations per second,
object storage works well:

| Property | Object storage queue |
|---|---|
| **Durability** | 11 nines (S3/GCS), survives entire AZ outages |
| **Cost** | ~$0.004/10 000 operations (S3 PUT pricing) |
| **Infrastructure** | None. Uses storage you already have |
| **Concurrency safety** | CAS writes (If-Match / if_generation_match) |
| **Exactly-once claiming** | Guaranteed by the CAS protocol (stale jobs are re-queued, so delivery is at-least-once) |
| **Ops/sec** | ~1-100 ops/s depending on backend and batching |

The queue state lives in **a single JSON file**. Every mutation is a conditional write that
only succeeds if the file hasn't changed since you last read it. Concurrent writers that lose
the race retry automatically.

## Installation

```bash
pip install jqueue               # core: in-memory and local-file storage
pip install "jqueue[s3,gcs]"     # with the S3 and GCS adapters
```

Requires Python 3.12+.

## Quick start

```python
import asyncio

from jqueue import BrokerQueue, InMemoryStorage


async def main():
    async with BrokerQueue(InMemoryStorage()) as q:
        await q.enqueue("email", b"hello")
        [claimed] = await q.dequeue("email")
        print(claimed.payload)  # b"hello"
        await q.ack(claimed.id)


asyncio.run(main())
```

Switch to a real backend by swapping the storage adapter. The queue logic is identical:

```python
# S3
from jqueue.adapters.storage.s3 import S3Storage
storage = S3Storage(bucket="my-bucket", key="queues/jobs.json")

# GCS
from jqueue.adapters.storage.gcs import GCSStorage
storage = GCSStorage(bucket_name="my-bucket", blob_name="queues/jobs.json")
```

## Use cases

### Background job processing

Give each kind of work its own queue file:

```python
sms_queue = BrokerQueue(S3Storage(bucket="b", key="queues/sms.json"))
```

### Priority work

Lower `priority` value = processed first (same convention as Unix `nice`):

```python
await q.enqueue("report", b"payload", priority=0)   # urgent
await q.enqueue("report", b"payload", priority=10)  # can wait
```
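Equal-priority jobs are dequeued in enqueue order, so the effective sort key is `(priority, created_at)` (the ordering `QueueState.queued_jobs` documents). A stdlib-only sketch of that rule, with plain dicts standing in for jobs:

```python
# Plain dicts stand in for Job records; jqueue's real model is a Pydantic class.
jobs = [
    {"id": "b", "priority": 10, "created_at": 2},
    {"id": "a", "priority": 0, "created_at": 3},
    {"id": "c", "priority": 0, "created_at": 1},
]

# Sort by (priority, created_at): lower priority value first, FIFO within a priority.
order = [j["id"] for j in sorted(jobs, key=lambda j: (j["priority"], j["created_at"]))]
print(order)  # ['c', 'a', 'b']
```

Ties on `priority` fall back to enqueue time, so equal-priority jobs stay first-in, first-out.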

### Testing without infrastructure

`InMemoryStorage` implements the same interface, so no mocking is needed:

```python
async def test_email_worker():
    async with BrokerQueue(InMemoryStorage()) as q:
        await q.enqueue("email", b"hi")
        [job] = await q.dequeue("email")
        assert job.payload == b"hi"
        await q.ack(job.id)
```

## How it works

### Compare-and-set writes

Every adapter supports a conditional write: the write succeeds only if the object's
etag still matches the etag you read.

```
write (If-Match: abc) → ✓ "xyz"
write (If-Match: xyz) → ✓ "pqr"
```
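The read-modify-CAS cycle can be sketched end to end against a fake in-memory store. `FakeStore`, `cas_update`, and their method names are illustrative stand-ins, not jqueue's actual adapter API:

```python
import asyncio


class CASConflict(Exception):
    """Raised when the etag no longer matches (stand-in for CASConflictError)."""


class FakeStore:
    """In-memory stand-in for an object-storage adapter with conditional writes."""

    def __init__(self) -> None:
        self._content, self._etag = b"", "0"

    async def read(self) -> tuple[bytes, str]:
        return self._content, self._etag

    async def write(self, content: bytes, if_match: str) -> str:
        if if_match != self._etag:
            raise CASConflict(if_match)  # someone else wrote first
        self._content = content
        self._etag = str(int(self._etag) + 1)
        return self._etag


async def cas_update(store: FakeStore, mutate) -> None:
    # read → mutate in memory → conditional write; on conflict, retry
    # against the fresh state with linear back-off.
    for attempt in range(1, 11):
        content, etag = await store.read()
        try:
            await store.write(mutate(content), if_match=etag)
            return
        except CASConflict:
            await asyncio.sleep(0.01 * attempt)
    raise CASConflict("gave up after 10 attempts")


async def main() -> None:
    store = FakeStore()
    # Two writers race; whichever loses the CAS simply retries.
    await asyncio.gather(
        cas_update(store, lambda c: c + b"a"),
        cas_update(store, lambda c: c + b"b"),
    )
    content, etag = await store.read()
    print(etag)  # "2": both writers committed exactly once


asyncio.run(main())
```

Losing a race is not an error here; it just means re-reading and re-applying against the newer state.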

### DirectQueue: one CAS write per operation

`DirectQueue` is the simplest implementation. Every call to `enqueue`, `dequeue`, `ack`,
`nack`, or `heartbeat` performs its own independent CAS cycle:

```
enqueue("task", b"payload")
  → read state → apply mutation → CAS write
```

**Retry policy:** up to 10 retries on `CASConflictError`, with linear back-off
(10 ms x attempt number).
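That policy bounds the total time spent backing off:

```python
# Linear back-off: sleep 10 ms x attempt number, for up to 10 attempts.
delays_ms = [10 * attempt for attempt in range(1, 11)]
print(delays_ms)       # [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
print(sum(delays_ms))  # 550 ms of back-off, worst case, before the error surfaces
```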

Use `DirectQueue` when:
- throughput is ~1-5 ops/s
- you want the simplest possible code path
- you're running a single worker

```python
from jqueue import DirectQueue, InMemoryStorage

q = DirectQueue(InMemoryStorage())
job = await q.enqueue("task", b"data")
[claimed] = await q.dequeue("task")
await q.ack(claimed.id)
```

### BrokerQueue: group commit

When multiple coroutines (or asyncio tasks) call the queue concurrently, each one would
normally trigger its own storage round-trip. With 100 ms S3 latency, 10 concurrent
enqueues would take 10 x 100 ms = 1 second if serialized naively.

`BrokerQueue` solves this with a **group commit loop** (`GroupCommitLoop`), a single
background writer task that batches all pending operations into one CAS write:

```
Callers: op₁  op₂  op₃   (each awaits a future)
Writer:  read → apply op₁,op₂,op₃ → CAS write → resolve futures
```

**CAS conflicts:** when the batched write loses a race, the whole batch is re-applied to
the fresh state and retried with exponential back-off (up to 20 retries, capped at
~320 ms).
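Only the retry count and the ~320 ms cap are stated above. Assuming a 10 ms base delay that doubles per attempt (an assumption, not a documented constant), the schedule would look like:

```python
BASE_MS, CAP_MS = 10, 320  # BASE_MS is assumed; only the cap is documented above
delays_ms = [min(BASE_MS * 2**n, CAP_MS) for n in range(8)]
print(delays_ms)  # [10, 20, 40, 80, 160, 320, 320, 320]
```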

**Per-operation error isolation:** if one mutation in a batch raises (e.g. `nack` on a
job that no longer exists), only that caller's future receives the exception. All other
operations in the same batch commit normally.

```
Batch: [valid_enqueue, bad_nack, valid_dequeue]
         ✓ commits    ✗ raises   ✓ commits
```

`BrokerQueue` collapses N concurrent callers into O(1) storage operations per write
cycle, making it suitable for ~10-100 ops/s depending on backend latency.

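The group-commit mechanics can be sketched with plain `asyncio` primitives. This toy writer appends bytes instead of applying real queue mutations, and the names are illustrative, not `GroupCommitLoop`'s internals:

```python
import asyncio


async def group_commit_loop(pending: asyncio.Queue) -> None:
    state: list[bytes] = []  # stand-in for the queue state object
    while True:
        op, fut = await pending.get()  # wait for at least one pending op
        batch = [(op, fut)]
        while not pending.empty():     # drain everything queued up meanwhile
            batch.append(pending.get_nowait())
        # In the real loop this whole batch becomes ONE CAS write.
        for op, fut in batch:
            try:
                state.append(op)
                fut.set_result(len(state))
            except Exception as exc:   # per-op error isolation
                fut.set_exception(exc)


async def main() -> None:
    pending: asyncio.Queue = asyncio.Queue()
    writer = asyncio.create_task(group_commit_loop(pending))
    loop = asyncio.get_running_loop()
    futures = []
    for i in range(5):                 # five "concurrent" callers
        fut = loop.create_future()
        pending.put_nowait((f"op{i}".encode(), fut))
        futures.append(fut)
    print(await asyncio.gather(*futures))  # [1, 2, 3, 4, 5]
    writer.cancel()


asyncio.run(main())
```

All five callers are satisfied by a single pass through the writer, which is the whole point of group commit.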
```python
from jqueue import BrokerQueue, InMemoryStorage

async with BrokerQueue(InMemoryStorage()) as q:
    await q.enqueue("task", b"payload")  # batched with any concurrent calls
```

### Heartbeats and stale-job recovery

Workers send periodic heartbeats so that long-running jobs aren't mistaken for
crashed ones:

```python
async with HeartbeatManager(q, job.id, interval=timedelta(seconds=30)):
    ...  # long-running work; heartbeats fire in the background
```

**Automatic stale recovery:** On every write cycle, `BrokerQueue` (via `GroupCommitLoop`)
sweeps `IN_PROGRESS` jobs and resets any whose `heartbeat_at` is older than
`stale_timeout` (default: 5 minutes) back to `QUEUED`. This requires zero extra storage
operations; the sweep piggybacks on writes that are already happening.
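The sweep itself is simple bookkeeping. A stdlib sketch, with plain dicts standing in for `Job` records:

```python
from datetime import datetime, timedelta, timezone


def sweep_stale(jobs: list[dict], stale_timeout: timedelta, now: datetime) -> int:
    """Reset IN_PROGRESS jobs whose heartbeat is older than stale_timeout."""
    requeued = 0
    for job in jobs:
        hb = job["heartbeat_at"]
        if job["status"] == "IN_PROGRESS" and hb is not None and now - hb > stale_timeout:
            job["status"] = "QUEUED"
            job["heartbeat_at"] = None
            requeued += 1
    return requeued


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
jobs = [
    {"status": "IN_PROGRESS", "heartbeat_at": now - timedelta(minutes=10)},  # stale
    {"status": "IN_PROGRESS", "heartbeat_at": now - timedelta(seconds=30)},  # healthy
    {"status": "QUEUED", "heartbeat_at": None},
]
print(sweep_stale(jobs, timedelta(minutes=5), now))  # 1: only the stale job is reset
```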

`DirectQueue` exposes this as an explicit call:

```python
requeued = await q.requeue_stale(timeout=timedelta(minutes=5))
```

### Wire format

The queue state is stored as pretty-printed JSON, along these lines:

```json
{
  "version": 7,
  "jobs": [
    {
      "id": "4f8b2c1a-9d3e-4a6b-8c5d-7e2f1a0b9c8d",
      "entrypoint": "email",
      "payload": "aGVsbG8=",
      "status": "QUEUED",
      "priority": 0,
      "created_at": "2025-01-01T12:00:00Z",
      "heartbeat_at": null
    }
  ]
}
```

- `version`: monotonically increasing counter, incremented on every successful write.
- `payload`: arbitrary bytes, base64-encoded for JSON compatibility.
- `heartbeat_at`: `null` when `QUEUED`; set by `dequeue` and refreshed by `heartbeat`.

Pydantic v2 handles all serialization, including base64 encoding of `bytes` fields and
ISO-8601 datetime formatting.
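The two encodings are easy to reproduce with the stdlib alone; `encode_job_fields` is an illustrative helper, not part of jqueue:

```python
import base64
import json
from datetime import datetime, timezone


def encode_job_fields(payload: bytes, created_at: datetime) -> str:
    # bytes → base64 string, datetime → ISO-8601, as in the wire format above
    return json.dumps(
        {
            "payload": base64.b64encode(payload).decode("ascii"),
            "created_at": created_at.isoformat(),
        }
    )


print(encode_job_fields(b"hello", datetime(2025, 1, 1, tzinfo=timezone.utc)))
# {"payload": "aGVsbG8=", "created_at": "2025-01-01T00:00:00+00:00"}
```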

## Storage adapters

### InMemoryStorage

```python
storage = InMemoryStorage(initial_content=b'{"version":0,"jobs":[]}')
```

Uses an `asyncio.Lock` to serialise reads and writes. Safe for concurrent coroutines
in a single event loop. Not safe across processes or threads. Good for tests.

### LocalFileSystemStorage

Uses `fcntl.flock` for POSIX exclusive locking. The etag is the file's `st_mtime`.

### S3Storage

```python
from jqueue.adapters.storage.s3 import S3Storage

# Standard AWS, credentials from environment / IAM role
storage = S3Storage(bucket="my-bucket", key="queues/jobs.json")

# Explicit region
storage = S3Storage(bucket="my-bucket", key="jobs.json", region_name="eu-central-1")

# S3-compatible (MinIO, Cloudflare R2, Tigris, ...)
storage = S3Storage(
    bucket="my-bucket",
    key="jobs.json",
    endpoint_url="http://localhost:9000",  # e.g. a local MinIO
)
```

Uses aioboto3 (async) with `IfMatch` conditional PutObject. This is the S3 conditional
write feature [released in August 2024](https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/).
The etag is the S3 `ETag` response header.

### GCSStorage

```python
from jqueue.adapters.storage.gcs import GCSStorage

storage = GCSStorage(
    bucket_name="my-bucket",
    blob_name="queues/jobs.json",
)
```

Uses `if_generation_match` for conditional writes. Generation 0 means "blob must not
exist yet", used for the first write. The etag is the GCS object generation number
(stringified integer).

Since `google-cloud-storage` is synchronous, all GCS operations are wrapped in a
thread executor so they don't block the event loop.

### Custom adapters

Any object implementing the two methods of the `ObjectStoragePort` Protocol works:

```python
class MyStorage:
    ...  # implement ObjectStoragePort's read and conditional-write methods
```

No base class, no registration. Structural subtyping (duck typing) is sufficient.
Pass your adapter directly to `DirectQueue` or `BrokerQueue`.

## API reference

### `BrokerQueue` / `DirectQueue`

```python
# Enqueue
job: Job = await q.enqueue(
    entrypoint: str,
    payload: bytes,
    priority: int = 0,  # lower = processed first
)

# Dequeue: marks jobs IN_PROGRESS, returns empty list if none available
jobs: list[Job] = await q.dequeue(
    entrypoint: str | None = None,  # None = any entrypoint
    *,
    batch_size: int = 1,
)

# Acknowledge: remove a completed job
await q.ack(job_id: str)

# Negative-acknowledge: return a job to QUEUED for retry
await q.nack(job_id: str)

# Heartbeat: refresh the IN_PROGRESS timestamp
await q.heartbeat(job_id: str)

# Read-only snapshot (no CAS, no locking)
state: QueueState = await q.read_state()

# DirectQueue only: explicit stale sweep
requeued: int = await q.requeue_stale(timeout: timedelta)
```

### `HeartbeatManager`

```python
async with HeartbeatManager(q, job.id, interval=timedelta(seconds=30)):
    ...  # long-running work
```

The heartbeat task runs in the background and is cancelled when the context exits.

### `Job`

```python
job.id            # str, stable UUID assigned at enqueue time
job.entrypoint    # str
job.payload       # bytes
job.status        # JobStatus.QUEUED | IN_PROGRESS | DEAD
job.priority      # int, lower = higher priority
job.created_at    # datetime (UTC)
job.heartbeat_at  # datetime | None
```

`Job` is a frozen Pydantic model. All fields are immutable.

### `QueueState`

```python
state.jobs     # tuple[Job, ...]
state.version  # int, incremented on every write

state.queued_jobs(entrypoint=None)  # sorted by (priority, created_at)
state.in_progress_jobs()
state.find(job_id)  # Job | None
```

## Error handling

```python
from jqueue import (
    JQueueError,       # base class, catches all jqueue errors
    CASConflictError,  # CAS write rejected (etag mismatch), usually retried internally
    JobNotFoundError,  # job_id not in current state; has .job_id attribute
    StorageError,      # I/O failure from the storage backend; has .cause attribute
)
```

`ack`, `nack`, and `heartbeat` raise `JobNotFoundError` when the job id is no longer
present in the current queue state (e.g., it was already acked by another worker):

```python
try:
    await q.ack(job.id)
except JobNotFoundError:
    pass  # already removed, safe to ignore
```
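The hierarchy behaves like any Python exception tree; re-created here for illustration (not imported from jqueue), catching the base class catches every subclass:

```python
class JQueueError(Exception):
    """Stand-in for jqueue's base error."""


class JobNotFoundError(JQueueError):
    def __init__(self, job_id: str) -> None:
        super().__init__(job_id)
        self.job_id = job_id


try:
    raise JobNotFoundError("job-123")
except JQueueError as exc:  # the base class catches the subclass
    print(type(exc).__name__, exc.job_id)  # JobNotFoundError job-123
```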

## Architecture

jqueue follows the **Ports & Adapters** (hexagonal) pattern:

```
jqueue
├── Job, QueueState       frozen Pydantic domain models
├── ObjectStoragePort     runtime_checkable Protocol (the port)
├── codec.py              QueueState ↔ JSON bytes
├── direct.py             DirectQueue (one CAS per op)
├── group_commit.py       GroupCommitLoop (batched writes)
├── broker.py             BrokerQueue (context manager facade)
├── heartbeat.py          HeartbeatManager
└── adapters/storage/     InMemory, LocalFileSystem, S3, GCS
```

Design notes:

- `Job` and `QueueState` are frozen Pydantic models with no I/O dependencies. All
  mutations return new instances; no side effects.
- `ObjectStoragePort` is a `runtime_checkable` Protocol. Any two-method object satisfies
  it without inheritance.
- `codec.encode` / `codec.decode` are the only place that knows about the JSON wire
  format, keeping it easy to evolve.
- Each CAS cycle reads a fresh snapshot. There are no in-process caches that can go stale.
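The first note is the classic immutable-model pattern. Sketched with a stdlib dataclass instead of Pydantic:

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class JobSketch:  # stand-in for the frozen Pydantic Job model
    id: str
    status: str


job = JobSketch("j1", "QUEUED")
claimed = replace(job, status="IN_PROGRESS")  # "mutating" returns a new instance
print(job.status, claimed.status)  # QUEUED IN_PROGRESS

try:
    job.status = "DEAD"  # in-place mutation is rejected
except FrozenInstanceError:
    print("frozen")  # frozen
```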

## Limitations and trade-offs

| Concern | Detail |
|---|---|
| **Throughput ceiling** | S3 conditional writes have ~50-200 ms round-trip latency. `DirectQueue` tops out around 5-20 ops/s; `BrokerQueue` can reach ~50-100 ops/s by batching. |
| **Single-file bottleneck** | All operations contend on one object. This is fine for moderate workloads; for very high throughput, partition into multiple queues (one file per entrypoint). |
| **Queue size** | The entire state is read and written on every operation. Keep queue depths reasonable (hundreds to low thousands of jobs). |
| **No push / subscribe** | Workers must poll `dequeue`. There's no server-push mechanism. |
| **POSIX only (filesystem)** | `LocalFileSystemStorage` uses `fcntl.flock`. Linux and macOS only, not NFS. |
| **S3 conditional writes** | Requires the August 2024 S3 conditional write feature. Verify your S3-compatible backend supports `IfMatch` on PutObject before using `S3Storage`. |
| **Not a database** | If you need complex queries, scheduling, or priority queues with millions of jobs, a purpose-built system (Postgres, Redis) is a better fit. |