|
1 | 1 | (ns durable-queue
|
2 | 2 | (:require
|
3 |
| - [clojure.java.io :as io] |
4 |
| - [byte-streams :as bs] |
5 |
| - [clojure.string :as str] |
6 |
| - [primitive-math :as p] |
7 |
| - [taoensso.nippy :as nippy]) |
| 3 | + [clj-commons.byte-streams :as bs] |
| 4 | + [clj-commons.primitive-math :as p] |
| 5 | + [clojure.java.io :as io] |
| 6 | + [taoensso.nippy :as nippy]) |
8 | 7 | (:import
|
9 |
| - [java.lang.reflect |
10 |
| - Method |
11 |
| - Field] |
12 |
| - [java.util.concurrent |
13 |
| - LinkedBlockingQueue |
14 |
| - TimeoutException |
15 |
| - TimeUnit] |
16 |
| - [java.util.concurrent.atomic |
| 8 | + [java.io |
| 9 | + File |
| 10 | + IOException |
| 11 | + RandomAccessFile |
| 12 | + Writer] |
| 13 | + [java.lang.ref |
| 14 | + WeakReference] |
| 15 | + [java.lang.reflect |
| 16 | + Method] |
| 17 | + [java.nio ByteBuffer MappedByteBuffer] |
| 18 | + [java.nio.channels |
| 19 | + FileChannel$MapMode] |
| 20 | + [java.util.concurrent LinkedBlockingQueue TimeUnit TimeoutException] |
| 21 | + [java.util.concurrent.atomic |
17 | 22 | AtomicLong]
|
18 |
| - [java.util.zip |
19 |
| - CRC32] |
20 |
| - [java.util.concurrent.locks |
| 23 | + [java.util.concurrent.locks |
21 | 24 | ReentrantReadWriteLock]
|
22 |
| - [java.io |
23 |
| - Writer |
24 |
| - File |
25 |
| - RandomAccessFile |
26 |
| - IOException] |
27 |
| - [java.nio.channels |
28 |
| - FileChannel |
29 |
| - FileChannel$MapMode] |
30 |
| - [java.nio |
31 |
| - ByteBuffer |
32 |
| - MappedByteBuffer] |
33 |
| - [java.lang.ref |
34 |
| - WeakReference])) |
| 25 | + [java.util.zip |
| 26 | + CRC32])) |
35 | 27 |
|
36 | 28 | ;;;
|
37 | 29 |
|
|
78 | 70 | (^:private sync! [_])
|
79 | 71 | (^:private invalidate [_ offset len])
|
80 | 72 | (^:private ^ByteBuffer buffer [_])
|
81 |
| - (^:private append-to-slab! [_ descriptor]) |
| 73 | + (^:private append-to-slab! [_ task-descriptor]) |
82 | 74 | (^:private read-write-lock [_]))
|
83 | 75 |
|
84 | 76 | (defmacro ^:private with-buffer [[buf slab] & body]
|
|
139 | 131 | (.invoke clean
|
140 | 132 | (.invoke cleaner buf nil)
|
141 | 133 | nil))
|
142 |
| - (catch Throwable e |
| 134 | + (catch Throwable _ |
143 | 135 | ;; not much we can do here, sadly
|
144 | 136 | )))))
|
145 | 137 |
|
146 | 138 | (defn- force-buffer
|
147 |
| - [^MappedByteBuffer buf offset length] |
| 139 | + [^MappedByteBuffer buf _offset _length] |
148 | 140 | (.force buf))
|
149 | 141 |
|
150 | 142 | ;;;
|
|
232 | 224 | (lazy-seq
|
233 | 225 | (with-buffer [buf slab]
|
234 | 226 | (let [^ByteBuffer buf' (.position buf (p/inc pos))
|
235 |
| - status (.get buf') |
236 |
| - checksum (.getLong buf') |
| 227 | + _status (.get buf') |
| 228 | + _checksum (.getLong buf') |
237 | 229 | size (.getInt buf')]
|
238 | 230 |
|
239 | 231 | ;; this shouldn't be necessary, but let's not gratuitously
|
|
249 | 241 | (slab->task-seq
|
250 | 242 | slab
|
251 | 243 | (+ pos header-size size)))))))))
|
252 |
| - (catch Throwable e |
| 244 | + (catch Throwable _ |
253 | 245 | ;; this implies unrecoverable corruption
|
254 | 246 | nil
|
255 | 247 | )))))
|
|
268 | 260 | (read-write-lock [_]
|
269 | 261 | lock)
|
270 | 262 |
|
271 |
| - (buffer [this] |
| 263 | + (buffer [_] |
272 | 264 | (let [buf (or @buf
|
273 | 265 | (swap! buf
|
274 | 266 | (fn [buf]
|
|
299 | 291 | (compare-and-set! dirty [start end] [Integer/MAX_VALUE 0])
|
300 | 292 | nil)))))
|
301 | 293 |
|
302 |
| - (append-to-slab! [this descriptor] |
| 294 | + (append-to-slab! [this task-descriptor] |
303 | 295 | (with-buffer [buf this]
|
304 |
| - (let [ary (nippy/freeze descriptor) |
| 296 | + (let [ary (nippy/freeze task-descriptor) |
305 | 297 | cnt (count ary)
|
306 | 298 | pos @position
|
307 | 299 | ^ByteBuffer buf (.position buf ^Long pos)]
|
|
515 | 507 | queue-name->current-slab (atom {})
|
516 | 508 |
|
517 | 509 | ;; initialize
|
518 |
| - slabs (->> @queue-name->slabs vals (apply concat)) |
519 |
| - slab->count (zipmap |
520 |
| - slabs |
521 |
| - (map #(atom (count (seq %))) slabs)) |
522 | 510 | create-new-slab (fn [q-name]
|
523 | 511 | (let [slab (create-slab directory q-name (queue q-name) slab-size)
|
524 | 512 | empty-slabs (->> (@queue-name->slabs q-name)
|
|
566 | 554 | (fsync q)
|
567 | 555 | (let [end (System/currentTimeMillis)]
|
568 | 556 | (Thread/sleep (long (max 0 (- fsync-interval (- end start)))))))
|
569 |
| - (catch Throwable e |
570 |
| - ))))))) |
| 557 | + (catch Throwable _))))))) |
571 | 558 |
|
572 | 559 | ;; populate queues with pre-existing tasks
|
573 | 560 | (let [empty-slabs (atom #{})]
|
|
620 | 607 |
|
621 | 608 | IQueues
|
622 | 609 |
|
623 |
| - (delete! [this] |
| 610 | + (delete! [_] |
624 | 611 | (doseq [s (->> @queue-name->slabs vals (apply concat))]
|
625 | 612 | (unmap s)
|
626 | 613 | (delete-slab s)))
|
|
654 | 641 | (immediate-stats (queue q-name) (get @queue-name->stats q-name))))
|
655 | 642 | ks))))
|
656 | 643 |
|
657 |
| - (take! [this q-name timeout timeout-val] |
| 644 | + (take! [_ q-name timeout timeout-val] |
658 | 645 | (let [q-name (munge (name q-name))
|
659 | 646 | ^LinkedBlockingQueue q (queue q-name)]
|
660 | 647 | (try
|
|
704 | 691 |
|
705 | 692 | (when-not task
|
706 | 693 | (throw
|
707 |
| - (IllegalArgumentException. |
708 |
| - (str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size)))) |
| 694 | + (IllegalArgumentException. |
| 695 | + (str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size)))) |
709 | 696 |
|
710 | 697 | (when fsync-put?
|
711 | 698 | (sync! slab))
|
|
715 | 702 | (if (zero? timeout)
|
716 | 703 | (.offer q task)
|
717 | 704 | (.offer q task timeout TimeUnit/MILLISECONDS)))]
|
718 |
| - (if-let [val (locking q |
719 |
| - (queue! |
720 |
| - (vary-meta (slab!) assoc |
721 |
| - ::this this-ref |
722 |
| - ::queue-name q-name |
723 |
| - ::queue q |
724 |
| - ::fsync? fsync-take?)))] |
| 705 | + (if (locking q |
| 706 | + (queue! |
| 707 | + (vary-meta (slab!) assoc |
| 708 | + ::this this-ref |
| 709 | + ::queue-name q-name |
| 710 | + ::queue q |
| 711 | + ::fsync? fsync-take?))) |
725 | 712 | (do
|
726 | 713 | (populate-stats! q-name)
|
727 | 714 | (let [^AtomicLong counter (get-in @queue-name->stats [q-name :enqueued])]
|
|
0 commit comments