Skip to content

Commit 1c0d1ec

Browse files
Phil Scottrr0gi
authored andcommitted
Provide Parallel.Services.
1 parent c6f24e4 commit 1c0d1ec

File tree

3 files changed

+89
-7
lines changed

3 files changed

+89
-7
lines changed

dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
logstash
3636
lwt_flag
3737
lwt_util
38+
parallel
3839
web))
3940
))
4041

parallel.ml

Lines changed: 78 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,17 @@ let reap l =
2929
| Unix_error (ECHILD,_,_) -> true (* exited *)
3030
| exn -> log #warn ~exn "Worker PID %d lost (wait)" pid; true) l
3131

32-
let hard_kill l =
32+
let hard_kill1 pid =
3333
let open Unix in
34+
try
35+
kill pid Sys.sigkill; log #warn "Worker PID %d killed with SIGKILL" pid
36+
with
37+
| Unix_error (ESRCH,_,_) -> ()
38+
| exn -> log #warn ~exn "Worker PID %d (SIGKILL)" pid
39+
40+
let hard_kill l =
3441
let (_,live) = reap l in
35-
live |> List.iter begin fun pid ->
36-
try
37-
kill pid Sys.sigkill; log #warn "Worker PID %d killed with SIGKILL" pid
38-
with
39-
| Unix_error (ESRCH,_,_) -> ()
40-
| exn -> log #warn ~exn "Worker PID %d (SIGKILL)" pid end
42+
List.iter hard_kill1 live
4143

4244
let killall signo pids =
4345
pids |> List.iter begin fun pid ->
@@ -304,3 +306,72 @@ let run_forks' f l =
304306
| [] -> ()
305307
| [x] -> f x
306308
| l -> run_forks f l
309+
310+
module Services = struct
311+
type t = {
312+
mutable pids : int list;
313+
work : int -> unit Lwt.t;
314+
}
315+
316+
let start n work =
317+
let rec start_forked i =
318+
if i >= n then Lwt.return_nil
319+
else (
320+
match Nix.fork () with
321+
| `Child ->
322+
let%lwt () = work i in
323+
exit 0
324+
| `Forked pid ->
325+
log#debug "Starting worker %d with pid %d" i pid;
326+
Lwt.map (fun pids -> pid :: pids) (start_forked (i + 1))
327+
)
328+
in
329+
Lwt.map (fun pids -> { pids; work }) (start_forked 0)
330+
331+
let wait pid =
332+
try%lwt Lwt.map fst (Lwt_unix.waitpid [] pid) with
333+
| Unix.Unix_error (ECHILD, _, _) -> Lwt.return pid
334+
| exn ->
335+
log#warn ~exn "Worker PID %d lost (wait)" pid;
336+
Lwt.return pid
337+
338+
let kill ~timeout pid =
339+
let graceful =
340+
Unix.kill pid Sys.sigterm;
341+
let%lwt _ = wait pid in
342+
log#debug "Worker PID %d killed with SIGTERM" pid;
343+
Lwt.return_unit
344+
in
345+
let ungraceful =
346+
let%lwt () = Lwt_unix.sleep timeout in
347+
hard_kill1 pid;
348+
Lwt.return_unit
349+
in
350+
Lwt.pick [ graceful; ungraceful ]
351+
352+
let rolling_restart ?wait ~timeout workers =
353+
let%lwt pids =
354+
Lwt_list.mapi_s begin fun i pid ->
355+
log#debug "Restarting worker %d with PID %d\n%!" i pid;
356+
let%lwt () = kill ~timeout pid in
357+
Option.may Unix.sleep wait;
358+
match Nix.fork () with
359+
| `Child ->
360+
let%lwt () = workers.work i in
361+
exit 0
362+
| `Forked pid' ->
363+
log#debug "Worker %d started with PID %d\n%!" i pid';
364+
Lwt.return pid'
365+
end
366+
workers.pids
367+
in
368+
workers.pids <- pids;
369+
Lwt.return_unit
370+
371+
let stop ~timeout { pids; _ } =
372+
log#info "Stopping workers";
373+
Lwt_list.iteri_p begin fun i pid ->
374+
log#debug "Stopping worker %d with PID %d" i pid;
375+
kill ~timeout pid
376+
end pids
377+
end

parallel.mli

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,13 @@ val perform : ('a,'b) t -> 'a Enum.t -> ('b -> unit) -> unit
6363
module Forks(T:WorkerT) : Workers
6464
with type task = T.task
6565
and type result = T.result
66+
67+
module Services : sig
68+
type t
69+
70+
val start : int -> (int -> unit Lwt.t) -> t Lwt.t
71+
72+
val rolling_restart : ?wait:int -> timeout:float -> t -> unit Lwt.t
73+
74+
val stop : timeout:float -> t -> unit Lwt.t
75+
end

0 commit comments

Comments
 (0)