Skip to content

Commit 4f9f966

Browse files
committed
use proper mmap cleanup mechanism, per @eric
1 parent d1642b9 commit 4f9f966

File tree

3 files changed

+95
-65
lines changed

3 files changed

+95
-65
lines changed

project.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
(defproject factual/durable-queue "0.1.0"
1+
(defproject factual/durable-queue "0.1.1-SNAPSHOT"
22
:description "a in-process task-queue that is backed by disk."
33
:license {:name "Eclipse Public License"
44
:url "http://www.eclipse.org/legal/epl-v10.html"}

src/durable_queue.clj

Lines changed: 93 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
[primitive-math :as p]
77
[taoensso.nippy :as nippy])
88
(:import
9+
[java.lang.reflect
10+
Method]
911
[java.util.concurrent
1012
LinkedBlockingQueue
1113
TimeoutException
@@ -131,46 +133,67 @@
131133
([slab]
132134
(slab->task-seq slab 0))
133135
([slab pos]
134-
(seq
135-
(lazy-seq
136-
(try
137-
(let [^ByteBuffer
138-
buf' (-> (buffer slab)
136+
(try
137+
(let [^ByteBuffer
138+
buf' (-> (buffer slab)
139+
.duplicate
140+
(.position pos))]
141+
142+
;; is there a next task, and is there space left in the buffer?
143+
(when (and
144+
(pos? (.remaining buf'))
145+
(== 1 (.get buf')))
146+
147+
(lazy-seq
148+
(let [status (.get buf')
149+
checksum (.getLong buf')
150+
size (.getInt buf')]
151+
(cons
152+
153+
(vary-meta
154+
(task
155+
#(-> (buffer slab)
139156
.duplicate
140-
(.position pos))]
141-
142-
;; is there a next task, and is there space left in the buffer?
143-
(when (and
144-
(pos? (.remaining buf'))
145-
(== 1 (.get buf')))
146-
147-
(let [status (.get buf')
148-
checksum (.getLong buf')
149-
size (.getInt buf')]
150-
(cons
151-
152-
(vary-meta
153-
(task
154-
#(-> (buffer slab)
155-
.duplicate
156-
(.position pos)
157-
^ByteBuffer
158-
(.limit (+ pos header-size size))
159-
.slice)
160-
(read-write-lock slab))
161-
assoc ::slab slab)
162-
163-
(slab->task-seq
164-
slab
165-
(+ pos header-size size))))))
166-
(catch Throwable e
167-
;; this implies unrecoverable corruption
168-
nil
169-
))))))
157+
(.position pos)
158+
^ByteBuffer
159+
(.limit (+ pos header-size size))
160+
.slice)
161+
(read-write-lock slab))
162+
assoc ::slab slab)
163+
164+
(slab->task-seq
165+
slab
166+
(+ pos header-size size)))))))
167+
(catch Throwable e
168+
;; this implies unrecoverable corruption
169+
nil
170+
))))
171+
172+
(let [clean (delay
173+
(doto (.getMethod
174+
(Class/forName "sun.misc.Cleaner")
175+
"clean"
176+
nil)
177+
(.setAccessible true)))]
178+
(defn- unmap-buffer
179+
"A delightful endrun on the JVM's mmap GC mechanism"
180+
[^ByteBuffer buf]
181+
(when (.isDirect buf)
182+
(try
183+
184+
(let [^Method clean @clean
185+
cleaner (doto (.getMethod (class buf) "cleaner" nil)
186+
(.setAccessible true))]
187+
(.invoke ^Method clean
188+
(.invoke cleaner buf nil)
189+
nil))
190+
(catch Throwable e
191+
;; not much we can do here, sadly
192+
)))))
170193

171194
(deftype TaskSlab
172195
[filename
173-
buf+fc+raf ;; a clearable atom-thunk holding the resources associated with the buffer
196+
buf ;; a clearable atom holding the buffer
174197
position ;; an atom storing the write position of the slab
175198
lock]
176199
ITaskSlab
@@ -179,31 +202,30 @@
179202
lock)
180203

181204
(buffer [_]
182-
(locking buf+fc+raf
183-
(if-let [x @buf+fc+raf]
184-
(first x)
185-
(first
186-
(swap! buf+fc+raf
187-
(fn [x]
188-
(or x
189-
(let [_ (assert (.exists (io/file filename)))
190-
raf (RandomAccessFile. (io/file filename) "rw")
191-
fc (.getChannel raf)
192-
buf (.map fc FileChannel$MapMode/READ_WRITE 0 (.length raf))]
193-
[buf fc raf]))))))))
205+
(or @buf
206+
(swap! buf
207+
(fn [buf]
208+
(or buf
209+
(let [_ (assert (.exists (io/file filename)))
210+
raf (RandomAccessFile. (io/file filename) "rw")]
211+
(try
212+
(let [fc (.getChannel raf)]
213+
(try
214+
(.map fc FileChannel$MapMode/READ_WRITE 0 (.length raf))
215+
(finally
216+
(.close fc))))
217+
(finally
218+
(.close raf)))))))))
194219

195220
(mapped? [_]
196-
(boolean @buf+fc+raf))
221+
(boolean @buf))
197222

198223
(unmap [_]
199224
(with-exclusive-lock lock
200-
(when-let [x @buf+fc+raf]
201-
(when (.exists (io/file filename))
202-
(let [[^MappedByteBuffer buf ^FileChannel fc ^RandomAccessFile raf] x]
203-
(.force buf)
204-
(.close raf)
205-
(.close fc)
206-
(reset! buf+fc+raf nil))))))
225+
(when (.exists (io/file filename))
226+
(when-let [buf @buf]
227+
(unmap-buffer buf))
228+
(reset! buf nil))))
207229

208230
(append-to-slab! [this descriptor]
209231
(locking this
@@ -285,13 +307,21 @@
285307
(throw (IOException. (str "Could not create new slab file at " (.getAbsolutePath f)))))
286308

287309
(let [raf (doto (RandomAccessFile. f "rw")
288-
(.setLength size))
289-
fc (.getChannel raf)
290-
buf (.map fc FileChannel$MapMode/READ_WRITE 0 size)]
291-
(doto buf
292-
(.put 0 (byte 0))
293-
.force)
294-
(TaskSlab. (.getAbsolutePath f) (atom [buf fc raf]) (atom 0) (ReentrantReadWriteLock.)))))))
310+
(.setLength size))]
311+
(try
312+
(let [fc (.getChannel raf)]
313+
(try
314+
(let [buf (.map fc FileChannel$MapMode/READ_WRITE 0 size)]
315+
(doto buf
316+
(.put 0 (byte 0))
317+
.force)
318+
(TaskSlab. (.getAbsolutePath f) (atom buf) (atom 0) (ReentrantReadWriteLock.)))
319+
(finally
320+
(.close fc))))
321+
(finally
322+
(.close raf)))
323+
324+
)))))
295325

296326
(defn- file->slab
297327
"Transforms a file into a slab representing that file's contents."

test/durable_queue_test.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@
9797
(let [ary (byte-array 1e6)]
9898
(dotimes [i 1e6]
9999
(aset ary i (byte (rand-int 127))))
100-
(dotimes [_ 1e3]
100+
(dotimes [_ 1e5]
101101
(put! q :stress ary))))
102102

103103
(with-open [q (queues "/tmp" {:complete? (constantly false)})]

0 commit comments

Comments
 (0)