File tree Expand file tree Collapse file tree 2 files changed +26
-14
lines changed Expand file tree Collapse file tree 2 files changed +26
-14
lines changed Original file line number Diff line number Diff line change 219
219
220
220
; ; is there a next task, and is there space left in the buffer?
221
221
(when (and
222
- (pos? (.remaining buf'))
222
+ (< header-size (.remaining buf'))
223
223
(== 1 (.get buf')))
224
224
225
225
(lazy-seq
226
226
(let [status (.get buf')
227
227
checksum (.getLong buf')
228
228
size (.getInt buf')]
229
- (cons
230
229
231
- (task
232
- slab
233
- pos
234
- (+ header-size size)
235
- (read-write-lock slab))
230
+ ; ; this shouldn't be necessary, but let's not gratuitously
231
+ ; ; overreach our bounds
232
+ (when (< size (.remaining buf'))
233
+ (cons
236
234
237
- (slab->task-seq
238
- slab
239
- (+ pos header-size size)))))))
235
+ (task
236
+ slab
237
+ pos
238
+ (+ header-size size)
239
+ (read-write-lock slab))
240
+
241
+ (slab->task-seq
242
+ slab
243
+ (+ pos header-size size))))))))
240
244
(catch Throwable e
241
245
; ; this implies unrecoverable corruption
242
246
nil
424
428
(defprotocol IQueues
425
429
(^:private mark-complete! [_ q-name])
426
430
(^:private mark-retry! [_ q-name])
431
+ (delete! [_]
432
+ " Deletes all files associated with the queues." )
427
433
(stats [_]
428
434
" Returns a map of queue names onto information about the immediate state of the queue." )
429
435
(fsync [_]
611
617
612
618
IQueues
613
619
620
+ (delete! [this]
621
+ (doseq [s (->> @queue-name->slabs vals (apply concat))]
622
+ (unmap s)
623
+ (delete-slab s)))
624
+
614
625
(fsync [_]
615
626
(doseq [slab (->> @queue-name->slabs vals (apply concat))]
616
627
(sync! slab)))
Original file line number Diff line number Diff line change 17
17
tasks (range 1e4 )]
18
18
(doseq [t tasks]
19
19
(put! q :foo t))
20
- (is (= tasks (map deref (immediate-task-seq q :foo ))))))
20
+ (is (= tasks (map deref (immediate-task-seq q :foo ))))
21
+ (delete! q)))
21
22
22
23
(deftest test-partial-slab-writes
23
24
(clear-tmp-directory )
45
46
(is (= (range 5 15 ) (map deref tasks')))
46
47
(doseq [t (take 5 tasks')]
47
48
(complete! t))))
48
-
49
+
49
50
(with-open [q (queues " /tmp" )]
50
51
(let [tasks' (immediate-task-seq q :foo )]
51
52
(is (= (range 10 15 ) (map deref tasks')))
52
53
(doseq [t (range 15 20 )]
53
54
(put! q :foo t))))
54
-
55
+
55
56
(let [q (queues " /tmp" {:complete? even?})]
56
57
(is (= (remove even? (range 10 20 )) (map deref (immediate-task-seq q :foo ))))))
57
58
128
129
(let [s (immediate-task-seq q :stress )]
129
130
(doseq [t s]
130
131
(complete! t))))
131
-
132
+
132
133
(clear-tmp-directory ))
You can’t perform that action at this time.
0 commit comments