Skip to content

Commit b3dca9d

Browse files
committed
Make multififo threads periodically steal to balance load
1 parent 3801d9e commit b3dca9d

File tree

3 files changed

+47
-16
lines changed

3 files changed

+47
-16
lines changed

lib/picos_aux.mpmcq/picos_aux_mpmcq.ml

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -186,18 +186,25 @@ let rec push_head t value backoff =
186186
end
187187
end
188188

189-
let rec length t =
190-
let head = Atomic.get t.head in
191-
let tail = Atomic.fenceless_get t.tail in
192-
if head != Atomic.get t.head then length t
193-
else
194-
let head_at =
195-
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
196-
in
197-
let tail_at =
198-
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
199-
in
200-
tail_at - head_at + 1
189+
let[@inline] length t =
190+
let t_head = t.head in
191+
let t_tail = t.tail in
192+
let head = ref (Atomic.get t_head) in
193+
let tail = ref (Atomic.fenceless_get t_tail) in
194+
while
195+
head := Atomic.get t_head;
196+
tail := Atomic.fenceless_get t_tail;
197+
!head != Atomic.get t_head
198+
do
199+
()
200+
done;
201+
let head_at =
202+
match !head with H (Cons r) -> r.counter | H (Head r) -> r.counter
203+
in
204+
let tail_at =
205+
match !tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
206+
in
207+
tail_at - head_at + 1
201208

202209
let[@inline] is_empty t = length t == 0
203210
let[@inline] pop_exn t = pop t Backoff.default (Atomic.fenceless_get t.head)

lib/picos_mux.multififo/picos_mux_multififo.ml

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ and _ tdt =
4545
mutable num_stopped : int;
4646
mutable fiber : Fiber.Maybe.t;
4747
mutable remaining_quota : int;
48+
mutable countdown_to_steal : int;
4849
}
4950
-> [> `Per_thread ] tdt
5051

@@ -116,7 +117,27 @@ let exec ready (Per_thread p : per_thread) t =
116117
| Resume (fiber, k) -> Fiber.resume fiber k
117118

118119
let rec next (Per_thread p as pt : per_thread) =
119-
match Mpmcq.pop_exn p.ready with
120+
let ready =
121+
let c = p.countdown_to_steal in
122+
if 0 < c then begin
123+
p.countdown_to_steal <- c - 1;
124+
p.ready
125+
end
126+
else begin
127+
let t = p.context in
128+
if 1 < t.threads_num then begin
129+
let (Per_thread v) = get_thread t (Random.int t.threads_num) in
130+
p.countdown_to_steal <- Mpmcq.length p.ready + 1;
131+
if p.countdown_to_steal <= Mpmcq.length v.ready then v.ready
132+
else p.ready
133+
end
134+
else begin
135+
p.countdown_to_steal <- 1_000;
136+
p.ready
137+
end
138+
end
139+
in
140+
match Mpmcq.pop_exn ready with
120141
| ready ->
121142
let t = p.context in
122143
relaxed_wakeup t ~known_not_empty:false p.ready;
@@ -195,6 +216,7 @@ let per_thread context =
195216
num_stopped = 0;
196217
fiber = Fiber.Maybe.nothing;
197218
remaining_quota = 0;
219+
countdown_to_steal = 1;
198220
}
199221
in
200222
p.resume <-

lib/picos_mux.multififo/picos_mux_multififo.mli

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
44
This scheduler uses a queue per thread to implement a mostly FIFO scheduler.
55
If a thread runs out of fibers to run, it will try to take a fiber from the
6-
queues of other threads, which means that fibers can move from one thread to
7-
another. This scheduler also gives priority to fibers woken up due to being
8-
canceled.
6+
queues of other threads. Furthermore, threads periodically consider taking
7+
fibers from other threads to balance the number of fibers per thread. All of
8+
this means that this scheduler should act relatively fairly and work well
9+
for concurrent workloads and workloads where fairness matters. This
10+
scheduler also gives priority to fibers woken up due to being canceled.
911
1012
🐌 Due to mostly FIFO scheduling this scheduler performs poorly on highly
1113
parallel workloads.

0 commit comments

Comments
 (0)