Skip to content

Commit c666a48

Browse files
committed
make fsync interval include time to actually fsync
1 parent 9d097ee commit c666a48

File tree

1 file changed

+34
-32
lines changed

1 file changed

+34
-32
lines changed

src/durable_queue.clj

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@
127127
[^ByteBuffer buf]
128128
(when (.isDirect buf)
129129
(try
130-
130+
131131
(let [^Method clean @clean
132132
cleaner (doto (.getMethod (class buf) "cleaner" nil)
133133
(.setAccessible true))]
@@ -203,7 +203,7 @@
203203
;; [ exists? : int8
204204
;; state : int8
205205
;; checksum : int64
206-
;; size : int32
206+
;; size : int32
207207
;; payload : array ]
208208
;; valid values for 'exists' is 0 (no), 1 (yes)
209209
;; valid values for 'state' is 0 (unclaimed), 1 (in progress), 2 (complete)
@@ -216,24 +216,24 @@
216216
(let [^ByteBuffer
217217
buf' (-> (buffer slab)
218218
(.position pos))]
219-
219+
220220
;; is there a next task, and is there space left in the buffer?
221221
(when (and
222222
(pos? (.remaining buf'))
223223
(== 1 (.get buf')))
224-
224+
225225
(lazy-seq
226226
(let [status (.get buf')
227227
checksum (.getLong buf')
228228
size (.getInt buf')]
229229
(cons
230-
230+
231231
(task
232232
slab
233233
pos
234234
(+ header-size size)
235235
(read-write-lock slab))
236-
236+
237237
(slab->task-seq
238238
slab
239239
(+ pos header-size size)))))))
@@ -266,7 +266,7 @@
266266

267267
(mapped? [_]
268268
(boolean @buf))
269-
269+
270270
(unmap [_]
271271
(with-exclusive-lock lock
272272
(when (.exists (io/file filename))
@@ -308,18 +308,18 @@
308308
(.putInt cnt)
309309
(.put ary)
310310
(.put (byte 0))) ;; next doesn't exist
311-
311+
312312
(swap! position + header-size cnt)
313313

314314
(invalidate this pos (+ header-size cnt))
315-
315+
316316
;; return a task to enqueue in-memory
317317
(task
318318
this
319319
pos
320320
(+ header-size cnt)
321321
lock)))))
322-
322+
323323
clojure.lang.Seqable
324324
(seq [this]
325325
(slab->task-seq this))
@@ -357,7 +357,7 @@
357357

358358
(when-not (.createNewFile f)
359359
(throw (IOException. (str "Could not create new slab file at " (.getAbsolutePath f)))))
360-
360+
361361
(TaskSlab.
362362
(.getAbsolutePath f)
363363
q-name
@@ -413,8 +413,8 @@
413413
(defn- immediate-stats [^LinkedBlockingQueue q {:keys [enqueued retried completed]}]
414414
(let [cnt (.size q)
415415
completed (.get ^AtomicLong completed)
416-
enqueued (.get ^AtomicLong enqueued)]
417-
{:enqueued enqueued
416+
enqueued (.get ^AtomicLong enqueued)]
417+
{:enqueued enqueued
418418
:retried (.get ^AtomicLong retried)
419419
:completed completed
420420
:in-progress (- (- enqueued completed) cnt)}))
@@ -478,11 +478,11 @@
478478
(or fsync-threshold fsync-interval)
479479
(or fsync-take? fsync-put?)))
480480
"Both batch and per-task fsync options are enabled, which is probably not what you intended.")
481-
481+
482482
(.mkdirs (io/file directory))
483483

484484
(let [
485-
485+
486486
queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size))))
487487
queue-name->files (directory->queue-name->slab-files directory)
488488

@@ -505,7 +505,7 @@
505505

506506
queue-name->current-slab (atom {})
507507

508-
;; initialize
508+
;; initialize
509509
slabs (->> @queue-name->slabs vals (apply concat))
510510
slab->count (zipmap
511511
slabs
@@ -539,25 +539,27 @@
539539
this-ref (promise)
540540

541541
action-counter (AtomicLong. 0)
542-
542+
543543
mark-action! (if fsync-threshold
544544
(fn []
545545
(when (zero? (rem (.incrementAndGet action-counter) fsync-threshold))
546546
(fsync @this-ref)))
547547
(fn []))]
548548

549-
;;
549+
;;
550550
(when fsync-interval
551551
(future
552552
(let [ref (WeakReference. @this-ref)]
553553
(while (.get ref)
554554
(when-let [q (.get ref)]
555555
(try
556-
(Thread/sleep fsync-interval)
557-
(fsync q)
556+
(let [start (System/currentTimeMillis)]
557+
(fsync q)
558+
(let [end (System/currentTimeMillis)]
559+
(Thread/sleep (max 0 (- fsync-interval (- end start))))))
558560
(catch Throwable e
559561
)))))))
560-
562+
561563
;; populate queues with pre-existing tasks
562564
(let [empty-slabs (atom #{})]
563565
(doseq [[q slabs] @queue-name->slabs]
@@ -571,14 +573,14 @@
571573
::fsync? fsync-take?))
572574
(remove #(or (= :complete (status %))
573575
(and complete? (complete? @%)))))]
574-
576+
575577
(if (empty? tasks)
576-
578+
577579
;; if there aren't any active tasks, just delete the slab
578580
(do
579581
(delete-slab slab)
580582
(swap! empty-slabs conj slab))
581-
583+
582584
(do
583585
(doseq [task tasks]
584586
(status! task :incomplete)
@@ -612,19 +614,19 @@
612614
(fsync [_]
613615
(doseq [slab (->> @queue-name->slabs vals (apply concat))]
614616
(sync! slab)))
615-
617+
616618
(mark-retry! [_ q-name]
617619
(mark-action!)
618620
(populate-stats! q-name)
619621
(let [^AtomicLong retry-counter (get-in @queue-name->stats [q-name :retried])]
620622
(.incrementAndGet retry-counter)))
621-
623+
622624
(mark-complete! [_ q-name]
623625
(mark-action!)
624626
(populate-stats! q-name)
625627
(let [^AtomicLong retry-counter (get-in @queue-name->stats [q-name :completed])]
626628
(.incrementAndGet retry-counter)))
627-
629+
628630
(stats [_]
629631
(let [ks (keys @queue-name->stats)]
630632
(zipmap ks
@@ -657,7 +659,7 @@
657659
butlast
658660
(remove #(= slab %)))]
659661
(unmap s))))
660-
662+
661663
(status! t :in-progress)
662664
;; we don't need to fsync here, because in-progress and incomplete
663665
;; are effectively equivalent on restart
@@ -666,7 +668,7 @@
666668
timeout-val)
667669
(catch TimeoutException _
668670
timeout-val))))
669-
671+
670672
(take! [this q-name]
671673
(take! this q-name Long/MAX_VALUE nil))
672674

@@ -690,11 +692,11 @@
690692
(throw
691693
(IllegalArgumentException.
692694
(str "Can't enqueue task whose serialized representation is larger than :slab-size, which is currently " slab-size))))
693-
695+
694696
(when fsync-put?
695697
(sync! slab))
696698
task))
697-
699+
698700
queue! (fn [task]
699701
(if (zero? timeout)
700702
(.offer q task)
@@ -714,7 +716,7 @@
714716
false)
715717

716718
nil))
717-
719+
718720
(put! [this q-name task-descriptor]
719721
(put! this q-name task-descriptor Long/MAX_VALUE))))
720722

0 commit comments

Comments
 (0)