File tree Expand file tree Collapse file tree 2 files changed +26
-14
lines changed Expand file tree Collapse file tree 2 files changed +26
-14
lines changed Original file line number Diff line number Diff line change 219219
220220 ; ; is there a next task, and is there space left in the buffer?
221221 (when (and
222- (pos? (.remaining buf'))
222+ (< header-size (.remaining buf'))
223223 (== 1 (.get buf')))
224224
225225 (lazy-seq
226226 (let [status (.get buf')
227227 checksum (.getLong buf')
228228 size (.getInt buf')]
229- (cons
230229
231- (task
232- slab
233- pos
234- (+ header-size size)
235- (read-write-lock slab))
230+ ; ; this shouldn't be necessary, but let's not gratuitously
231+ ; ; overreach our bounds
232+ (when (< size (.remaining buf'))
233+ (cons
236234
237- (slab->task-seq
238- slab
239- (+ pos header-size size)))))))
235+ (task
236+ slab
237+ pos
238+ (+ header-size size)
239+ (read-write-lock slab))
240+
241+ (slab->task-seq
242+ slab
243+ (+ pos header-size size))))))))
240244 (catch Throwable e
241245 ; ; this implies unrecoverable corruption
242246 nil
424428(defprotocol IQueues
425429 (^:private mark-complete! [_ q-name])
426430 (^:private mark-retry! [_ q-name])
431+ (delete! [_]
432+ " Deletes all files associated with the queues." )
427433 (stats [_]
428434 " Returns a map of queue names onto information about the immediate state of the queue." )
429435 (fsync [_]
611617
612618 IQueues
613619
620+ (delete! [this]
621+ (doseq [s (->> @queue-name->slabs vals (apply concat))]
622+ (unmap s)
623+ (delete-slab s)))
624+
614625 (fsync [_]
615626 (doseq [slab (->> @queue-name->slabs vals (apply concat))]
616627 (sync! slab)))
Original file line number Diff line number Diff line change 1717 tasks (range 1e4 )]
1818 (doseq [t tasks]
1919 (put! q :foo t))
20- (is (= tasks (map deref (immediate-task-seq q :foo ))))))
20+ (is (= tasks (map deref (immediate-task-seq q :foo ))))
21+ (delete! q)))
2122
2223(deftest test-partial-slab-writes
2324 (clear-tmp-directory )
4546 (is (= (range 5 15 ) (map deref tasks')))
4647 (doseq [t (take 5 tasks')]
4748 (complete! t))))
48-
49+
4950 (with-open [q (queues " /tmp" )]
5051 (let [tasks' (immediate-task-seq q :foo )]
5152 (is (= (range 10 15 ) (map deref tasks')))
5253 (doseq [t (range 15 20 )]
5354 (put! q :foo t))))
54-
55+
5556 (let [q (queues " /tmp" {:complete? even?})]
5657 (is (= (remove even? (range 10 20 )) (map deref (immediate-task-seq q :foo ))))))
5758
128129 (let [s (immediate-task-seq q :stress )]
129130 (doseq [t s]
130131 (complete! t))))
131-
132+
132133 (clear-tmp-directory ))
You can’t perform that action at this time.
0 commit comments