Skip to content

Commit 1ac3f07

Browse files
committed
Update feature/perf from master
Signed-off-by: Edwin Török <[email protected]>
2 parents ce5abab + 4f3f08f commit 1ac3f07

File tree

7 files changed

+212
-58
lines changed

7 files changed

+212
-58
lines changed

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(library
22
(public_name xapi-stdext-threads)
33
(name xapi_stdext_threads)
4-
(modules :standard \ ipq scheduler threadext_test ipq_test)
4+
(modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test)
55
(libraries
66
mtime
77
mtime.clock.os
@@ -22,8 +22,8 @@
2222
)
2323

2424
(tests
25-
(names threadext_test ipq_test)
25+
(names threadext_test ipq_test scheduler_test)
2626
(package xapi-stdext-threads)
27-
(modules threadext_test ipq_test)
27+
(modules threadext_test ipq_test scheduler_test)
2828
(libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler)
2929
)

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml

Lines changed: 70 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ let delay = Delay.make ()
2727

2828
let queue_default = {func= (fun () -> ()); ty= OneShot; name= ""}
2929

30+
let (pending_event : t option ref) = ref None
31+
3032
let (queue : t Ipq.t) = Ipq.create 50 queue_default
3133

3234
let lock = Mutex.create ()
@@ -48,66 +50,84 @@ module Clock = struct
4850
Mtime.min_stamp
4951
end
5052

51-
let add_to_queue ?(signal = true) name ty start newfunc =
52-
with_lock lock (fun () ->
53-
let ( ++ ) = Clock.add_span in
54-
Ipq.add queue
55-
{
56-
Ipq.ev= {func= newfunc; ty; name}
57-
; Ipq.time= Mtime_clock.now () ++ start
58-
}
59-
) ;
60-
if signal then Delay.signal delay
53+
let add_to_queue name ty start newfunc =
54+
let ( ++ ) = Clock.add_span in
55+
let item =
56+
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
57+
in
58+
with_lock lock (fun () -> Ipq.add queue item) ;
59+
Delay.signal delay
6160

6261
let remove_from_queue name =
63-
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
64-
if index > -1 then
65-
Ipq.remove queue index
66-
67-
let wait_next sleep =
68-
try ignore (Delay.wait delay sleep)
69-
with e ->
70-
let detailed_msg =
71-
match e with
72-
| Unix.Unix_error (code, _, _) ->
73-
Unix.error_message code
74-
| _ ->
75-
"unknown error"
76-
in
77-
error
78-
"Could not schedule interruptable delay (%s). Falling back to normal \
79-
delay. New events may be missed."
80-
detailed_msg ;
81-
Thread.delay sleep
62+
with_lock lock @@ fun () ->
63+
match !pending_event with
64+
| Some ev when ev.name = name ->
65+
pending_event := None
66+
| Some _ | None ->
67+
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
68+
if index > -1 then
69+
Ipq.remove queue index
70+
71+
let add_periodic_pending () =
72+
with_lock lock @@ fun () ->
73+
match !pending_event with
74+
| Some ({ty= Periodic timer; _} as ev) ->
75+
let ( ++ ) = Clock.add_span in
76+
let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in
77+
Ipq.add queue item ;
78+
pending_event := None
79+
| Some {ty= OneShot; _} ->
80+
pending_event := None
81+
| None ->
82+
()
8283

8384
let loop () =
8485
debug "%s started" __MODULE__ ;
8586
try
8687
while true do
87-
let empty = with_lock lock (fun () -> Ipq.is_empty queue) in
88-
if empty then
89-
wait_next 10.0
90-
(* Doesn't happen often - the queue isn't usually empty *)
91-
else
92-
let next = with_lock lock (fun () -> Ipq.maximum queue) in
93-
let now = Mtime_clock.now () in
94-
if next.Ipq.time < now then (
95-
let todo =
96-
(with_lock lock (fun () -> Ipq.pop_maximum queue)).Ipq.ev
97-
in
88+
let now = Mtime_clock.now () in
89+
let deadline, item =
90+
with_lock lock @@ fun () ->
91+
(* empty: wait till we get something *)
92+
if Ipq.is_empty queue then
93+
(Clock.add_span now 10.0, None)
94+
else
95+
let next = Ipq.maximum queue in
96+
if Mtime.is_later next.Ipq.time ~than:now then
97+
(* not expired: wait till time or interrupted *)
98+
(next.Ipq.time, None)
99+
else (
100+
(* remove expired item *)
101+
Ipq.pop_maximum queue |> ignore ;
102+
(* save periodic to be scheduled again *)
103+
if next.Ipq.ev.ty <> OneShot then pending_event := Some next.Ipq.ev ;
104+
(now, Some next.Ipq.ev)
105+
)
106+
in
107+
match item with
108+
| Some todo ->
98109
(try todo.func () with _ -> ()) ;
99-
match todo.ty with
100-
| OneShot ->
101-
()
102-
| Periodic timer ->
103-
add_to_queue ~signal:false todo.name todo.ty timer todo.func
104-
) else (* Sleep until next event. *)
110+
add_periodic_pending ()
111+
| None -> (
112+
(* Sleep until next event. *)
105113
let sleep =
106-
Mtime.(span next.Ipq.time now)
107-
|> Mtime.Span.(add ms)
108-
|> Clock.span_to_s
114+
Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s
109115
in
110-
wait_next sleep
116+
try ignore (Delay.wait delay sleep)
117+
with e ->
118+
let detailed_msg =
119+
match e with
120+
| Unix.Unix_error (code, _, _) ->
121+
Unix.error_message code
122+
| _ ->
123+
"unknown error"
124+
in
125+
error
126+
"Could not schedule interruptable delay (%s). Falling back to \
127+
normal delay. New events may be missed."
128+
detailed_msg ;
129+
Thread.delay sleep
130+
)
111131
done
112132
with _ ->
113133
error

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ type func_ty =
1818
| OneShot (** Fire just once *)
1919
| Periodic of float (** Fire periodically with a given period in seconds *)
2020

21-
val add_to_queue :
22-
?signal:bool -> string -> func_ty -> float -> (unit -> unit) -> unit
21+
val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit
2322
(** Start a new timer. *)
2423

2524
val remove_from_queue : string -> unit
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
(*
2+
* Copyright (C) 2024 Cloud Software Group
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
module Scheduler = Xapi_stdext_threads_scheduler.Scheduler
16+
17+
let started = Atomic.make false
18+
19+
let start_schedule () =
20+
if not (Atomic.exchange started true) then
21+
Thread.create Scheduler.loop () |> ignore
22+
23+
let send event data = Event.(send event data |> sync)
24+
25+
let receive event = Event.(receive event |> sync)
26+
27+
let elapsed_ms cnt =
28+
let elapsed_ns = Mtime_clock.count cnt |> Mtime.Span.to_uint64_ns in
29+
Int64.(div elapsed_ns 1000000L |> to_int)
30+
31+
let is_less = Alcotest.(testable (pp int)) Stdlib.( > )
32+
33+
let test_single () =
34+
let finished = Event.new_channel () in
35+
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
36+
send finished true
37+
) ;
38+
start_schedule () ;
39+
Alcotest.(check bool) "result" true (receive finished)
40+
41+
let test_remove_self () =
42+
let which = Event.new_channel () in
43+
Scheduler.add_to_queue "self" (Scheduler.Periodic 0.001) 0.001 (fun () ->
44+
(* this should remove the periodic scheduling *)
45+
Scheduler.remove_from_queue "self" ;
46+
(* add an operation to stop the test *)
47+
Scheduler.add_to_queue "stop" Scheduler.OneShot 0.1 (fun () ->
48+
send which "stop"
49+
) ;
50+
send which "self"
51+
) ;
52+
start_schedule () ;
53+
let cnt = Mtime_clock.counter () in
54+
Alcotest.(check string) "same event name" "self" (receive which) ;
55+
Alcotest.(check string) "same event name" "stop" (receive which) ;
56+
let elapsed_ms = elapsed_ms cnt in
57+
Alcotest.check is_less "small time" 300 elapsed_ms
58+
59+
let test_empty () =
60+
let finished = Event.new_channel () in
61+
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
62+
send finished true
63+
) ;
64+
start_schedule () ;
65+
Alcotest.(check bool) "finished" true (receive finished) ;
66+
(* give the scheduler loop time to go to sleep with an empty queue *)
67+
Thread.delay 0.1 ;
68+
Scheduler.add_to_queue "two" Scheduler.OneShot 0.001 (fun () ->
69+
send finished true
70+
) ;
71+
let cnt = Mtime_clock.counter () in
72+
Alcotest.(check bool) "finished" true (receive finished) ;
73+
let elapsed_ms = elapsed_ms cnt in
74+
Alcotest.check is_less "small time" 100 elapsed_ms
75+
76+
let test_wakeup () =
77+
let which = Event.new_channel () in
78+
(* schedule a long event *)
79+
Scheduler.add_to_queue "long" Scheduler.OneShot 2.0 (fun () ->
80+
send which "long"
81+
) ;
82+
start_schedule () ;
83+
(* give the scheduler loop time to go to sleep waiting for the long event *)
84+
Thread.delay 0.1 ;
85+
let cnt = Mtime_clock.counter () in
86+
(* schedule a quick event, should wake up the loop *)
87+
Scheduler.add_to_queue "quick" Scheduler.OneShot 0.1 (fun () ->
88+
send which "quick"
89+
) ;
90+
Alcotest.(check string) "same event name" "quick" (receive which) ;
91+
Scheduler.remove_from_queue "long" ;
92+
let elapsed_ms = elapsed_ms cnt in
93+
Alcotest.check is_less "small time" 150 elapsed_ms
94+
95+
let tests =
96+
[
97+
("test_single", `Quick, test_single)
98+
; ("test_remove_self", `Quick, test_remove_self)
99+
; ("test_empty", `Quick, test_empty)
100+
; ("test_wakeup", `Quick, test_wakeup)
101+
]
102+
103+
let () = Alcotest.run "Scheduler" [("generic", tests)]

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.mli

Whitespace-only changes.

ocaml/xapi/storage_smapiv1_wrapper.ml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,9 @@ functor
453453
List.fold_left perform_one vdi_t ops
454454

455455
let perform_nolock context ~dbg ~dp ~sr ~vdi ~vm this_op =
456+
debug "%s dp=%s, sr=%s, vdi=%s, vm=%s, op=%s" __FUNCTION__ dp
457+
(s_of_sr sr) (s_of_vdi vdi) (s_of_vm vm)
458+
(Vdi_automaton.string_of_op this_op) ;
456459
match Host.find sr !Host.host with
457460
| None ->
458461
raise (Storage_error (Sr_not_attached (s_of_sr sr)))
@@ -473,6 +476,15 @@ functor
473476
superstate to superstate'. These may fail: if so we revert the
474477
datapath+VDI state to the most appropriate value. *)
475478
let ops = Vdi_automaton.( - ) superstate superstate' in
479+
debug "%s %s -> %s: %s" __FUNCTION__
480+
(Vdi_automaton.string_of_state superstate)
481+
(Vdi_automaton.string_of_state superstate')
482+
(String.concat ", "
483+
(List.map
484+
(fun (op, _) -> Vdi_automaton.string_of_op op)
485+
ops
486+
)
487+
) ;
476488
side_effects context dbg dp sr sr_t vdi vdi_t vm ops
477489
with e ->
478490
let e =
@@ -529,7 +541,8 @@ functor
529541
)
530542
with e ->
531543
if not allow_leak then (
532-
ignore (Vdi.add_leaked dp vdi_t) ;
544+
Sr.add_or_replace vdi (Vdi.add_leaked dp vdi_t) sr_t ;
545+
Everything.to_file !host_state_path (Everything.make ()) ;
533546
raise e
534547
) else (
535548
(* allow_leak means we can forget this dp *)

ocaml/xapi/xapi_sr.ml

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -778,15 +778,34 @@ let scan ~__context ~sr =
778778
Db.VDI.get_records_where ~__context
779779
~expr:(Eq (Field "SR", Literal sr'))
780780
in
781+
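(* It is sufficient to compare only the refs of the two db_vdis lists, as this
782+
is what update_vdis uses to determine what to delete. NOTE(review): the check is one-directional (refs in the first list missing from the second); confirm that refs only present in the second list can be ignored *)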
783+
let vdis_ref_equal db_vdi1 db_vdi2 =
784+
Listext.List.set_difference (List.map fst db_vdi1)
785+
(List.map fst db_vdi2)
786+
= []
787+
in
781788
let db_vdis_before = find_vdis () in
782789
let vs, sr_info =
783790
C.SR.scan2 (Ref.string_of task)
784791
(Storage_interface.Sr.of_string sr_uuid)
785792
in
786793
let db_vdis_after = find_vdis () in
787-
if limit > 0 && db_vdis_after <> db_vdis_before then
794+
if limit > 0 && not (vdis_ref_equal db_vdis_before db_vdis_after)
795+
then (
796+
debug
797+
"%s detected db change while scanning, before scan vdis %s, \
798+
after scan vdis %s, retry limit left %d"
799+
__FUNCTION__
800+
(List.map (fun (_, v) -> v.vDI_uuid) db_vdis_before
801+
|> String.concat ","
802+
)
803+
(List.map (fun (_, v) -> v.vDI_uuid) db_vdis_after
804+
|> String.concat ","
805+
)
806+
limit ;
788807
(scan_rec [@tailcall]) (limit - 1)
789-
else if limit = 0 then
808+
) else if limit = 0 then
790809
raise
791810
(Api_errors.Server_error
792811
(Api_errors.internal_error, ["SR.scan retry limit exceeded"])

0 commit comments

Comments
 (0)