@@ -419,20 +419,25 @@ module From = struct
419
419
420
420
let session_is_invalid call = with_lock call.m (fun () -> call.session_invalid)
421
421
422
- let wait2 call from_id deadline =
422
+ let wait2 call from_id timer =
423
423
let timeoutname = Printf. sprintf " event_from_timeout_%Ld" call.index in
424
424
with_lock m (fun () ->
425
425
while
426
426
from_id = call.cur_id
427
427
&& (not (session_is_invalid call))
428
- && Unix. gettimeofday () < deadline
428
+ && not ( Clock.Timer. has_expired timer)
429
429
do
430
- Xapi_stdext_threads_scheduler.Scheduler. add_to_queue timeoutname
431
- Xapi_stdext_threads_scheduler.Scheduler. OneShot
432
- (deadline -. Unix. gettimeofday () +. 0.5 )
433
- (fun () -> Condition. broadcast c) ;
434
- Condition. wait c m ;
435
- Xapi_stdext_threads_scheduler.Scheduler. remove_from_queue timeoutname
430
+ match Clock.Timer. remaining timer with
431
+ | Expired _ ->
432
+ ()
433
+ | Remaining delta ->
434
+ Xapi_stdext_threads_scheduler.Scheduler. add_to_queue_span
435
+ timeoutname Xapi_stdext_threads_scheduler.Scheduler. OneShot
436
+ delta (fun () -> Condition. broadcast c
437
+ ) ;
438
+ Condition. wait c m ;
439
+ Xapi_stdext_threads_scheduler.Scheduler. remove_from_queue
440
+ timeoutname
436
441
done
437
442
) ;
438
443
if session_is_invalid call then (
@@ -506,7 +511,7 @@ let rec next ~__context =
506
511
else
507
512
rpc_of_events relevant
508
513
509
- let from_inner __context session subs from from_t deadline =
514
+ let from_inner __context session subs from from_t timer =
510
515
let open Xapi_database in
511
516
let open From in
512
517
(* The database tables involved in our subscription *)
@@ -605,14 +610,14 @@ let from_inner __context session subs from from_t deadline =
605
610
&& mods = []
606
611
&& deletes = []
607
612
&& messages = []
608
- && Unix. gettimeofday () < deadline
613
+ && not ( Clock.Timer. has_expired timer)
609
614
then (
610
615
last_generation := last ;
611
616
(* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *)
612
617
sub.cur_id < - last ;
613
618
(* last id the client got is equivalent to the current one *)
614
619
last_msg_gen := msg_gen ;
615
- wait2 sub last deadline ;
620
+ wait2 sub last timer ;
616
621
Thread. delay 0.05 ;
617
622
grab_nonempty_range ()
618
623
) else
@@ -705,14 +710,19 @@ let from ~__context ~classes ~token ~timeout =
705
710
)
706
711
in
707
712
let subs = List. map Subscription. of_string classes in
708
- let deadline = Unix. gettimeofday () +. timeout in
713
+ let duration =
714
+ timeout
715
+ |> Clock.Timer. s_to_span
716
+ |> Option. value ~default: Mtime.Span. (24 * hour)
717
+ in
718
+ let timer = Clock.Timer. start ~duration in
709
719
(* We need to iterate because it's possible for an empty event set
710
720
to be generated if we peek in-between a Modify and a Delete; we'll
711
721
miss the Delete event and fail to generate the Modify because the
712
722
snapshot can't be taken. *)
713
723
let rec loop () =
714
- let event_from = from_inner __context session subs from from_t deadline in
715
- if event_from.events = [] && Unix. gettimeofday () < deadline then (
724
+ let event_from = from_inner __context session subs from from_t timer in
725
+ if event_from.events = [] && not ( Clock.Timer. has_expired timer) then (
716
726
debug " suppressing empty event.from" ;
717
727
loop ()
718
728
) else
0 commit comments