Skip to content

Commit d1642b9

Browse files
committed
per bfs, preemptively answer the 'why not kafka?' question
1 parent acbc5fd commit d1642b9

File tree

2 files changed

+37
-36
lines changed

2 files changed

+37
-36
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
![](docs/EasterIsland.jpg)
22

3-
This library implements a disk-backed task queue, allowing for queues that can survive processes dying, and whose size is bounded by available disk rather than memory.
3+
This library implements a disk-backed task queue, allowing for queues that can survive processes dying, and whose size is bounded by available disk rather than memory. It is a small, purely-Clojure implementation focused entirely on the in-process use case, meaning that it is both simpler and more easily embedded than network-aware queue implementations such as Kafka and ActiveMQ.
44

55
### usage
66

@@ -77,7 +77,7 @@ A complete list of options is as follows:
7777
| name | description |
7878
|------|-------------|
7979
| `:complete?` | a predicate for identifying already completed tasks, defaults to always returning false |
80-
| `:max-queue-size` | the maximum number of elements that can be in the queue before `put!` blocks, defaults to `Integer/MAX_VALUE` |
80+
| `:max-queue-size` | the maximum number of elements that can be in the queue before `put!` blocks, defaults to `Integer/MAX_VALUE` |
8181
| `:slab-size` | The size, in bytes, of the backing files for the queue. Defaults to 16mb. |
8282
| `:fsync-put?` | Whether an fsync should be performed for each `put!`. Defaults to true. |
8383
| `:fsync-take?` | Whether an fsync should be performed for each `take!`. Defaults to false. |

src/durable_queue.clj

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -131,41 +131,42 @@
131131
([slab]
132132
(slab->task-seq slab 0))
133133
([slab pos]
134-
(lazy-seq
135-
(try
136-
(let [^ByteBuffer
137-
buf' (-> (buffer slab)
138-
.duplicate
139-
(.position pos))]
140-
141-
;; is there a next task, and is there space left in the buffer?
142-
(when (and
143-
(pos? (.remaining buf'))
144-
(== 1 (.get buf')))
145-
146-
(let [status (.get buf')
147-
checksum (.getLong buf')
148-
size (.getInt buf')]
149-
(cons
150-
151-
(vary-meta
152-
(task
153-
#(-> (buffer slab)
134+
(seq
135+
(lazy-seq
136+
(try
137+
(let [^ByteBuffer
138+
buf' (-> (buffer slab)
154139
.duplicate
155-
(.position pos)
156-
^ByteBuffer
157-
(.limit (+ pos header-size size))
158-
.slice)
159-
(read-write-lock slab))
160-
assoc ::slab slab)
161-
162-
(slab->task-seq
163-
slab
164-
(+ pos header-size size))))))
165-
(catch Throwable e
166-
;; this implies unrecoverable corruption
167-
nil
168-
)))))
140+
(.position pos))]
141+
142+
;; is there a next task, and is there space left in the buffer?
143+
(when (and
144+
(pos? (.remaining buf'))
145+
(== 1 (.get buf')))
146+
147+
(let [status (.get buf')
148+
checksum (.getLong buf')
149+
size (.getInt buf')]
150+
(cons
151+
152+
(vary-meta
153+
(task
154+
#(-> (buffer slab)
155+
.duplicate
156+
(.position pos)
157+
^ByteBuffer
158+
(.limit (+ pos header-size size))
159+
.slice)
160+
(read-write-lock slab))
161+
assoc ::slab slab)
162+
163+
(slab->task-seq
164+
slab
165+
(+ pos header-size size))))))
166+
(catch Throwable e
167+
;; this implies unrecoverable corruption
168+
nil
169+
))))))
169170

170171
(deftype TaskSlab
171172
[filename

0 commit comments

Comments
 (0)