Skip to content

Commit de5a6cd

Browse files
committed
make sure we don't close over any buffers which may be freed
1 parent 1d9a63b commit de5a6cd

File tree

1 file changed

+15
-19
lines changed

1 file changed

+15
-19
lines changed

src/durable_queue.clj

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@
181181
(invalidate slab (p/+ offset 1) 1)
182182
nil)))
183183

184-
(defn- task [slab offset len lock]
184+
(defn- task [slab offset len]
185185
(Task.
186186
slab
187187
offset
@@ -218,20 +218,20 @@
218218
"Takes a slab, and returns a sequence of the tasks it contains."
219219
([slab]
220220
(slab->task-seq slab 0))
221-
([slab pos]
221+
([slab ^long pos]
222222
(with-buffer [buf slab]
223223
(try
224-
(let [^ByteBuffer
225-
buf' (.position buf pos)]
224+
(let [^ByteBuffer buf' (.position buf pos)]
226225

227226
;; is there a next task, and is there space left in the buffer?
228227
(when (and
229-
(< header-size (.remaining buf'))
228+
(<= header-size (.remaining buf'))
230229
(== 1 (.get buf')))
231230

232231
(lazy-seq
233232
(with-buffer [buf slab]
234-
(let [status (.get buf')
233+
(let [^ByteBuffer buf' (.position buf (p/inc pos))
234+
status (.get buf')
235235
checksum (.getLong buf')
236236
size (.getInt buf')]
237237

@@ -243,8 +243,7 @@
243243
(task
244244
slab
245245
pos
246-
(+ header-size size)
247-
(read-write-lock slab))
246+
(+ header-size size))
248247

249248
(slab->task-seq
250249
slab
@@ -269,22 +268,20 @@
269268
lock)
270269

271270
(buffer [this]
272-
(let [buf
273-
(or @buf
274-
(swap! buf
275-
(fn [buf]
276-
(or buf (load-buffer filename)))))]
271+
(let [buf (or @buf
272+
(swap! buf
273+
(fn [buf]
274+
(or buf (load-buffer filename)))))]
277275
(.duplicate ^ByteBuffer buf)))
278276

279277
(mapped? [_]
280278
(boolean @buf))
281279

282280
(unmap [_]
283281
(with-exclusive-lock lock
284-
(when (.exists (io/file filename))
285-
(when-let [buf @buf]
286-
(unmap-buffer buf))
287-
(reset! buf nil))))
282+
(when-let [b @buf]
283+
(reset! buf nil)
284+
(unmap-buffer b))))
288285

289286
(invalidate [_ start' len]
290287
(let [end' (+ start' len)]
@@ -327,8 +324,7 @@
327324
(task
328325
this
329326
pos
330-
(+ header-size cnt)
331-
lock)))))
327+
(+ header-size cnt))))))
332328

333329
clojure.lang.Seqable
334330
(seq [this]

0 commit comments

Comments
 (0)