Skip to content

Commit 1d9a63b

Browse files
committed
more mutexes
1 parent a66eee6 commit 1d9a63b

File tree

1 file changed

+66
-63
lines changed

1 file changed

+66
-63
lines changed

src/durable_queue.clj

Lines changed: 66 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@
8181
(^:private append-to-slab! [_ descriptor])
8282
(^:private read-write-lock [_]))
8383

84+
(defmacro ^:private with-buffer [[buf slab] & body]
85+
`(with-lock (read-write-lock ~slab)
86+
(when-let [~buf (buffer ~slab)]
87+
~@body)))
88+
8489
;;;
8590

8691
(defn create-buffer [filename size]
@@ -157,22 +162,24 @@
157162
(deserializer))
158163
ITask
159164
(status [_]
160-
(or @status
161-
(let [s (case (.get ^ByteBuffer (buffer slab) (p/+ offset 1))
162-
0 :incomplete
163-
1 :in-progress
164-
2 :complete)]
165-
(reset! status s)
166-
s)))
165+
(with-buffer [buf slab]
166+
(or @status
167+
(let [s (case (.get buf (p/+ offset 1))
168+
0 :incomplete
169+
1 :in-progress
170+
2 :complete)]
171+
(reset! status s)
172+
s))))
167173
(status! [_ s]
168-
(reset! status s)
169-
(.put ^ByteBuffer (buffer slab) (p/+ offset 1)
170-
(case s
171-
:incomplete 0
172-
:in-progress 1
173-
:complete 2))
174-
(invalidate slab (p/+ offset 1) 1)
175-
nil))
174+
(with-buffer [buf slab]
175+
(reset! status s)
176+
(.put buf (p/+ offset 1)
177+
(case s
178+
:incomplete 0
179+
:in-progress 1
180+
:complete 2))
181+
(invalidate slab (p/+ offset 1) 1)
182+
nil)))
176183

177184
(defn- task [slab offset len lock]
178185
(Task.
@@ -181,8 +188,8 @@
181188
len
182189
(atom nil)
183190
(fn []
184-
(with-lock lock
185-
(let [^ByteBuffer buf (-> (buffer slab)
191+
(with-buffer [buf slab]
192+
(let [^ByteBuffer buf (-> buf
186193
(.position offset)
187194
^ByteBuffer
188195
(.limit (+ offset len))
@@ -212,42 +219,40 @@
212219
([slab]
213220
(slab->task-seq slab 0))
214221
([slab pos]
215-
(let [lock (read-write-lock slab)]
216-
(with-lock lock
217-
(try
218-
(let [^ByteBuffer
219-
buf' (-> (buffer slab)
220-
(.position pos))]
221-
222-
;; is there a next task, and is there space left in the buffer?
223-
(when (and
224-
(< header-size (.remaining buf'))
225-
(== 1 (.get buf')))
226-
227-
(lazy-seq
228-
(with-lock lock
229-
(let [status (.get buf')
230-
checksum (.getLong buf')
231-
size (.getInt buf')]
232-
233-
;; this shouldn't be necessary, but let's not gratuitously
234-
;; overreach our bounds
235-
(when (< size (.remaining buf'))
236-
(cons
237-
238-
(task
239-
slab
240-
pos
241-
(+ header-size size)
242-
lock)
243-
244-
(slab->task-seq
245-
slab
246-
(+ pos header-size size)))))))))
247-
(catch Throwable e
248-
;; this implies unrecoverable corruption
249-
nil
250-
))))))
222+
(with-buffer [buf slab]
223+
(try
224+
(let [^ByteBuffer
225+
buf' (.position buf pos)]
226+
227+
;; is there a next task, and is there space left in the buffer?
228+
(when (and
229+
(< header-size (.remaining buf'))
230+
(== 1 (.get buf')))
231+
232+
(lazy-seq
233+
(with-buffer [buf slab]
234+
(let [status (.get buf')
235+
checksum (.getLong buf')
236+
size (.getInt buf')]
237+
238+
;; this shouldn't be necessary, but let's not gratuitously
239+
;; overreach our bounds
240+
(when (< size (.remaining buf'))
241+
(cons
242+
243+
(task
244+
slab
245+
pos
246+
(+ header-size size)
247+
(read-write-lock slab))
248+
249+
(slab->task-seq
250+
slab
251+
(+ pos header-size size)))))))))
252+
(catch Throwable e
253+
;; this implies unrecoverable corruption
254+
nil
255+
)))))
251256

252257
(deftype TaskSlab
253258
[filename
@@ -287,23 +292,21 @@
287292
(fn [[start end]]
288293
[(min start start') (max end end')]))))
289294

290-
(sync! [_]
295+
(sync! [this]
291296
(let [[start end] @dirty]
292297
(when (< start end)
293-
(when-let [^MappedByteBuffer buf @buf]
294-
(force-buffer buf start (- end start))
295-
(compare-and-set! dirty [start end] [Integer/MAX_VALUE 0])
296-
nil))))
298+
(with-buffer [_ this]
299+
(let [buf @buf]
300+
(force-buffer buf start (- end start))
301+
(compare-and-set! dirty [start end] [Integer/MAX_VALUE 0])
302+
nil)))))
297303

298304
(append-to-slab! [this descriptor]
299-
(with-exclusive-lock lock
305+
(with-buffer [buf this]
300306
(let [ary (nippy/freeze descriptor)
301307
cnt (count ary)
302308
pos @position
303-
304-
^ByteBuffer
305-
buf (-> (buffer this)
306-
(.position pos))]
309+
^ByteBuffer buf (.position buf pos)]
307310

308311
(when (> (.remaining buf) (+ (count ary) header-size))
309312
;; write to the buffer

0 commit comments

Comments
 (0)