|
1 | 1 | (ns durable-queue-test
|
2 | 2 | (:require
|
3 | 3 | [clojure.java.io :as io]
|
4 |
| - [clojure.test :refer :all] |
5 |
| - [durable-queue :refer :all] |
| 4 | + [clojure.test :refer [deftest is]] |
| 5 | + [durable-queue :as dq] |
6 | 6 | [criterium.core :as c]))
|
7 | 7 |
|
8 | 8 | (defn clear-tmp-directory []
|
|
13 | 13 |
|
14 | 14 | (deftest test-basic-put-take
|
15 | 15 | (clear-tmp-directory)
|
16 |
| - (let [q (queues "/tmp" {:slab-size 1024}) |
| 16 | + (let [q (dq/queues "/tmp" {:slab-size 1024}) |
17 | 17 | tasks (range 1e4)]
|
18 | 18 | (doseq [t tasks]
|
19 |
| - (put! q :foo t)) |
20 |
| - (is (= tasks (map deref (immediate-task-seq q :foo)))) |
21 |
| - (delete! q))) |
| 19 | + (dq/put! q :foo t)) |
| 20 | + (is (= tasks (map deref (dq/immediate-task-seq q :foo)))) |
| 21 | + (dq/delete! q))) |
22 | 22 |
|
23 | 23 | (deftest test-partial-slab-writes
|
24 | 24 | (clear-tmp-directory)
|
25 | 25 | (dotimes [i 10]
|
26 |
| - (put! (queues "/tmp") :foo i)) |
27 |
| - (is (= (range 10) (map deref (immediate-task-seq (queues "/tmp") :foo))))) |
| 26 | + (dq/put! (dq/queues "/tmp") :foo i)) |
| 27 | + (is (= (range 10) (map deref (dq/immediate-task-seq (dq/queues "/tmp") :foo))))) |
28 | 28 |
|
29 | 29 | (deftest test-retry
|
30 | 30 | (clear-tmp-directory)
|
31 |
| - (with-open [^java.io.Closeable q (queues "/tmp")] |
| 31 | + (with-open [^java.io.Closeable q (dq/queues "/tmp")] |
32 | 32 |
|
33 | 33 | (doseq [t (range 10)]
|
34 |
| - (put! q :foo t)) |
| 34 | + (dq/put! q :foo t)) |
35 | 35 |
|
36 |
| - (let [tasks' (immediate-task-seq q :foo)] |
| 36 | + (let [tasks' (dq/immediate-task-seq q :foo)] |
37 | 37 | (is (= (range 10) (map deref tasks')))
|
38 | 38 | (doseq [t (take 5 tasks')]
|
39 |
| - (complete! t)) |
| 39 | + (dq/complete! t)) |
40 | 40 | (doseq [t (range 10 15)]
|
41 |
| - (put! q :foo t)))) |
| 41 | + (dq/put! q :foo t)))) |
42 | 42 |
|
43 | 43 | ;; create a new manager, which will mark all in-progress tasks as incomplete
|
44 |
| - (with-open [^java.io.Closeable q (queues "/tmp")] |
45 |
| - (let [tasks' (immediate-task-seq q :foo)] |
| 44 | + (with-open [^java.io.Closeable q (dq/queues "/tmp")] |
| 45 | + (let [tasks' (dq/immediate-task-seq q :foo)] |
46 | 46 | (is (= (range 5 15) (map deref tasks')))
|
47 | 47 | (doseq [t (take 5 tasks')]
|
48 |
| - (complete! t)))) |
| 48 | + (dq/complete! t)))) |
49 | 49 |
|
50 |
| - (with-open [^java.io.Closeable q (queues "/tmp")] |
51 |
| - (let [tasks' (immediate-task-seq q :foo)] |
| 50 | + (with-open [^java.io.Closeable q (dq/queues "/tmp")] |
| 51 | + (let [tasks' (dq/immediate-task-seq q :foo)] |
52 | 52 | (is (= (range 10 15) (map deref tasks')))
|
53 | 53 | (doseq [t (range 15 20)]
|
54 |
| - (put! q :foo t)))) |
| 54 | + (dq/put! q :foo t)))) |
55 | 55 |
|
56 |
| - (let [q (queues "/tmp" {:complete? even?})] |
57 |
| - (is (= (remove even? (range 10 20)) (map deref (immediate-task-seq q :foo)))))) |
| 56 | + (let [q (dq/queues "/tmp" {:complete? even?})] |
| 57 | + (is (= (remove even? (range 10 20)) (map deref (dq/immediate-task-seq q :foo)))))) |
58 | 58 |
|
59 | 59 | ;;;
|
60 | 60 |
|
61 | 61 | (deftest ^:benchmark benchmark-put-take
|
62 | 62 | (clear-tmp-directory)
|
63 | 63 |
|
64 | 64 | (println "\n\n-- sync both")
|
65 |
| - (let [q (queues "/tmp" {:fsync-put? true, :fsync-take? true})] |
| 65 | + (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? true})] |
66 | 66 | (c/quick-bench
|
67 | 67 | (do
|
68 |
| - (put! q :foo 1) |
69 |
| - (complete! (take! q :foo))))) |
| 68 | + (dq/put! q :foo 1) |
| 69 | + (dq/complete! (dq/take! q :foo))))) |
70 | 70 |
|
71 | 71 | (println "\n\n-- sync take")
|
72 |
| - (let [q (queues "/tmp" {:fsync-put? false, :fsync-take? true})] |
| 72 | + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? true})] |
73 | 73 | (c/quick-bench
|
74 | 74 | (do
|
75 |
| - (put! q :foo 1) |
76 |
| - (complete! (take! q :foo))))) |
| 75 | + (dq/put! q :foo 1) |
| 76 | + (dq/complete! (dq/take! q :foo))))) |
77 | 77 |
|
78 | 78 | (println "\n\n-- sync put")
|
79 |
| - (let [q (queues "/tmp" {:fsync-put? true, :fsync-take? false})] |
| 79 | + (let [q (dq/queues "/tmp" {:fsync-put? true, :fsync-take? false})] |
80 | 80 | (c/quick-bench
|
81 | 81 | (do
|
82 |
| - (put! q :foo 1) |
83 |
| - (complete! (take! q :foo))))) |
| 82 | + (dq/put! q :foo 1) |
| 83 | + (dq/complete! (dq/take! q :foo))))) |
84 | 84 |
|
85 | 85 | (println "\n\n-- sync every 10 writes")
|
86 |
| - (let [q (queues "/tmp" {:fsync-put? false, :fsync-threshold 10})] |
| 86 | + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 10})] |
87 | 87 | (c/quick-bench
|
88 | 88 | (do
|
89 |
| - (put! q :foo 1) |
90 |
| - (complete! (take! q :foo))))) |
| 89 | + (dq/put! q :foo 1) |
| 90 | + (dq/complete! (dq/take! q :foo))))) |
91 | 91 |
|
92 | 92 | (println "\n\n-- sync every 100 writes")
|
93 |
| - (let [q (queues "/tmp" {:fsync-put? false, :fsync-threshold 100})] |
| 93 | + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-threshold 100})] |
94 | 94 | (c/quick-bench
|
95 | 95 | (do
|
96 |
| - (put! q :foo 1) |
97 |
| - (complete! (take! q :foo))))) |
| 96 | + (dq/put! q :foo 1) |
| 97 | + (dq/complete! (dq/take! q :foo))))) |
98 | 98 |
|
99 | 99 | (println "\n\n-- sync every 100ms")
|
100 |
| - (let [q (queues "/tmp" {:fsync-put? false, :fsync-interval 100})] |
| 100 | + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-interval 100})] |
101 | 101 | (c/quick-bench
|
102 | 102 | (do
|
103 |
| - (put! q :foo 1) |
104 |
| - (complete! (take! q :foo))))) |
| 103 | + (dq/put! q :foo 1) |
| 104 | + (dq/complete! (dq/take! q :foo))))) |
105 | 105 |
|
106 | 106 | (println "\n\n-- sync neither")
|
107 |
| - (let [q (queues "/tmp" {:fsync-put? false, :fsync-take? false})] |
| 107 | + (let [q (dq/queues "/tmp" {:fsync-put? false, :fsync-take? false})] |
108 | 108 | (c/quick-bench
|
109 | 109 | (do
|
110 |
| - (put! q :foo 1) |
111 |
| - (complete! (take! q :foo)))))) |
| 110 | + (dq/put! q :foo 1) |
| 111 | + (dq/complete! (dq/take! q :foo)))))) |
112 | 112 |
|
113 | 113 | ;;;
|
114 | 114 |
|
115 | 115 | (deftest ^:stress stress-queue-size
|
116 | 116 | (clear-tmp-directory)
|
117 | 117 |
|
118 |
| - (with-open [^java.io.Closeable q (queues "/tmp")] |
| 118 | + (with-open [^java.io.Closeable q (dq/queues "/tmp")] |
119 | 119 | (let [ary (byte-array 1e6)]
|
120 | 120 | (dotimes [i 1e6]
|
121 | 121 | (aset ary i (byte (rand-int 127))))
|
122 | 122 | (dotimes [_ 1e5]
|
123 |
| - (put! q :stress ary)))) |
| 123 | + (dq/put! q :stress ary)))) |
124 | 124 |
|
125 |
| - (with-open [^java.io.Closeable q (queues "/tmp" {:complete? (constantly false)})] |
126 |
| - (let [s (doall (immediate-task-seq q :stress))] |
| 125 | + (with-open [^java.io.Closeable q (dq/queues "/tmp" {:complete? (constantly false)})] |
| 126 | + (let [s (doall (dq/immediate-task-seq q :stress))] |
127 | 127 | (doseq [t s]
|
128 |
| - (retry! t))) |
129 |
| - (let [s (immediate-task-seq q :stress)] |
| 128 | + (dq/retry! t))) |
| 129 | + (let [s (dq/immediate-task-seq q :stress)] |
130 | 130 | (doseq [t s]
|
131 |
| - (complete! t)))) |
| 131 | + (dq/complete! t)))) |
132 | 132 |
|
133 | 133 | (clear-tmp-directory))
|
0 commit comments