Skip to content

Commit a66eee6

Browse files
committed
massive reindentation, and some more aggressive locking on reads
1 parent 37e31d3 commit a66eee6

File tree

1 file changed

+129
-126
lines changed

1 file changed

+129
-126
lines changed

src/durable_queue.clj

Lines changed: 129 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -212,39 +212,42 @@
212212
([slab]
213213
(slab->task-seq slab 0))
214214
([slab pos]
215-
(try
216-
(let [^ByteBuffer
217-
buf' (-> (buffer slab)
218-
(.position pos))]
219-
220-
;; is there a next task, and is there space left in the buffer?
221-
(when (and
222-
(< header-size (.remaining buf'))
223-
(== 1 (.get buf')))
224-
225-
(lazy-seq
226-
(let [status (.get buf')
227-
checksum (.getLong buf')
228-
size (.getInt buf')]
229-
230-
;; this shouldn't be necessary, but let's not gratuitously
231-
;; overreach our bounds
232-
(when (< size (.remaining buf'))
233-
(cons
234-
235-
(task
236-
slab
237-
pos
238-
(+ header-size size)
239-
(read-write-lock slab))
240-
241-
(slab->task-seq
242-
slab
243-
(+ pos header-size size))))))))
244-
(catch Throwable e
245-
;; this implies unrecoverable corruption
246-
nil
247-
))))
215+
(let [lock (read-write-lock slab)]
216+
(with-lock lock
217+
(try
218+
(let [^ByteBuffer
219+
buf' (-> (buffer slab)
220+
(.position pos))]
221+
222+
;; is there a next task, and is there space left in the buffer?
223+
(when (and
224+
(< header-size (.remaining buf'))
225+
(== 1 (.get buf')))
226+
227+
(lazy-seq
228+
(with-lock lock
229+
(let [status (.get buf')
230+
checksum (.getLong buf')
231+
size (.getInt buf')]
232+
233+
;; this shouldn't be necessary, but let's not gratuitously
234+
;; overreach our bounds
235+
(when (< size (.remaining buf'))
236+
(cons
237+
238+
(task
239+
slab
240+
pos
241+
(+ header-size size)
242+
lock)
243+
244+
(slab->task-seq
245+
slab
246+
(+ pos header-size size)))))))))
247+
(catch Throwable e
248+
;; this implies unrecoverable corruption
249+
nil
250+
))))))
248251

249252
(deftype TaskSlab
250253
[filename
@@ -293,7 +296,7 @@
293296
nil))))
294297

295298
(append-to-slab! [this descriptor]
296-
(locking this
299+
(with-exclusive-lock lock
297300
(let [ary (nippy/freeze descriptor)
298301
cnt (count ary)
299302
pos @position
@@ -638,98 +641,98 @@
638641
(let [^AtomicLong retry-counter (get-in @queue-name->stats [q-name :completed])]
639642
(.incrementAndGet retry-counter)))
640643

641-
(stats [_]
642-
(let [ks (keys @queue-name->stats)]
643-
(zipmap ks
644-
(map
645-
(fn [q-name]
646-
(merge
647-
{:num-slabs (-> @queue-name->slabs (get q-name) count)
648-
:num-active-slabs (->> (get @queue-name->slabs q-name)
649-
(filter mapped?)
650-
count)}
651-
(immediate-stats (queue q-name) (get @queue-name->stats q-name))))
652-
ks))))
653-
654-
(take! [this q-name timeout timeout-val]
655-
(let [q-name (munge (name q-name))
656-
^LinkedBlockingQueue q (queue q-name)]
657-
(try
658-
(if-let [t (if (zero? timeout)
659-
(.poll q)
660-
(.poll q timeout TimeUnit/MILLISECONDS))]
661-
662-
(let [slab (:slab t)]
663-
664-
;; if we've moved onto a new slab, unmap all but the current and
665-
;; last slabs
666-
(let [old-slab (@queue-name->current-slab q-name)]
667-
(when-not (= slab old-slab)
668-
(swap! queue-name->current-slab assoc q-name slab)
669-
(doseq [s (->> (get @queue-name->slabs q-name)
670-
butlast
671-
(remove #(= slab %)))]
672-
(unmap s))))
673-
674-
(status! t :in-progress)
675-
;; we don't need to fsync here, because in-progress and incomplete
676-
;; are effectively equivalent on restart
677-
678-
t)
679-
timeout-val)
680-
(catch TimeoutException _
681-
timeout-val))))
682-
683-
(take! [this q-name]
684-
(take! this q-name Long/MAX_VALUE nil))
685-
686-
(put! [_ q-name task-descriptor timeout]
687-
(let [q-name (munge (name q-name))
688-
^LinkedBlockingQueue q (queue q-name)
689-
slab! (fn []
690-
(let [slabs (@queue-name->slabs q-name)
691-
slab (last slabs)
692-
task (when slab
693-
(append-to-slab! slab task-descriptor))
694-
695-
;; if no task was created, we need to create a new slab file
696-
;; and try again
697-
slab (if task
698-
slab
699-
(create-new-slab q-name))
700-
task (or task (append-to-slab! slab task-descriptor))]
701-
702-
(when-not task
703-
(throw
704-
(IllegalArgumentException.
705-
(str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size))))
706-
707-
(when fsync-put?
708-
(sync! slab))
709-
task))
710-
711-
queue! (fn [task]
712-
(if (zero? timeout)
713-
(.offer q task)
714-
(.offer q task timeout TimeUnit/MILLISECONDS)))]
715-
(if-let [val (locking q
716-
(queue!
717-
(vary-meta (slab!) assoc
718-
::this this-ref
719-
::queue-name q-name
720-
::queue q
721-
::fsync? fsync-take?)))]
722-
(do
723-
(populate-stats! q-name)
724-
(let [^AtomicLong counter (get-in @queue-name->stats [q-name :enqueued])]
725-
(.incrementAndGet counter))
726-
true)
727-
false)
728-
729-
nil))
730-
731-
(put! [this q-name task-descriptor]
732-
(put! this q-name task-descriptor Long/MAX_VALUE))))
644+
(stats [_]
645+
(let [ks (keys @queue-name->stats)]
646+
(zipmap ks
647+
(map
648+
(fn [q-name]
649+
(merge
650+
{:num-slabs (-> @queue-name->slabs (get q-name) count)
651+
:num-active-slabs (->> (get @queue-name->slabs q-name)
652+
(filter mapped?)
653+
count)}
654+
(immediate-stats (queue q-name) (get @queue-name->stats q-name))))
655+
ks))))
656+
657+
(take! [this q-name timeout timeout-val]
658+
(let [q-name (munge (name q-name))
659+
^LinkedBlockingQueue q (queue q-name)]
660+
(try
661+
(if-let [t (if (zero? timeout)
662+
(.poll q)
663+
(.poll q timeout TimeUnit/MILLISECONDS))]
664+
665+
(let [slab (:slab t)]
666+
667+
;; if we've moved onto a new slab, unmap all but the current and
668+
;; last slabs
669+
(let [old-slab (@queue-name->current-slab q-name)]
670+
(when-not (= slab old-slab)
671+
(swap! queue-name->current-slab assoc q-name slab)
672+
(doseq [s (->> (get @queue-name->slabs q-name)
673+
butlast
674+
(remove #(= slab %)))]
675+
(unmap s))))
676+
677+
(status! t :in-progress)
678+
;; we don't need to fsync here, because in-progress and incomplete
679+
;; are effectively equivalent on restart
680+
681+
t)
682+
timeout-val)
683+
(catch TimeoutException _
684+
timeout-val))))
685+
686+
(take! [this q-name]
687+
(take! this q-name Long/MAX_VALUE nil))
688+
689+
(put! [_ q-name task-descriptor timeout]
690+
(let [q-name (munge (name q-name))
691+
^LinkedBlockingQueue q (queue q-name)
692+
slab! (fn []
693+
(let [slabs (@queue-name->slabs q-name)
694+
slab (last slabs)
695+
task (when slab
696+
(append-to-slab! slab task-descriptor))
697+
698+
;; if no task was created, we need to create a new slab file
699+
;; and try again
700+
slab (if task
701+
slab
702+
(create-new-slab q-name))
703+
task (or task (append-to-slab! slab task-descriptor))]
704+
705+
(when-not task
706+
(throw
707+
(IllegalArgumentException.
708+
(str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size))))
709+
710+
(when fsync-put?
711+
(sync! slab))
712+
task))
713+
714+
queue! (fn [task]
715+
(if (zero? timeout)
716+
(.offer q task)
717+
(.offer q task timeout TimeUnit/MILLISECONDS)))]
718+
(if-let [val (locking q
719+
(queue!
720+
(vary-meta (slab!) assoc
721+
::this this-ref
722+
::queue-name q-name
723+
::queue q
724+
::fsync? fsync-take?)))]
725+
(do
726+
(populate-stats! q-name)
727+
(let [^AtomicLong counter (get-in @queue-name->stats [q-name :enqueued])]
728+
(.incrementAndGet counter))
729+
true)
730+
false)
731+
732+
nil))
733+
734+
(put! [this q-name task-descriptor]
735+
(put! this q-name task-descriptor Long/MAX_VALUE))))
733736

734737
@this-ref)))
735738

0 commit comments

Comments
 (0)