From f0766447b0d3fdc87ade435a64da3b1da3027e5b Mon Sep 17 00:00:00 2001 From: Mark Bastian Date: Mon, 6 Apr 2020 23:35:43 -0600 Subject: [PATCH 1/4] Add type hints to breaking changes in java.nio.ByteBuffer Fixes Java 13+ compatiblity issues, as noted in clj-commons/durable-queue#23. Copying the relevant parts from the Issue: `java.nio.ByteBuffer` and the other buffer types in `java.nio` now define absolute bulk `get` and `put` methods to transfer contiguous sequences of bytes without regard to or effect on the buffer position. Due to this, running durable-queue against Java 13+ leads to the following error: No matching method put found taking 2 args for class `java.nio.DirectByteBuffer` This commit fixes the problem by adding the relevant type-hints Author: Mark Bastian Ref: https://github.com/markbastian/durable-queue Closes: clj-commons/durable-queue#23 Closes: clj-commons/durable-queue#24 --- .gitignore | 4 +++- src/durable_queue.clj | 19 ++++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 7d70dce..b70b08c 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,6 @@ pom.xml.asc /.nrepl-port .DS_Store /doc -push \ No newline at end of file +push +*.iml +.idea/ \ No newline at end of file diff --git a/src/durable_queue.clj b/src/durable_queue.clj index d468a38..d45fc89 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -174,10 +174,11 @@ (with-buffer [buf slab] (reset! status s) (.put buf (p/+ offset 1) - (case s - :incomplete 0 - :in-progress 1 - :complete 2)) + (byte + (case s + :incomplete 0 + :in-progress 1 + :complete 2))) (invalidate slab (p/+ offset 1) 1) nil))) @@ -190,9 +191,9 @@ (fn [] (with-buffer [buf slab] (let [^ByteBuffer buf (-> buf - (.position offset) + (.position ^Long offset) ^ByteBuffer - (.limit (+ offset len)) + (.limit ^Long (+ offset len)) .slice) checksum' (.getLong buf 2) ary (bs/to-byte-array (.position buf header-size))] @@ -303,17 +304,17 @@ (let [ary (nippy/freeze descriptor) cnt (count ary) pos @position - ^ByteBuffer buf (.position buf pos)] + ^ByteBuffer buf (.position buf ^Long pos)] (when (> (.remaining buf) (+ (count ary) header-size)) ;; write to the buffer (doto buf - (.position pos) + (.position ^Long pos) (.put (byte 1)) ;; exists (.put (byte 0)) ;; incomplete (.putLong (checksum cnt ary)) (.putInt cnt) - (.put ary) + (.put ^bytes ary) (.put (byte 0))) ;; next doesn't exist (swap! position + header-size cnt) From f115a98d6ac5f41ab63f5229d9c8a33112cba71a Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Tue, 27 Nov 2018 10:00:08 +1300 Subject: [PATCH 2/4] Use nanoTime instead of currentTimeMillis for elapsed time System/currentTimeMillis is not guaranteed to progress monotonically. NTP shifts, leap seconds, and manual system time changes can all cause elapsed time calculations to be incorrect if you use currentTimeMillis. nanoTime is always calculated against a fixed point and proceeds monotonically. https://go.googlesource.com/proposal/+/master/design/12914-monotonic.md has a good discussion on monotonic time. Closes: clj-commons/durable-queue#21 --- src/durable_queue.clj | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/durable_queue.clj b/src/durable_queue.clj index d45fc89..24d89f3 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -562,10 +562,10 @@ (while (.get ref) (when-let [q (.get ref)] (try - (let [start (System/currentTimeMillis)] + (let [start (System/nanoTime)] (fsync q) - (let [end (System/currentTimeMillis)] - (Thread/sleep (max 0 (- fsync-interval (- end start)))))) + (let [end (System/nanoTime)] + (Thread/sleep (max 0 (- (* 1000000 fsync-interval) (- end start)))))) (catch Throwable e ))))))) @@ -759,16 +759,16 @@ "Returns a lazy sequence of tasks that can be consumed in `interval` milliseconds. This will terminate after that time has elapsed, even if there are still tasks immediately available." [qs q-name interval] - (let [now (System/currentTimeMillis)] + (let [now (System/nanoTime)] (lazy-seq - (let [now' (System/currentTimeMillis) - remaining (- interval (- now' now))] + (let [now' (System/nanoTime) + remaining (- (* 1000000 interval) (- now' now))] (when (pos? remaining) (let [task (take! qs q-name remaining ::none)] (when-not (= ::none task) (cons task - (interval-task-seq qs q-name (- interval (- (System/currentTimeMillis) now))))))))))) + (interval-task-seq qs q-name (- (* 1000000 interval) (- (System/nanoTime) now))))))))))) (defn complete! "Marks a task as complete." From 00922a3a3d38d5ecc07804d5baddde17882d990f Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Tue, 27 Nov 2018 09:52:09 +1300 Subject: [PATCH 3/4] Document fsync-threshold and fsync-interval in docstring The README contains documentation for these parameters, this commit copies them from the README into the docstring. Closes: clj-commons/durable-queue#20 --- src/durable_queue.clj | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/durable_queue.clj b/src/durable_queue.clj index 24d89f3..2eddc0b 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -464,7 +464,13 @@ fsync-put? - if true, each `put!` will force an fsync. Defaults to true. - fsync-take? - if true, each `take!` will force an fsync. Defaults to false." + fsync-take? - if true, each `take!` will force an fsync. Defaults to false. + + fsync-threshold - The maximum number of writes (puts, takes, retries, completes) that + can be performed before an fsync is performed. + + fsync-interval - The maximum amount of time, in milliseconds, that can elapse before + an fsync is performed. " ([directory] (queues directory nil)) ([directory From cb2d51011e2826538ca6f64c13b6fe5149621b6e Mon Sep 17 00:00:00 2001 From: Ryan Sundberg Date: Mon, 23 Apr 2018 19:23:20 -0700 Subject: [PATCH 4/4] Implement delete-q! function to delete a single queue This function lets the client delete an individual named queue. It is useful for deleting corrupted queue files (such as that get created when the disk fills up). Closes: clj-commons/durable-queue#17 --- src/durable_queue.clj | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/durable_queue.clj b/src/durable_queue.clj index 2eddc0b..928170d 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -433,6 +433,8 @@ (^:private mark-retry! [_ q-name]) (delete! [_] "Deletes all files associated with the queues.") + (delete-q! [_ q-name] + "Deletes all files associated with a queue.") (stats [_] "Returns a map of queue names onto information about the immediate state of the queue.") (fsync [_] @@ -496,9 +498,7 @@ (.mkdirs (io/file directory)) - (let [ - - queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) + (let [queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) queue-name->files (directory->queue-name->slab-files directory) ;; core state stores @@ -631,6 +631,16 @@ (unmap s) (delete-slab s))) + (delete-q! [this q-name] + (let [q-name (munge (name q-name))] + (doseq [s (get @queue-name->slabs q-name)] + (unmap s) + (delete-slab s)) + (.clear (queue q-name)) + (swap! queue-name->stats assoc q-name nil) + (swap! queue-name->slabs assoc q-name nil) + (swap! queue-name->current-slab assoc q-name nil))) + (fsync [_] (doseq [slab (->> @queue-name->slabs vals (apply concat))] (sync! slab)))