Commit 8a6e143

Add deep-dive section to README covering internals, scaling, and production guidance
Adds a conceptual deep-dive section explaining the group commit algorithm, storage adapter etag strategies, performance characteristics, stale job recovery mechanics, and production deployment patterns. Also fixes stale references to st_mtime_ns (now SHA-256) and corrects the DirectQueue retry back-off formula.
1 parent 0cc9175 commit 8a6e143

File tree: 1 file changed — README.md (+336, −3 lines)
@@ -27,6 +27,12 @@ Inspired by the [turbopuffer object-storage queue pattern](https://turbopuffer.c
 - [API reference](#api-reference)
 - [Error handling](#error-handling)
 - [Architecture](#architecture)
+- [Deep dive](#deep-dive)
+  - [The group commit algorithm](#the-group-commit-algorithm)
+  - [Storage adapters and etag strategies](#storage-adapters-and-etag-strategies)
+  - [Performance and scaling](#performance-and-scaling)
+  - [Stale job recovery in depth](#stale-job-recovery-in-depth)
+  - [Production deployment](#production-deployment)
 - [Limitations and trade-offs](#limitations-and-trade-offs)

 ## Why object storage?
@@ -212,7 +218,7 @@ The **etag** is an opaque version token returned by the storage backend:
 |---|---|
 | S3 / MinIO / R2 | HTTP `ETag` response header |
 | GCS | Object `generation` number (integer, stringified) |
-| Filesystem | `st_mtime_ns` (nanosecond mtime) |
+| Filesystem | SHA-256 content hash |
 | InMemory | Monotonic integer counter |

 Because two writers can't both satisfy the same `If-Match` condition, the CAS protocol
@@ -245,7 +251,7 @@ enqueue("task", b"payload")
 ```

 **Retry policy:** up to 10 retries on `CASConflictError`, with linear back-off
-(10 ms x attempt number).
+(10 ms, 20 ms, 30 ms, ...).

 Use `DirectQueue` when:
 - throughput is ~1-5 ops/s
@@ -407,7 +413,8 @@ storage = LocalFileSystemStorage("/var/lib/myapp/queue.json")
 # accepts str or pathlib.Path; parent directories are created automatically
 ```

-Uses `fcntl.flock` for POSIX exclusive locking. The etag is the file's `st_mtime_ns`.
+Uses `fcntl.flock` for POSIX exclusive locking. The etag is a SHA-256 hex digest of
+the file content.
 **POSIX-only** (Linux, macOS). Not safe across machines or on NFS.

 ### S3Storage
@@ -640,6 +647,332 @@ Design notes:
  format, keeping it easy to evolve.
- Each CAS cycle reads a fresh snapshot. There are no in-process caches that can go stale.

## Deep dive

### The group commit algorithm

The fundamental problem `GroupCommitLoop` solves is **amortising storage latency across
concurrent callers**. An S3 PUT takes ~100 ms. If ten coroutines each enqueue a job
independently, they would spend a full second in serialised round-trips. Group commit
folds all ten mutations into a single read-modify-write cycle, bringing the wall-clock
cost down to ~100 ms regardless of how many operations were buffered.

The idea comes from database write-ahead log design: instead of flushing to disk on
every transaction commit, the database groups concurrent commits and flushes once. jqueue
applies the same principle to object storage writes (see `core/group_commit.py`).

**How callers interact with the writer.** Every public method (`enqueue`, `dequeue`,
`ack`, etc.) creates a pure function `QueueState -> QueueState` and hands it to the
internal `_submit` method. `_submit` does three things: appends the function and a
`Future` to a pending buffer, wakes a background writer task via an `asyncio.Event`, and
then suspends the caller on the future. The caller doesn't touch storage at all — it
simply waits for the writer to tell it what happened.

An important detail: `enqueue` allocates the job UUID *before* submitting the mutation
(`core/group_commit.py:113`). This means the caller gets a stable job ID back even if
the batch is retried multiple times due to CAS conflicts. The ID is captured by the
closure and replayed identically on every attempt.

**The writer loop** is a single `asyncio.Task` that runs for the lifetime of the
`BrokerQueue` context. When work arrives it drains the entire pending buffer into a
list, clears the buffer, and calls `_commit_batch`. While `_commit_batch` is running
(blocked on storage I/O), new callers keep appending to the now-empty buffer — they
become the *next* batch. This is what produces the pipelining effect: I/O and mutation
accumulation happen in parallel.

On shutdown, the loop condition `while not self._stopped or self._pending` guarantees
that any operations submitted before `stop()` are flushed before the task exits.

**The commit cycle** (`_commit_batch`) does three things per attempt:

1. **Read and decode.** Fetch the current state blob and its etag. Before applying
   the batch, sweep stale IN_PROGRESS jobs — this piggybacks on the write that's about
   to happen, so stale recovery is free.
2. **Apply mutations sequentially.** Each mutation function is called against the
   evolving state. If one raises (e.g. `nack` on a job that doesn't exist), the
   exception is captured in a per-index map but the remaining mutations still run.
   This is the **per-operation error isolation** guarantee: a bad operation in a batch
   doesn't poison the good ones.
3. **CAS write.** The new state is written with `if_match=etag`. On success, each
   caller's future is resolved — either with `None` (success) or the captured
   exception. On `CASConflictError`, the entire batch is retried on fresh state.

The alternative design would be to reject the whole batch on any single failure, but
that creates a fairness problem: one misbehaving caller could block everyone else in the
same batch window.
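
The mechanics above can be condensed into a runnable sketch. This is a simplified
model, not jqueue's actual code: `InMemoryStore`, the dict-shaped state, and the
method names are stand-ins, and the sketch omits retry limits and back-off.

```python
import asyncio
from typing import Callable


class CASConflictError(Exception):
    """Write rejected: the blob changed since it was read."""


class InMemoryStore:
    """Toy versioned store: a write succeeds only if if_match equals the etag."""

    def __init__(self) -> None:
        self.state: dict = {"jobs": []}
        self.etag = 0

    async def read(self) -> tuple[dict, int]:
        return dict(self.state), self.etag

    async def write(self, state: dict, if_match: int) -> None:
        if if_match != self.etag:
            raise CASConflictError
        self.state, self.etag = state, self.etag + 1


class GroupCommitLoop:
    """Folds concurrent mutations into one read-modify-write cycle each."""

    def __init__(self, store: InMemoryStore) -> None:
        self.store = store
        self._pending: list[tuple[Callable[[dict], dict], asyncio.Future]] = []
        self._wake = asyncio.Event()
        self._stopped = False
        self._task = asyncio.create_task(self._writer())

    async def submit(self, mutation: Callable[[dict], dict]) -> None:
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending.append((mutation, fut))  # buffer the mutation
        self._wake.set()                       # wake the writer task
        await fut                              # suspend until the batch commits

    async def stop(self) -> None:
        self._stopped = True
        self._wake.set()
        await self._task

    async def _writer(self) -> None:
        while not self._stopped or self._pending:
            await self._wake.wait()
            self._wake.clear()
            batch, self._pending = self._pending, []  # drain the whole buffer
            if batch:
                await self._commit_batch(batch)

    async def _commit_batch(self, batch) -> None:
        while True:  # the real loop bounds retries and backs off
            state, etag = await self.store.read()
            errors: dict[int, Exception] = {}
            for i, (mutation, _) in enumerate(batch):
                try:
                    state = mutation(state)  # per-operation error isolation
                except Exception as exc:
                    errors[i] = exc
            try:
                await self.store.write(state, if_match=etag)
            except CASConflictError:
                continue  # someone else won the race: retry on fresh state
            for i, (_, fut) in enumerate(batch):
                if i in errors:
                    fut.set_exception(errors[i])
                else:
                    fut.set_result(None)
            return


async def demo() -> tuple[dict, int]:
    store = InMemoryStore()
    loop = GroupCommitLoop(store)

    def add(job_id: str) -> Callable[[dict], dict]:
        return lambda s: {**s, "jobs": s["jobs"] + [job_id]}

    # Ten concurrent enqueues; the writer folds them into a few batches.
    await asyncio.gather(*(loop.submit(add(f"job-{i}")) for i in range(10)))
    await loop.stop()
    return store.state, store.etag
```

Running `asyncio.run(demo())` lands all ten jobs in the state, usually after far
fewer than ten storage writes.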

**Retry strategies** differ between the two queue implementations. `DirectQueue` uses
linear back-off (`10 ms * (attempt + 1)`, max 10 retries) — simple, predictable, and
sufficient when only one or two writers contend. `GroupCommitLoop` uses exponential
back-off (`5 ms * 2^min(attempt, 6)`, capped at 320 ms, max 20 retries) because it is the
more likely choice under higher contention, where thundering-herd effects matter. The
exponential curve spreads competing writers apart in time more effectively than a
linear ramp.
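
Expressed as functions (delays in seconds), the two schedules are a direct
transcription of the formulas above:

```python
def direct_queue_delay(attempt: int) -> float:
    """Linear back-off: 10 ms, 20 ms, 30 ms, ... for attempt = 0, 1, 2, ..."""
    return 0.010 * (attempt + 1)


def group_commit_delay(attempt: int) -> float:
    """Exponential back-off: 5 ms * 2^min(attempt, 6), capped at 320 ms."""
    return 0.005 * 2 ** min(attempt, 6)
```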
711+
712+
### Storage adapters and etag strategies
713+
714+
The storage port (`ports/storage.py`) is deliberately minimal: `read` returns bytes and
715+
an opaque etag; `write` accepts bytes and an optional `if_match` etag, raising
716+
`CASConflictError` when the condition fails. Everything else — retry logic, batching,
717+
serialisation — lives outside the adapter. This makes it possible to write a new backend
718+
in roughly 30 lines without understanding any queue internals.
719+
720+
**What is an etag and why does it matter?** An etag is a version token that represents
721+
the state of the stored blob at a specific point in time. By passing it back on the
722+
next write (`if_match`), you tell the storage backend "only accept this write if nobody
723+
else has modified the object since I read it." If someone has, the write is rejected and
724+
the caller retries with fresh state. This is the compare-and-set (CAS) guarantee that
725+
makes the whole system work without locks or coordination.

Each backend produces etags differently because each has different native versioning
primitives:

| Backend | Etag | Why this approach |
|---|---|---|
| InMemory | Monotonic counter | Cheapest possible; perfectly ordered |
| Filesystem | SHA-256 of file content | Content-based — handles rapid writes where mtime can collide |
| S3 | HTTP `ETag` header | S3 computes this natively on every PUT |
| GCS | Object generation number | GCS increments this atomically on every write |

The filesystem choice deserves elaboration. A content hash is used instead of a
timestamp like `mtime` because two rapid writes can produce identical timestamps,
silently breaking CAS. A SHA-256 digest always differs when the content changes. The
trade-off is one hash computation per read and write, but for the file sizes jqueue
produces (kilobytes) this is negligible.
743+
**First-write semantics.** When the queue is brand new and no blob exists yet, the
744+
adapter receives `if_match=None`. S3 and GCS handle this differently. S3 simply omits
745+
the `IfMatch` header, making the first write unconditional — if two processes race to
746+
create the blob, the last one wins silently. GCS uses `if_generation_match=0`, which is
747+
a GCS convention meaning "only succeed if the object does not exist yet." This means
748+
GCS's first write is itself conditional — a useful extra safety net against concurrent
749+
initialisers, but it also means you can get a `CASConflictError` on the very first write
750+
if two processes start simultaneously.
751+
752+
In practice this rarely matters because the retry loop handles it, but it's worth
753+
understanding if you're debugging queue initialisation issues on GCS.
754+
755+
**Error classification.** The adapters distinguish three kinds of failures:
756+
`CASConflictError` (expected, retried automatically), `StorageError` (I/O problem with
757+
the backend, wraps the original exception in `.cause`), and everything else (let
758+
through as-is). Both cloud adapters use the same pattern: catch the specific
759+
precondition-failure error from their SDK and translate it to `CASConflictError`, then
760+
wrap anything unexpected in `StorageError`. The S3 adapter needs a defensive helper
761+
(`s3.py:124-135`) to extract error codes from botocore's `ClientError` because the
762+
SDK's error structure is deeply nested and inconsistent across error types.
763+
764+
**Sync-to-async wrapping.** The GCS Python SDK (`google-cloud-storage`) is fully
765+
synchronous. Calling it directly from an async context would block the event loop. Both
766+
the GCS and filesystem adapters solve this by running their synchronous implementations
767+
in a thread-pool worker via `asyncio.to_thread`. This keeps the event loop free to
768+
process heartbeats, dequeues, and other concurrent work while waiting on file I/O or
769+
HTTP calls.
770+
771+
**Writing your own adapter** requires no base class. `ObjectStoragePort` is a
772+
`runtime_checkable` Protocol: if your class has the right `read` and `write` signatures,
773+
it satisfies the interface via structural subtyping. Pass it directly to `DirectQueue`
774+
or `BrokerQueue`.

### Performance and scaling

**Throughput depends almost entirely on storage round-trip time.** CPU time for
JSON serialisation and in-memory mutation is negligible compared to the I/O.
`DirectQueue` performs one round-trip per operation; `BrokerQueue` performs one
round-trip per batch:

| Backend | Round-trip latency | DirectQueue | BrokerQueue (batch ~10) |
|---|---|---|---|
| InMemory | < 1 ms | ~500 ops/s | ~2 000 ops/s |
| Filesystem | 1-5 ms | ~100 ops/s | ~500 ops/s |
| S3 / GCS (same region) | 50-150 ms | ~5-15 ops/s | ~50-100 ops/s |

The key multiplier for `BrokerQueue` is the average batch size, which grows naturally
with concurrency: the more callers are waiting while a write is in-flight, the larger
the next batch. Under light load the batch size is 1 and `BrokerQueue` behaves like
`DirectQueue` plus some overhead. Under heavy load it can collapse 50+ operations into
a single write.
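
The BrokerQueue column of the table follows from a simple relationship (assuming the
listed latencies): one round-trip commits an entire batch, so effective throughput is
roughly batch size divided by round-trip time.

```python
def broker_throughput(round_trip_s: float, avg_batch_size: float) -> float:
    # One storage round-trip commits avg_batch_size operations.
    return avg_batch_size / round_trip_s


# e.g. S3 at 100 ms with batches of ~10 gives ~100 ops/s,
# the upper end of the table's 50-100 ops/s range.
```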

**The real scaling bottleneck is the full-state read-write cycle.** Every operation
reads the entire JSON blob, deserialises it, applies the mutation, serialises it back,
and writes the whole blob. As the queue grows, so does the blob. At ~1 000 jobs the
JSON payload is roughly 200-400 KB — still fast to transfer and parse. Beyond that,
serialisation time and transfer size start to matter, and CAS conflict rates climb
because longer writes create wider windows for races.

Three design choices in the domain layer help keep per-operation cost low:

1. **Immutable tuples for the job list** (`domain/models.py:100`). Using
   `tuple[Job, ...]` instead of `list[Job]` prevents accidental in-place mutation,
   which would corrupt shared state during batch application, where the same state
   object is transformed by multiple closures in sequence.

2. **Early return on no-change** (`domain/models.py:175-176`). `requeue_stale()` is
   called on every write cycle. Most of the time no jobs are stale. When that's the
   case, it returns `self` without incrementing the version, which avoids a
   pointless JSON encode/write cycle.

3. **Lazy generator chains for queries** (`domain/models.py:109-112`). `queued_jobs()`
   chains generators for filtering by status and entrypoint before sorting. This avoids
   allocating intermediate lists — the only materialised collection is the final sorted
   tuple.
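
The first two choices can be illustrated with a frozen dataclass. This is a heavily
simplified stand-in for `domain/models.py`; the field names and the staleness check
are hypothetical:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueState:
    version: int
    jobs: tuple[str, ...]  # tuple, not list: closures can't mutate it in place

    def requeue_stale(self, stale_ids: frozenset[str]) -> "QueueState":
        if not stale_ids.intersection(self.jobs):
            # Early return: same object identity, no version bump, and the
            # caller can skip the JSON encode/write entirely.
            return self
        return QueueState(version=self.version + 1, jobs=self.jobs)
```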

**When to partition.** If your workload exceeds what a single blob can handle, the
simplest fix is to use one JSON file per entrypoint. Each file has its own contention
domain, so an email queue and an SMS queue no longer race against each other:

```python
email_q = BrokerQueue(S3Storage(bucket="b", key="queues/email.json"))
sms_q = BrokerQueue(S3Storage(bucket="b", key="queues/sms.json"))
```

Partition when queue depth routinely exceeds ~1 000 jobs, when throughput exceeds
~50 ops/s on S3/GCS, or when logically independent workloads share a queue for no
good reason.

### Stale job recovery in depth

When `dequeue` claims a job, it sets `heartbeat_at` to `now`. If the worker then
crashes or gets partitioned from storage, heartbeats stop arriving and the job becomes
**stale** — still marked `IN_PROGRESS` but no longer making progress.

The detection rule (`domain/models.py:160-177`) is: an `IN_PROGRESS` job is stale if
its `heartbeat_at` is either `None` or older than the cutoff. The `None` case matters:
it covers workers that crash between `dequeue` (which sets the initial `heartbeat_at`)
and the first `HeartbeatManager` tick. Without it, such jobs would be stuck in
`IN_PROGRESS` forever.

When no jobs are stale, `requeue_stale` returns the state object unchanged (same
identity, no version increment). This is a deliberate optimisation: the method is
called on *every* write cycle inside `GroupCommitLoop`, so the common-case fast path
must be cheap.

**Automatic vs manual recovery.** `BrokerQueue` sweeps stale jobs inside `_commit_batch`
on every write cycle, just before applying the caller's mutations. This costs zero
extra I/O — it piggybacks on writes that are already happening. If no callers are
submitting operations, no writes happen and no sweep runs, but that also means no jobs
are being created or claimed, so there's nothing to recover.

`DirectQueue` has no background task, so stale recovery is an explicit call:
`await q.requeue_stale(timeout=timedelta(minutes=5))`. You can call this from a cron
job, a health-check endpoint, or a periodic asyncio task. It performs its own CAS
cycle, so it costs one storage round-trip.

**HeartbeatManager and the `_HasHeartbeat` Protocol.** `HeartbeatManager` doesn't know
or care which queue implementation it talks to. It's typed against a structural
Protocol with a single method: `async heartbeat(job_id: str) -> None`. Any object
satisfying that signature works — `BrokerQueue`, `DirectQueue`, `GroupCommitLoop`, or a
test double.

Inside the manager, a background task sleeps for `interval` seconds, then calls
`heartbeat`. If the call raises `JQueueError` (which includes `JobNotFoundError` and
`StorageError`), the task exits silently — the assumption is that the job has already
been handled or the connection is lost and there's no useful action to take. On context
exit, the task is cancelled.
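
A simplified model of that loop, with illustrative parameter names (not the library's
actual class):

```python
import asyncio
from typing import Protocol


class JQueueError(Exception):
    """Stands in for jqueue's error hierarchy."""


class _HasHeartbeat(Protocol):
    async def heartbeat(self, job_id: str) -> None: ...


class HeartbeatManager:
    def __init__(self, queue: _HasHeartbeat, job_id: str, interval: float) -> None:
        self._queue, self._job_id, self._interval = queue, job_id, interval

    async def __aenter__(self) -> "HeartbeatManager":
        self._task = asyncio.create_task(self._beat())
        return self

    async def __aexit__(self, *exc) -> None:
        self._task.cancel()  # context exit: stop heartbeating
        try:
            await self._task
        except asyncio.CancelledError:
            pass

    async def _beat(self) -> None:
        try:
            while True:
                await asyncio.sleep(self._interval)
                await self._queue.heartbeat(self._job_id)
        except JQueueError:
            return  # job already handled or storage unreachable: exit silently
```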

**Tuning guidelines:**

| Job duration | Heartbeat interval | Stale timeout |
|---|---|---|
| < 1 minute | 10 s | 30 s |
| 1 - 10 minutes | 30 s | 2 min |
| > 10 minutes | 60 s | 5 - 10 min |

The rule of thumb is **`stale_timeout >= 3 * heartbeat_interval`**. The multiplier
accounts for transient failures: a single missed heartbeat shouldn't trigger recovery.
Two missed heartbeats might mean a problem. Three is a strong signal the worker is gone.
Setting the timeout too tight causes premature requeueing of healthy jobs during GC
pauses, network blips, or temporary storage outages.

**Failure scenario — network partition.** This is the most important edge case to
understand. A worker is processing a job and sending heartbeats normally. Then a
network partition separates it from storage. What happens next:

1. The `_beat` coroutine tries to send a heartbeat, which fails with `StorageError`.
2. Because `StorageError` is a subclass of `JQueueError` (`domain/errors.py:34`), the
   `except JQueueError` handler catches it and the heartbeat task exits silently.
3. The worker continues processing the job, unaware that heartbeats have stopped.
4. After `stale_timeout`, `BrokerQueue` (on a different machine) requeues the job.
5. Another worker picks it up — now two workers are processing the same job.
6. When the original worker finishes and calls `ack()`, the job may no longer exist
   (the second worker already acked it), raising `JobNotFoundError`.

The implication is clear: **job handlers must be idempotent**, and `ack()` should catch
`JobNotFoundError` as a benign condition.
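
The benign-ack pattern from step 6 looks like this in handler code (`ack` and
`JobNotFoundError` are the names this README uses; the wrapper itself is
illustrative):

```python
class JobNotFoundError(Exception):
    """Stand-in for jqueue's error type."""


async def safe_ack(queue, job_id: str) -> None:
    # After a stale-timeout requeue, another worker may already have
    # processed and acked this job; treat its absence as success.
    try:
        await queue.ack(job_id)
    except JobNotFoundError:
        pass
```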

### Production deployment

**Architecture.** In a distributed setup, web servers enqueue jobs and a pool of
workers dequeue and process them. All processes point their `BrokerQueue` at the same
S3/GCS blob. Each process runs its own `GroupCommitLoop`, which batches operations
*within* that process; the CAS protocol serialises writes *across* processes.

```
Web servers ──enqueue──> ┌─────────────────────┐ <──dequeue── Workers
                         │   S3 / GCS bucket   │
                         │   queue.json blob   │
                         └─────────────────────┘
```

There is no leader election, no coordinator, and no discovery mechanism. Processes
don't even need to know about each other. The CAS write is the only synchronisation
point — if your write succeeds, you held the "lock." If it fails, you re-read and
retry.

**Monitoring.** `read_state()` performs a single storage read without entering the write
pipeline, so it's safe to call from a health-check or metrics endpoint:

```python
from datetime import UTC, datetime


async def collect_metrics(q: BrokerQueue) -> dict:
    state = await q.read_state()
    now = datetime.now(UTC)
    queued = state.queued_jobs()
    return {
        "queue_depth": len(queued),
        "in_progress": len(state.in_progress_jobs()),
        "version": state.version,
        "oldest_queued_age_s": (
            (now - min(j.created_at for j in queued)).total_seconds()
            if queued else 0
        ),
    }
```

The metrics worth watching: **queue depth** (is work piling up faster than workers
drain it?), **in-progress count** (are workers keeping busy?), **oldest queued age**
(is any job stuck waiting too long?), and **version** (is the queue making forward
progress at all?).

**Capacity planning:**

| Peak throughput | Recommended queue | Notes |
|---|---|---|
| < 5 ops/s | `DirectQueue` | Simplest code path, no background task |
| 5 - 50 ops/s | `BrokerQueue` | Group commit absorbs contention |
| > 50 ops/s | Multiple `BrokerQueue`s | Partition by entrypoint |

To estimate the number of workers needed: `peak_ops_per_second * avg_job_duration_seconds`.
For example, 10 jobs/s with 5 seconds of processing each requires 50 concurrent workers
to keep up.
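
This is Little's law (in-flight work = arrival rate x service time); as a helper,
rounding up to whole workers:

```python
import math


def workers_needed(peak_ops_per_s: float, avg_job_duration_s: float) -> int:
    # Little's law: concurrent in-flight jobs = arrival rate x service time.
    return math.ceil(peak_ops_per_s * avg_job_duration_s)
```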

**Common pitfalls:**

1. **Missing `HeartbeatManager` on long jobs.** Any job running longer than
   `stale_timeout` (default 5 minutes) will be requeued while the original worker is
   still processing it. Always wrap long-running work in a `HeartbeatManager`.

2. **`stale_timeout` shorter than job duration.** If your jobs routinely take 10
   minutes but `stale_timeout` is 5 minutes, even healthy jobs will be requeued
   whenever heartbeats are not running. Either increase the timeout or — better —
   add heartbeats.

3. **Crashing on `JobNotFoundError` during `ack()`.** In any system with multiple
   workers, a job can be processed and acked by someone else before you finish.
   This is normal — treat `JobNotFoundError` on `ack` as a no-op, not a crash.

4. **Unbounded queue growth.** The entire state is serialised on every operation, so
   performance degrades as queue depth increases. Monitor queue depth and ensure
   workers keep pace. If the queue consistently exceeds ~1 000 jobs, partition by
   entrypoint or add workers.

## Limitations and trade-offs

| Concern | Detail |
