diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn new file mode 100644 index 0000000..9307cdb --- /dev/null +++ b/.clj-kondo/config.edn @@ -0,0 +1 @@ +{:lint-as {clj-commons.durable-queue/with-buffer clojure.core/let}} diff --git a/.gitignore b/.gitignore index 7d70dce..ccb6f89 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,8 @@ pom.xml.asc /.nrepl-port .DS_Store /doc -push \ No newline at end of file +push +*.iml +.idea/ +.cpcache/ +.clj-kondo/.cache diff --git a/README.md b/README.md index 90a2e97..86da406 100644 --- a/README.md +++ b/README.md @@ -8,27 +8,33 @@ This library implements a disk-backed task queue, allowing for queues that can s ### usage +Leiningen: + ```clj -[factual/durable-queue "0.1.5"] +[factual/durable-queue "0.1.6"] ``` +deps.edn +```clj +io.github.vedang/durable-queue {:git/sha "8dfc74193b5a78608cfe4fba821dab8b8b5644be"} +``` To interact with queues, first create a `queues` object by specifying a directory in the filesystem and an options map: ```clj -> (require '[durable-queue :refer :all]) +> (require '[clj-commons.durable-queue :as dq]) nil -> (def q (queues "/tmp" {})) +> (def q (dq/queues "/tmp" {})) #'q ``` This allows us to `put!` and `take!` tasks from named queues. `take!` is a blocking read, and will only return once a task is available or, if a timeout is defined (in milliseconds), once the timeout elapses: ```clj -> (take! q :foo 10 :timed-out!) +> (dq/take! q :foo 10 :timed-out!) :timed-out! -> (put! q :foo "a task") +> (dq/put! q :foo "a task") true -> (take! q :foo) +> (dq/take! q :foo) < :in-progress | "a task" > > (deref *1) "a task" @@ -39,20 +45,20 @@ Notice that the task has a value describing its progress, and a value describing Calling `take!` removed the task from the queue, but just because we've taken the task doesn't mean we've completed the action associated with it. In order to make sure the task isn't retried on restart, we must mark it as `complete!`. ```clj -> (put! q :foo "another task") +> (dq/put! q :foo "another task") true -> (take! q :foo) +> (dq/take! q :foo) < :in-progress | "another task" > -> (complete! *1) +> (dq/complete! *1) true ``` -If our task fails and we want to re-enqueue it to be tried again, we can instead call `(retry! task)`. Tasks which are marked for retry are added to the end of the current queue. +If our task fails and we want to re-enqueue it to be tried again, we can instead call `(dq/retry! task)`. Tasks which are marked for retry are added to the end of the current queue. To get a description of the current state of the queue, we can use `stats`, which returns a map of queue names onto various counts: ```clj -> (stats q) +> (dq/stats q) {:enqueued 2, :retried 0, :completed 1, diff --git a/deps.edn b/deps.edn new file mode 100644 index 0000000..0934271 --- /dev/null +++ b/deps.edn @@ -0,0 +1,25 @@ +{:paths ["src" "resources"] + + :deps + {com.taoensso/nippy {:mvn/version "3.2.0"} + org.clj-commons/byte-streams {:mvn/version "0.3.2"} + org.clj-commons/primitive-math {:mvn/version "1.0.1-rc1"}} + + :aliases + {:test + {:extra-paths ["test"] + :extra-deps {criterium/criterium {:mvn/version "0.4.6"}}} + ;;; Run tests with cognitect-labs/test-runner + ;; clojure -X:test:congnitect :excludes '[:stress :benchmark]' + ;; NOTE: :stress and :benchmark generate a lot of slab files and + ;; need a huge amount of disk space. + :cognitect + {:extra-deps {io.github.cognitect-labs/test-runner + {:git/tag "v0.5.1" :git/sha "dfb30dd"}} + :main-opts ["-m" "cognitect.test-runner"] + :exec-fn cognitect.test-runner.api/test}} + + :description "a in-process task-queue that is backed by disk." + :url "https://github.com/clj-commons/durable-queue" + :license {:name "Eclipse Public License" + :url "http://www.eclipse.org/legal/epl-v10.html"}} diff --git a/project.clj b/project.clj deleted file mode 100644 index acf2042..0000000 --- a/project.clj +++ /dev/null @@ -1,23 +0,0 @@ -(defproject org.clj-commons/durable-queue - (or (System/getenv "PROJECT_VERSION") "0.1.6") - :description "a in-process task-queue that is backed by disk." - :url "https://github.com/clj-commons/durable-queue" - :license {:name "Eclipse Public License" - :url "http://www.eclipse.org/legal/epl-v10.html"} - :deploy-repositories [["clojars" {:url "https://repo.clojars.org" - :username :env/clojars_username - :password :env/clojars_password - :sign-releases true}]] - :dependencies [[com.taoensso/nippy "2.8.0"] - [primitive-math "0.1.4"] - [byte-streams "0.2.2"]] - :profiles {:dev {:dependencies [[org.clojure/clojure "1.10.0"] - [criterium "0.4.6"] -]}} - :global-vars {*warn-on-reflection* true} - :test-selectors {:default #(not (some #{:benchmark :stress} (keys %))) - :benchmark :benchmark - :stress :stress} - :codox {:writer codox-md.writer/write-docs - :include [durable-queue]} - :jvm-opts ^:replace ["-server" "-Xmx100m"]) diff --git a/src/clj_commons/durable_queue.clj b/src/clj_commons/durable_queue.clj new file mode 100644 index 0000000..20bc9b0 --- /dev/null +++ b/src/clj_commons/durable_queue.clj @@ -0,0 +1,796 @@ +(ns clj-commons.durable-queue + (:require + [clj-commons.byte-streams :as bs] + [clj-commons.primitive-math :as p] + [clojure.java.io :as io] + [taoensso.nippy :as nippy]) + (:import + (java.io File IOException RandomAccessFile Writer) + (java.lang.ref WeakReference) + (java.lang.reflect Method) + (java.nio ByteBuffer MappedByteBuffer) + (java.nio.channels FileChannel FileChannel$MapMode) + (java.util.concurrent LinkedBlockingQueue TimeoutException TimeUnit) + (java.util.concurrent.atomic AtomicLong) + (java.util.concurrent.locks ReentrantReadWriteLock) + (java.util.zip CRC32))) + +;;; + +(defmacro ^:private with-lock [lock & body] + `(let [^ReentrantReadWriteLock lock# ~lock + read-lock# (.readLock lock#)] + (do + (.lock read-lock#) + (try + ~@body + (finally + (.unlock read-lock#)))))) + +(defmacro ^:private with-exclusive-lock [lock & body] + `(let [^ReentrantReadWriteLock lock# ~lock + write-lock# (.writeLock lock#)] + (do + (.lock write-lock#) + (try + ~@body + (finally + (.unlock write-lock#)))))) + +;;; + +(defn- checksum ^long [^long length ^bytes ary] + (let [crc (CRC32.)] + (dotimes [i 4] + (.update crc (p/>> length i))) + (.update crc ary) + (.getValue crc))) + +;;; + +(def ^:private ^:const header-size 14) + +(defprotocol ITask + (^:private status [_] "Returns the task status") + (^:private status! [_ status] "Sets the task status")) + +(defprotocol ITaskSlab + (^:private unmap [_] "Temporarily releases mapped byte buffer until it's needed again.") + (^:private mapped? [_] "Returns true if the slab is actively mapped into memory.") + (^:private sync! [_]) + (^:private invalidate [_ offset len]) + (^:private ^ByteBuffer buffer [_]) + (^:private append-to-slab! [_ task-descriptor]) + (^:private read-write-lock [_])) + +(defmacro ^:private with-buffer [[buf slab] & body] + `(with-lock (read-write-lock ~slab) + (when-let [~buf (buffer ~slab)] + ~@body))) + +;;; + +(defn create-buffer [filename size] + (let [raf (doto (RandomAccessFile. (io/file filename) "rw") + (.setLength size))] + (try + (let [fc (.getChannel raf)] + (try + (let [buf (.map fc FileChannel$MapMode/READ_WRITE 0 size)] + (doto buf + (.put 0 (byte 0)) + .force)) + (finally + (.close fc)))) + (finally + (.close raf))))) + +(defn load-buffer + ([filename] + (load-buffer filename nil nil)) + ([filename offset length] + (let [_ (assert (.exists (io/file filename))) + raf (RandomAccessFile. (io/file filename) "rw")] + (try + (let [fc (.getChannel raf)] + (try + (.map fc + FileChannel$MapMode/READ_WRITE + (or offset 0) + (or length (.length raf))) + (finally + (.close fc)))) + (finally + (.close raf)))))) + +(let [clean (delay + (doto (.getMethod + (Class/forName "sun.misc.Cleaner") + "clean" + nil) + (.setAccessible true)))] + (defn- unmap-buffer + "A delightful endrun on the JVM's mmap GC mechanism" + [^ByteBuffer buf] + (when (.isDirect buf) + (try + + (let [^Method clean @clean + cleaner (doto (.getMethod (class buf) "cleaner" nil) + (.setAccessible true))] + (.invoke clean + (.invoke cleaner buf nil) + nil)) + ;; not much we can do here, sadly + (catch Throwable _e))))) + +(defn- force-buffer + ;; We probably wanted something to do with offset and length, but we + ;; have forgotten. @TODO: fix this code when you understand it. + [^MappedByteBuffer buf _offset _length] + (.force buf)) + +;;; + +;; a single task within a slab, assumes that the buffer is sliced around +;; the task's boundaries +(defrecord Task + [slab + ^long offset + ^long length + status + deserializer] + clojure.lang.IDeref + (deref [_] + (deserializer)) + ITask + (status [_] + (with-buffer [buf slab] + (or @status + (let [s (case (.get buf (p/+ offset 1)) + 0 :incomplete + 1 :in-progress + 2 :complete)] + (reset! status s) + s)))) + (status! [_ s] + (with-buffer [buf slab] + (reset! status s) + (.put buf (p/+ offset 1) + (byte + (case s + :incomplete 0 + :in-progress 1 + :complete 2))) + (invalidate slab (p/+ offset 1) 1) + nil))) + +(defn- task [slab offset len] + (Task. + slab + offset + len + (atom nil) + (fn [] + (with-buffer [buf slab] + (let [^ByteBuffer buf (-> buf + (.position ^Long offset) + ^ByteBuffer + (.limit ^Long (+ offset len)) + .slice) + checksum' (.getLong buf 2) + ary (bs/to-byte-array (.position buf header-size))] + (when-not (== (checksum (.getInt buf 10) ary) checksum') + (throw (IOException. "checksum mismatch"))) + (nippy/thaw ary)))))) + +(defmethod print-method Task [t ^Writer w] + (.write w + (str "< " (status t) " | " (pr-str @t) " >"))) + +;;; + +;; the byte layout is +;; [ exists? : int8 +;; state : int8 +;; checksum : int64 +;; size : int32 +;; payload : array ] +;; valid values for 'exists' is 0 (no), 1 (yes) +;; valid values for 'state' is 0 (unclaimed), 1 (in progress), 2 (complete) +(defn- slab->task-seq + "Takes a slab, and returns a sequence of the tasks it contains." + ([slab] + (slab->task-seq slab 0)) + ([slab ^long pos] + (with-buffer [buf slab] + (try + (let [^ByteBuffer buf' (.position buf pos)] + + ;; is there a next task, and is there space left in the buffer? + (when (and + (<= header-size (.remaining buf')) + (== 1 (.get buf'))) + + (lazy-seq + (with-buffer [buf slab] + (let [^ByteBuffer buf' (.position buf (p/inc pos)) + _status (.get buf') + _checksum (.getLong buf') + size (.getInt buf')] + + ;; this shouldn't be necessary, but let's not gratuitously + ;; overreach our bounds + (when (< size (.remaining buf')) + (cons + + (task + slab + pos + (+ header-size size)) + + (slab->task-seq + slab + (+ pos header-size size))))))))) + ;; this implies unrecoverable corruption. @TODO: throw an + ;; exception which implies unrecoverable corruption + (catch Throwable _e))))) + +(deftype TaskSlab + [filename + q-name + queue + buf ;; a clearable atom holding the buffer + position ;; an atom storing the write position of the slab + lock + dirty ;; an atom containing an interval of dirty bytes + ] + ITaskSlab + + (read-write-lock [_] + lock) + + (buffer [_this] + (let [buf (or @buf + (swap! buf + (fn [buf] + (or buf (load-buffer filename)))))] + (.duplicate ^ByteBuffer buf))) + + (mapped? [_] + (boolean @buf)) + + (unmap [_] + (with-exclusive-lock lock + (when-let [b @buf] + (reset! buf nil) + (unmap-buffer b)))) + + (invalidate [_ start' len] + (let [end' (+ start' len)] + (swap! dirty + (fn [[start end]] + [(min start start') (max end end')])))) + + (sync! [this] + (let [[start end] @dirty] + (when (< start end) + (with-buffer [_buf this] + (let [buf @buf] + (force-buffer buf start (- end start)) + (compare-and-set! dirty [start end] [Integer/MAX_VALUE 0]) + nil))))) + + (append-to-slab! [this task-descriptor] + (with-buffer [buf this] + (let [ary (nippy/freeze task-descriptor) + cnt (count ary) + pos @position + ^ByteBuffer buf (.position buf ^Long pos)] + + (when (> (.remaining buf) (+ (count ary) header-size)) + ;; write to the buffer + (doto buf + (.position ^Long pos) + (.put (byte 1)) ;; exists + (.put (byte 0)) ;; incomplete + (.putLong (checksum cnt ary)) + (.putInt cnt) + (.put ^bytes ary) + (.put (byte 0))) ;; next doesn't exist + + (swap! position + header-size cnt) + + (invalidate this pos (+ header-size cnt)) + + ;; return a task to enqueue in-memory + (task + this + pos + (+ header-size cnt)))))) + + clojure.lang.Seqable + (seq [this] + (slab->task-seq this)) + + Comparable + (compareTo [_ x] + (assert (instance? TaskSlab x)) + (compare filename (.filename ^TaskSlab x)))) + +(def ^:private fs-monitor (Object.)) + +(defn- delete-slab + [^TaskSlab slab] + (locking fs-monitor + (unmap slab) + (.delete (io/file (.filename slab))))) + +(defn- create-slab + "Creates a new slab file, ensuring a new file name that is lexicographically greater than + any existing files for that queue name." + ([directory q-name queue size] + (locking fs-monitor + (let [pattern (re-pattern (str "^" q-name "_(\\d{6}$)")) + last-number (->> directory + io/file + .listFiles + (map #(.getName ^File %)) + (map #(second (re-find pattern %))) + (remove nil?) + (map #(Long/parseLong %)) + sort + last) + n (if last-number (inc last-number) 0) + f (io/file (str directory "/" q-name "_" (format "%06d" n)))] + + (when-not (.createNewFile f) + (throw (IOException. (str "Could not create new slab file at " (.getAbsolutePath f))))) + + (TaskSlab. + (.getAbsolutePath f) + q-name + queue + (atom (create-buffer f size)) + (atom 0) + (ReentrantReadWriteLock.) + (atom [Integer/MAX_VALUE 0])))))) + +(defn- file->slab + "Transforms a file into a slab representing that file's contents." + [filename q-name queue] + (let [pos (atom 0) + slab (TaskSlab. + filename + q-name + queue + (atom nil) + pos + (ReentrantReadWriteLock.) + (atom [Integer/MAX_VALUE 0])) + len (->> slab + (map :length) + (reduce +))] + (reset! pos len) + (unmap slab) + slab)) + +(defn- directory->queue-name->slab-files + "Returns a map of queue names onto slab files for that queue." + [directory] + (let [queue->file (->> directory + io/file + .listFiles + (filter #(re-find #"^\w+_\d{6}$" (.getName ^File %))) + (group-by #(second (re-find #"^(\w+)_\d{6}$" (.getName ^File %)))))] + (zipmap + (keys queue->file) + (map + (fn [files] + (->> files + (map #(.getAbsolutePath ^File %)) + sort)) + (vals queue->file))))) + +;;; + +(defn- initial-stats [^long count] + {:enqueued (AtomicLong. count) + :retried (AtomicLong. 0) + :completed (AtomicLong. 0)}) + +(defn- immediate-stats [^LinkedBlockingQueue q {:keys [enqueued retried completed]}] + (let [cnt (.size q) + completed (.get ^AtomicLong completed) + enqueued (.get ^AtomicLong enqueued)] + {:enqueued enqueued + :retried (.get ^AtomicLong retried) + :completed completed + :in-progress (- (- enqueued completed) cnt)})) + +;;; + +(defprotocol IQueues + (^:private mark-complete! [_ q-name]) + (^:private mark-retry! [_ q-name]) + (delete! [_] + "Deletes all files associated with the queues.") + (delete-q! [_ q-name] + "Deletes all files associated with a queue.") + (stats [_] + "Returns a map of queue names onto information about the immediate state of the queue.") + (fsync [_] + "Forces an fsync on all modified files.") + (take! + [_ q-name] + [_ q-name timeout timeout-val] + "A blocking dequeue from `name`. If `timeout` is specified, returns `timeout-val` if + no task is available within `timeout` milliseconds.") + (put! + [_ q-name task-descriptor] + [_ q-name task-descriptor timeout] + "A blocking enqueue to `name`. If `timeout` is specified, returns `false` if unable to + enqueue within `timeout` milliseconds, `true` otherwise.")) + +(defn queues + "Creates a point of interaction for queues, backed by disk storage in `directory`. + + The following options can be specified: + + max-queue-size - the maximum number of elements that can be in the queue before `put!` + blocks. Defaults to `Integer/MAX_VALUE`. + + complete? - a predicate that is run on pre-existing tasks to check if they were already + completed. If the tasks in the queue are non-idempotent, this must be + specified for correct behavior. Defaults to always returning false. + + slab-size - The size, in bytes, of the backing files for the queue. Defaults to 16mb. + + fsync-put? - if true, each `put!` will force an fsync. Defaults to true. + + fsync-take? - if true, each `take!` will force an fsync. Defaults to false. + + fsync-threshold - The maximum number of writes (puts, takes, retries, completes) that + can be performed before an fsync is performed. + + fsync-interval - The maximum amount of time, in milliseconds, that can elapse before + an fsync is performed. " + ([directory] + (queues directory nil)) + ([directory + {:keys [max-queue-size + complete? + slab-size + fsync-put? + fsync-take? + fsync-threshold + fsync-interval] + :or {max-queue-size Integer/MAX_VALUE + complete? nil + slab-size (* 64 1024 1024) + fsync-put? true + fsync-take? false}}] + + (assert + (not + (and + (or fsync-threshold fsync-interval) + (or fsync-take? fsync-put?))) + "Both batch and per-task fsync options are enabled, which is probably not what you intended.") + + (.mkdirs (io/file directory)) + + (let [queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) + queue-name->files (directory->queue-name->slab-files directory) + + ;; core state stores + queue-name->slabs (atom + (zipmap + (keys queue-name->files) + (->> queue-name->files + (map + (fn [[queue-name files]] + (map #(file->slab % queue-name (queue queue-name)) files))) + vec))) + + queue-name->stats (atom + (zipmap + (keys queue-name->files) + (map + #(initial-stats (count (queue %))) + (keys queue-name->files)))) + + queue-name->current-slab (atom {}) + + ;; initialize + slabs (->> @queue-name->slabs vals (apply concat)) + _slab->count (zipmap + slabs + (map #(atom (count (seq %))) slabs)) + create-new-slab (fn [q-name] + (let [slab (create-slab directory q-name (queue q-name) slab-size) + empty-slabs (->> (@queue-name->slabs q-name) + (filter (fn [slab] + (->> slab + (remove #(= :complete (status %))) + empty?))) + set)] + + ;; delete empty slabs + (doseq [s empty-slabs] + (delete-slab s)) + + ;; update list of active slabs + (swap! queue-name->slabs update-in [q-name] + #(conj (vec (remove empty-slabs %)) slab)) + + ;; unmap all slabs but the first (which is being consumed) + ;; and the last (which is being written to) + (doseq [s (-> (@queue-name->slabs q-name) rest butlast)] + (unmap s)) + slab)) + + populate-stats! #(when-not (contains? @queue-name->stats %) + (swap! queue-name->stats assoc % (initial-stats 0))) + + this-ref (promise) + + action-counter (AtomicLong. 0) + + mark-action! (if fsync-threshold + (fn [] + (when (zero? (rem (.incrementAndGet action-counter) fsync-threshold)) + (fsync @this-ref))) + (fn []))] + + ;; + (when fsync-interval + (future + (let [ref (WeakReference. @this-ref)] + (while (.get ref) + (when-let [q (.get ref)] + (try + (let [start (System/nanoTime)] + (fsync q) + (let [end (System/nanoTime)] + (Thread/sleep (max 0 (- (* 1000000 fsync-interval) (- end start)))))) + (catch Throwable _e))))))) + + ;; populate queues with pre-existing tasks + (let [empty-slabs (atom #{})] + (doseq [[q slabs] @queue-name->slabs] + (let [^LinkedBlockingQueue q' (queue q)] + (doseq [slab slabs] + (let [tasks (->> slab + (map #(vary-meta % assoc + ::this this-ref + ::queue q' + ::queue-name q + ::fsync? fsync-take?)) + (remove #(or (= :complete (status %)) + (and complete? (complete? @%)))))] + + (if (empty? tasks) + + ;; if there aren't any active tasks, just delete the slab + (do + (delete-slab slab) + (swap! empty-slabs conj slab)) + + (do + (doseq [task tasks] + (status! task :incomplete) + (when-not (.offer q' task) + (throw + (IllegalArgumentException. + "'max-queue-size' insufficient to hold existing tasks.")))) + (unmap slab))))) + + (let [^AtomicLong counter (get-in @queue-name->stats [q :enqueued])] + (.addAndGet counter (count (queue q)))))) + + (swap! queue-name->slabs + (fn [m] + (->> m + (map + (fn [[q slabs]] + [q (remove @empty-slabs slabs)])) + (into {}))))) + + (deliver this-ref + (reify + + java.io.Closeable + (close [_] + (doseq [s (->> @queue-name->slabs vals (apply concat))] + (unmap s))) + + IQueues + + (delete! [_this] + (doseq [s (->> @queue-name->slabs vals (apply concat))] + (unmap s) + (delete-slab s))) + + (delete-q! [_this q-name] + (let [q-name (munge (name q-name))] + (doseq [s (get @queue-name->slabs q-name)] + (unmap s) + (delete-slab s)) + (.clear (queue q-name)) + (swap! queue-name->stats assoc q-name nil) + (swap! queue-name->slabs assoc q-name nil) + (swap! queue-name->current-slab assoc q-name nil))) + + (fsync [_] + (doseq [slab (->> @queue-name->slabs vals (apply concat))] + (sync! slab))) + + (mark-retry! [_ q-name] + (mark-action!) + (populate-stats! q-name) + (let [^AtomicLong retry-counter (get-in @queue-name->stats [q-name :retried])] + (.incrementAndGet retry-counter))) + + (mark-complete! [_ q-name] + (mark-action!) + (populate-stats! q-name) + (let [^AtomicLong retry-counter (get-in @queue-name->stats [q-name :completed])] + (.incrementAndGet retry-counter))) + + (stats [_] + (let [ks (keys @queue-name->stats)] + (zipmap ks + (map + (fn [q-name] + (merge + {:num-slabs (-> @queue-name->slabs (get q-name) count) + :num-active-slabs (->> (get @queue-name->slabs q-name) + (filter mapped?) + count)} + (immediate-stats (queue q-name) (get @queue-name->stats q-name)))) + ks)))) + + (take! [_this q-name timeout timeout-val] + (let [q-name (munge (name q-name)) + ^LinkedBlockingQueue q (queue q-name)] + (try + (if-let [t (if (zero? timeout) + (.poll q) + (.poll q timeout TimeUnit/MILLISECONDS))] + + (let [slab (:slab t)] + + ;; if we've moved onto a new slab, unmap all but the current and + ;; last slabs + (let [old-slab (@queue-name->current-slab q-name)] + (when-not (= slab old-slab) + (swap! queue-name->current-slab assoc q-name slab) + (doseq [s (->> (get @queue-name->slabs q-name) + butlast + (remove #(= slab %)))] + (unmap s)))) + + (status! t :in-progress) + ;; we don't need to fsync here, because in-progress and incomplete + ;; are effectively equivalent on restart + + t) + timeout-val) + (catch TimeoutException _ + timeout-val)))) + + (take! [this q-name] + (take! this q-name Long/MAX_VALUE nil)) + + (put! [_ q-name task-descriptor timeout] + (let [q-name (munge (name q-name)) + ^LinkedBlockingQueue q (queue q-name) + slab! (fn [] + (let [slabs (@queue-name->slabs q-name) + slab (last slabs) + task (when slab + (append-to-slab! slab task-descriptor)) + + ;; if no task was created, we need to create a new slab file + ;; and try again + slab (if task + slab + (create-new-slab q-name)) + task (or task (append-to-slab! slab task-descriptor))] + + (when-not task + (throw + (IllegalArgumentException. + (str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size)))) + + (when fsync-put? + (sync! slab)) + task)) + + queue! (fn [task] + (if (zero? timeout) + (.offer q task) + (.offer q task timeout TimeUnit/MILLISECONDS)))] + (if-let [_val (locking q + (queue! + (vary-meta (slab!) assoc + ::this this-ref + ::queue-name q-name + ::queue q + ::fsync? fsync-take?)))] + (do + (populate-stats! q-name) + (let [^AtomicLong counter (get-in @queue-name->stats [q-name :enqueued])] + (.incrementAndGet counter)) + true) + false))) + + (put! [this q-name task-descriptor] + (put! this q-name task-descriptor Long/MAX_VALUE)))) + + @this-ref))) + +;;; + +(defn task-seq + "Returns an infinite lazy sequence of tasks for `q-name`." + [qs q-name] + (lazy-seq + (cons + (take! qs q-name) + (task-seq qs q-name)))) + +(defn immediate-task-seq + "Returns a finite lazy sequence of tasks for `q-name` which terminates once there are + no more tasks immediately available." + [qs q-name] + (lazy-seq + (let [task (take! qs q-name 0 ::none)] + (when-not (= ::none task) + (cons + task + (immediate-task-seq qs q-name)))))) + +(defn interval-task-seq + "Returns a lazy sequence of tasks that can be consumed in `interval` milliseconds. This will + terminate after that time has elapsed, even if there are still tasks immediately available." + [qs q-name interval] + (let [now (System/nanoTime)] + (lazy-seq + (let [now' (System/nanoTime) + remaining (- (* 1000000 interval) (- now' now))] + (when (pos? remaining) + (let [task (take! qs q-name remaining ::none)] + (when-not (= ::none task) + (cons + task + (interval-task-seq qs q-name (- (* 1000000 interval) (- (System/nanoTime) now))))))))))) + +(defn complete! + "Marks a task as complete." + [task] + (if (identical? :complete (status task)) + false + (do + (status! task :complete) + (when (-> task meta ::fsync?) + (sync! (:slab task))) + (mark-complete! @(-> task meta ::this) (-> task meta ::queue-name)) + true))) + +(defn retry! + "Marks a task as available for retry." + [task] + (if (or + (identical? :complete (status task)) + (identical? :incomplete (status task))) + false + (do + (status! task :incomplete) + (when (-> task meta ::fsync?) + (sync! (:slab task))) + (mark-retry! @(-> task meta ::this) (-> task meta ::queue-name)) + (let [^LinkedBlockingQueue q (-> task meta ::queue)] + (.put q task)) + true))) diff --git a/src/durable_queue.clj b/src/durable_queue.clj index d468a38..8fdcde5 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -1,10 +1,14 @@ (ns durable-queue + "Use clj-commons.durable-queue instead." + {:deprecated "1.0.0" + :superseded-by "clj-commons.durable-queue" + :no-doc true} (:require - [clojure.java.io :as io] - [byte-streams :as bs] - [clojure.string :as str] - [primitive-math :as p] - [taoensso.nippy :as nippy]) + [clojure.java.io :as io] + [clj-commons.byte-streams :as bs] + [clojure.string :as str] + [clj-commons.primitive-math :as p] + [taoensso.nippy :as nippy]) (:import [java.lang.reflect Method @@ -174,10 +178,11 @@ (with-buffer [buf slab] (reset! status s) (.put buf (p/+ offset 1) - (case s - :incomplete 0 - :in-progress 1 - :complete 2)) + (byte + (case s + :incomplete 0 + :in-progress 1 + :complete 2))) (invalidate slab (p/+ offset 1) 1) nil))) @@ -190,9 +195,9 @@ (fn [] (with-buffer [buf slab] (let [^ByteBuffer buf (-> buf - (.position offset) + (.position ^Long offset) ^ByteBuffer - (.limit (+ offset len)) + (.limit ^Long (+ offset len)) .slice) checksum' (.getLong buf 2) ary (bs/to-byte-array (.position buf header-size))] @@ -303,17 +308,17 @@ (let [ary (nippy/freeze descriptor) cnt (count ary) pos @position - ^ByteBuffer buf (.position buf pos)] + ^ByteBuffer buf (.position buf ^Long pos)] (when (> (.remaining buf) (+ (count ary) header-size)) ;; write to the buffer (doto buf - (.position pos) + (.position ^Long pos) (.put (byte 1)) ;; exists (.put (byte 0)) ;; incomplete (.putLong (checksum cnt ary)) (.putInt cnt) - (.put ary) + (.put ^bytes ary) (.put (byte 0))) ;; next doesn't exist (swap! position + header-size cnt) @@ -432,6 +437,8 @@ (^:private mark-retry! [_ q-name]) (delete! [_] "Deletes all files associated with the queues.") + (delete-q! [_ q-name] + "Deletes all files associated with a queue.") (stats [_] "Returns a map of queue names onto information about the immediate state of the queue.") (fsync [_] @@ -463,7 +470,13 @@ fsync-put? - if true, each `put!` will force an fsync. Defaults to true. - fsync-take? - if true, each `take!` will force an fsync. Defaults to false." + fsync-take? - if true, each `take!` will force an fsync. Defaults to false. + + fsync-threshold - The maximum number of writes (puts, takes, retries, completes) that + can be performed before an fsync is performed. + + fsync-interval - The maximum amount of time, in milliseconds, that can elapse before + an fsync is performed. " ([directory] (queues directory nil)) ([directory @@ -489,9 +502,7 @@ (.mkdirs (io/file directory)) - (let [ - - queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) + (let [queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) queue-name->files (directory->queue-name->slab-files directory) ;; core state stores @@ -561,10 +572,10 @@ (while (.get ref) (when-let [q (.get ref)] (try - (let [start (System/currentTimeMillis)] + (let [start (System/nanoTime)] (fsync q) - (let [end (System/currentTimeMillis)] - (Thread/sleep (max 0 (- fsync-interval (- end start)))))) + (let [end (System/nanoTime)] + (Thread/sleep (max 0 (- (* 1000000 fsync-interval) (- end start)))))) (catch Throwable e ))))))) @@ -624,6 +635,16 @@ (unmap s) (delete-slab s))) + (delete-q! [this q-name] + (let [q-name (munge (name q-name))] + (doseq [s (get @queue-name->slabs q-name)] + (unmap s) + (delete-slab s)) + (.clear (queue q-name)) + (swap! queue-name->stats assoc q-name nil) + (swap! queue-name->slabs assoc q-name nil) + (swap! queue-name->current-slab assoc q-name nil))) + (fsync [_] (doseq [slab (->> @queue-name->slabs vals (apply concat))] (sync! slab))) @@ -758,16 +779,16 @@ "Returns a lazy sequence of tasks that can be consumed in `interval` milliseconds. This will terminate after that time has elapsed, even if there are still tasks immediately available." [qs q-name interval] - (let [now (System/currentTimeMillis)] + (let [now (System/nanoTime)] (lazy-seq - (let [now' (System/currentTimeMillis) - remaining (- interval (- now' now))] + (let [now' (System/nanoTime) + remaining (- (* 1000000 interval) (- now' now))] (when (pos? remaining) (let [task (take! qs q-name remaining ::none)] (when-not (= ::none task) (cons task - (interval-task-seq qs q-name (- interval (- (System/currentTimeMillis) now))))))))))) + (interval-task-seq qs q-name (- (* 1000000 interval) (- (System/nanoTime) now))))))))))) (defn complete! "Marks a task as complete." diff --git a/test/clj_commons/durable_queue_test.clj b/test/clj_commons/durable_queue_test.clj new file mode 100644 index 0000000..6ae74c1 --- /dev/null +++ b/test/clj_commons/durable_queue_test.clj @@ -0,0 +1,138 @@ +(ns clj-commons.durable-queue-test + (:require + [clojure.java.io :as io] + [clojure.test :as t] + [clj-commons.durable-queue :as dq] + [criterium.core :as c])) + +(defn clear-tmp-directory [] + (doseq [f (->> (#'dq/directory->queue-name->slab-files "/tmp") + vals + (apply concat))] + (.delete (io/file f)))) + +(t/deftest test-basic-put-take + (clear-tmp-directory) + (let [q (dq/queues "/tmp" {:slab-size 1024}) + tasks (range 1e4)] + (doseq [t tasks] + (dq/put! q :foo t)) + (t/is (= tasks (map deref (dq/immediate-task-seq q :foo)))) + (dq/delete! q))) + +(t/deftest test-partial-slab-writes + ( + clear-tmp-directory) + (dotimes [i 10] + (dq/put! (dq/queues "/tmp") :foo i)) + (t/is (= (range 10) + (map deref (dq/immediate-task-seq (dq/queues "/tmp") :foo))))) + +(t/deftest test-retry + (clear-tmp-directory) + (with-open [q (dq/queues "/tmp")] + + (doseq [t (range 10)] + (dq/put! q :foo t)) + + (let [tasks' (dq/immediate-task-seq q :foo)] + (t/is (= (range 10) (map deref tasks'))) + (doseq [t (take 5 tasks')] + (dq/complete! t)) + (doseq [t (range 10 15)] + (dq/put! q :foo t)))) + + ;; create a new manager, which will mark all in-progress tasks as incomplete + (with-open [q (dq/queues "/tmp")] + (let [tasks' (dq/immediate-task-seq q :foo)] + (t/is (= (range 5 15) (map deref tasks'))) + (doseq [t (take 5 tasks')] + (dq/complete! t)))) + + (with-open [q (dq/queues "/tmp")] + (let [tasks' (dq/immediate-task-seq q :foo)] + (t/is (= (range 10 15) (map deref tasks'))) + (doseq [t (range 15 20)] + (dq/put! q :foo t)))) + + (let [q (dq/queues "/tmp" {:complete? even?})] + (t/is (= (remove even? (range 10 20)) + (map deref (dq/immediate-task-seq q :foo)))))) + +;;; + +(t/deftest ^:benchmark benchmark-put-take + (clear-tmp-directory) + + (println "\n\n-- sync both") + (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? true})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync take") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? true})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync put") + (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? false})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync every 10 writes") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 10})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync every 100 writes") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 100})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync every 100ms") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-interval 100})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (println "\n\n-- sync neither") + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? false})] + (c/quick-bench + (do + (dq/put! q :foo 1) + (dq/complete! (dq/take! q :foo))))) + + (clear-tmp-directory)) + +;;; + +(t/deftest ^:stress stress-queue-size + (clear-tmp-directory) + + (with-open [q (dq/queues "/tmp")] + (let [ary (byte-array 1e6)] + (dotimes [i 1e6] + (aset ary i (byte (rand-int 127)))) + (dotimes [_ 1e5] + (dq/put! q :stress ary)))) + + (with-open [q (dq/queues "/tmp" {:complete? (constantly false)})] + (let [s (doall (dq/immediate-task-seq q :stress))] + (doseq [t s] + (dq/retry! t))) + (let [s (dq/immediate-task-seq q :stress)] + (doseq [t s] + (dq/complete! t)))) + + (clear-tmp-directory)) diff --git a/test/durable_queue_test.clj b/test/durable_queue_test.clj deleted file mode 100644 index b580ca1..0000000 --- a/test/durable_queue_test.clj +++ /dev/null @@ -1,133 +0,0 @@ -(ns durable-queue-test - (:require - [clojure.java.io :as io] - [clojure.test :refer :all] - [durable-queue :refer :all] - [criterium.core :as c])) - -(defn clear-tmp-directory [] - (doseq [f (->> (#'durable-queue/directory->queue-name->slab-files "/tmp") - vals - (apply concat))] - (.delete (io/file f)))) - -(deftest test-basic-put-take - (clear-tmp-directory) - (let [q (queues "/tmp" {:slab-size 1024}) - tasks (range 1e4)] - (doseq [t tasks] - (put! q :foo t)) - (is (= tasks (map deref (immediate-task-seq q :foo)))) - (delete! q))) - -(deftest test-partial-slab-writes - (clear-tmp-directory) - (dotimes [i 10] - (put! (queues "/tmp") :foo i)) - (is (= (range 10) (map deref (immediate-task-seq (queues "/tmp") :foo))))) - -(deftest test-retry - (clear-tmp-directory) - (with-open [q (queues "/tmp")] - - (doseq [t (range 10)] - (put! q :foo t)) - - (let [tasks' (immediate-task-seq q :foo)] - (is (= (range 10) (map deref tasks'))) - (doseq [t (take 5 tasks')] - (complete! t)) - (doseq [t (range 10 15)] - (put! q :foo t)))) - - ;; create a new manager, which will mark all in-progress tasks as incomplete - (with-open [q (queues "/tmp")] - (let [tasks' (immediate-task-seq q :foo)] - (is (= (range 5 15) (map deref tasks'))) - (doseq [t (take 5 tasks')] - (complete! t)))) - - (with-open [q (queues "/tmp")] - (let [tasks' (immediate-task-seq q :foo)] - (is (= (range 10 15) (map deref tasks'))) - (doseq [t (range 15 20)] - (put! q :foo t)))) - - (let [q (queues "/tmp" {:complete? even?})] - (is (= (remove even? (range 10 20)) (map deref (immediate-task-seq q :foo)))))) - -;;; - -(deftest ^:benchmark benchmark-put-take - (clear-tmp-directory) - - (println "\n\n-- sync both") - (let [q (queues "/tmp" {:fsync-put? true, :fsync-take? true})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync take") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-take? true})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync put") - (let [q (queues "/tmp" {:fsync-put? true, :fsync-take? false})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync every 10 writes") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-threshold 10})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync every 100 writes") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-threshold 100})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync every 100ms") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-interval 100})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo))))) - - (println "\n\n-- sync neither") - (let [q (queues "/tmp" {:fsync-put? false, :fsync-take? false})] - (c/quick-bench - (do - (put! q :foo 1) - (complete! (take! q :foo)))))) - -;;; - -(deftest ^:stress stress-queue-size - (clear-tmp-directory) - - (with-open [q (queues "/tmp")] - (let [ary (byte-array 1e6)] - (dotimes [i 1e6] - (aset ary i (byte (rand-int 127)))) - (dotimes [_ 1e5] - (put! q :stress ary)))) - - (with-open [q (queues "/tmp" {:complete? (constantly false)})] - (let [s (doall (immediate-task-seq q :stress))] - (doseq [t s] - (retry! t))) - (let [s (immediate-task-seq q :stress)] - (doseq [t s] - (complete! t)))) - - (clear-tmp-directory))