Skip to content

Commit 57d0888

Browse files
committed
Add FD passing
1 parent bb47407 commit 57d0888

File tree

15 files changed

+248
-57
lines changed

15 files changed

+248
-57
lines changed

lib_eio/unix/eio_unix.mli

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ module Resource : sig
3232
module type FLOW = sig
3333
include Eio.Net.Pi.STREAM_SOCKET
3434
include Eio.File.Pi.WRITE with type t := t
35+
include Net.Pi.STREAM_SOCKET with type t := t
3536

3637
val fd : t -> Fd.t
3738
end

lib_eio/unix/fd.ml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ let of_unix ~sw ?blocking ?seekable ~close_unix fd =
4646
t.release_hook <- Switch.on_release_cancellable sw (fun () -> close t);
4747
t
4848

49+
let of_unix_list ~sw fds =
50+
match Switch.get_error sw with
51+
| Some e -> List.iter Unix.close fds; raise e
52+
| None -> List.map (of_unix ~sw ~close_unix:true) fds
53+
4954
external eio_is_blocking : Unix.file_descr -> bool = "eio_unix_is_blocking"
5055

5156
let is_blocking t =

lib_eio/unix/fd.mli

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ val of_unix : sw:Switch.t -> ?blocking:bool -> ?seekable:bool -> close_unix:bool
1414
@param seekable The value to be returned by {!is_seekable}. Defaults to probing if needed.
1515
@param close_unix Whether {!close} also closes [fd] (this should normally be [true]). *)
1616

17+
val of_unix_list : sw:Switch.t -> Unix.file_descr list -> t list
18+
(** [of_unix_list ~sw fds] is like [List.map (of_unix ~sw ~close_unix:true) fds],
19+
except that if [sw] is off then it closes all the FDs. *)
20+
1721
(** {2 Using FDs} *)
1822

1923
val use : t -> (Unix.file_descr -> 'a) -> if_closed:(unit -> 'a) -> 'a

lib_eio/unix/net.ml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,33 @@ let sockaddr_of_unix_datagram = function
3030
let host = Ipaddr.of_unix host in
3131
`Udp (host, port)
3232

33+
module Pi = struct
34+
module type STREAM_SOCKET = sig
35+
type t
36+
37+
val send_msg : t -> fds:Fd.t list -> Cstruct.t list -> int
38+
39+
val recv_msg_with_fds : t -> sw:Switch.t -> max_fds:int -> Cstruct.t list -> int * Fd.t list
40+
end
41+
42+
type (_, _, _) Eio.Resource.pi +=
43+
| Stream_socket : ('t, (module STREAM_SOCKET with type t = 't), [> `Platform of [> `Unix] | `Socket | `Stream]) Eio.Resource.pi
44+
end
45+
46+
let send_msg (Eio.Resource.T (t, ops)) ?(fds=[]) bufs =
47+
let module X = (val (Eio.Resource.get ops Pi.Stream_socket)) in
48+
let rec aux ~fds bufs =
49+
let sent = X.send_msg t ~fds bufs in
50+
match Cstruct.shiftv bufs sent with
51+
| [] -> ()
52+
| bufs -> aux bufs ~fds:[]
53+
in
54+
aux ~fds bufs
55+
56+
let recv_msg_with_fds (Eio.Resource.T (t, ops)) ~sw ~max_fds bufs =
57+
let module X = (val (Eio.Resource.get ops Pi.Stream_socket)) in
58+
X.recv_msg_with_fds t ~sw ~max_fds bufs
59+
3360
let getnameinfo (sockaddr : Eio.Net.Sockaddr.t) =
3461
let options =
3562
match sockaddr with

lib_eio/unix/net.mli

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,24 @@ type 'a listening_socket = ([> listening_socket_ty] as 'a) r
1313

1414
type t = [`Generic | `Unix] Eio.Net.ty r
1515

16+
(** {2 Passing file descriptors} *)
17+
18+
val send_msg :
19+
[> `Platform of [>`Unix] | `Socket | `Stream] r ->
20+
?fds:Fd.t list ->
21+
Cstruct.t list -> unit
22+
(** Like {!Eio.Flow.write}, but allows passing file descriptors (for Unix-domain sockets). *)
23+
24+
val recv_msg_with_fds :
25+
[> `Platform of [>`Unix] | `Socket | `Stream] r ->
26+
sw:Switch.t ->
27+
max_fds:int ->
28+
Cstruct.t list ->
29+
int * Fd.t list
30+
(** Like {!Eio.Flow.single_read}, but also allows receiving file descriptors (for Unix-domain sockets).
31+
32+
@param max_fds The maximum number of file descriptors to accept (additional ones will be closed). *)
33+
1634
(** {2 Unix address conversions}
1735
1836
Note: OCaml's {!Unix.sockaddr} type considers e.g. TCP port 80 and UDP port
@@ -71,6 +89,19 @@ val socketpair_datagram :
7189
This creates OS-level resources using [socketpair(2)].
7290
Note that, like all FDs created by Eio, they are both marked as close-on-exec by default. *)
7391

92+
module Pi : sig
93+
module type STREAM_SOCKET = sig
94+
type t
95+
96+
val send_msg : t -> fds:Fd.t list -> Cstruct.t list -> int
97+
98+
val recv_msg_with_fds : t -> sw:Switch.t -> max_fds:int -> Cstruct.t list -> int * Fd.t list
99+
end
100+
101+
type (_, _, _) Eio.Resource.pi +=
102+
| Stream_socket : ('t, (module STREAM_SOCKET with type t = 't), [> `Platform of [> `Unix] | `Socket | `Stream]) Eio.Resource.pi
103+
end
104+
74105
(** {2 Private API for backends} *)
75106

76107
val getnameinfo : Eio.Net.Sockaddr.t -> (string * string)

lib_eio/unix/resource.ml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ let fd_opt (Eio.Resource.T (t, ops)) =
1111
module type FLOW = sig
1212
include Eio.Net.Pi.STREAM_SOCKET
1313
include Eio.File.Pi.WRITE with type t := t
14+
include Net.Pi.STREAM_SOCKET with type t := t
1415

1516
val fd : t -> Fd.t
1617
end
@@ -20,6 +21,7 @@ let flow_handler (type t tag) (module X : FLOW with type t = t and type tag = ta
2021
Eio.Resource.bindings (Eio.Net.Pi.stream_socket (module X)) @
2122
Eio.Resource.bindings (Eio.File.Pi.rw (module X)) @ [
2223
H (T, X.fd);
24+
H (Net.Pi.Stream_socket, (module X));
2325
]
2426

2527
module type DATAGRAM_SOCKET = sig

lib_eio_linux/eio_linux.ml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,13 @@ module Flow = struct
185185
| `Receive -> Unix.SHUTDOWN_RECEIVE
186186
| `Send -> Unix.SHUTDOWN_SEND
187187
| `All -> Unix.SHUTDOWN_ALL
188+
189+
let send_msg t ~fds data =
190+
Low_level.send_msg t ~fds data
191+
192+
let recv_msg_with_fds t ~sw ~max_fds data =
193+
let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds data in
194+
n, fds
188195
end
189196

190197
let flow_handler = Eio_unix.Resource.flow_handler (module Flow)

lib_eio_linux/low_level.ml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,7 @@ let recv_msg_with_fds ~sw ~max_fds fd buf =
263263
if res < 0 then (
264264
raise @@ Err.wrap (Uring.error_of_errno res) "recv_msg" ""
265265
);
266-
let fds =
267-
Uring.Msghdr.get_fds msghdr
268-
|> List.map (fun fd -> Fd.of_unix ~sw ~close_unix:true fd)
269-
in
266+
let fds = Uring.Msghdr.get_fds msghdr |> Fd.of_unix_list ~sw in
270267
addr, res, fds
271268

272269
let with_chunk ~fallback fn =

lib_eio_linux/tests/fd_passing.md

Lines changed: 0 additions & 42 deletions
This file was deleted.

lib_eio_posix/eio_posix_stubs.c

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,23 +212,51 @@ CAMLprim value caml_eio_posix_spawn(value v_errors, value v_actions) {
212212
CAMLreturn(Val_long(child_pid));
213213
}
214214

215-
CAMLprim value caml_eio_posix_send_msg(value v_fd, value v_dst_opt, value v_bufs) {
216-
CAMLparam2(v_dst_opt, v_bufs);
215+
/* Copy [n_fds] from [v_fds] to [msg]. */
216+
static void fill_fds(struct msghdr *msg, int n_fds, value v_fds) {
217+
if (n_fds > 0) {
218+
int i;
219+
struct cmsghdr *cm;
220+
cm = CMSG_FIRSTHDR(msg);
221+
cm->cmsg_level = SOL_SOCKET;
222+
cm->cmsg_type = SCM_RIGHTS;
223+
cm->cmsg_len = CMSG_LEN(n_fds * sizeof(int));
224+
for (i = 0; i < n_fds; i++) {
225+
int fd = -1;
226+
if (Is_block(v_fds)) {
227+
fd = Int_val(Field(v_fds, 0));
228+
v_fds = Field(v_fds, 1);
229+
}
230+
((int *)CMSG_DATA(cm))[i] = fd;
231+
}
232+
}
233+
}
234+
235+
CAMLprim value caml_eio_posix_send_msg(value v_fd, value v_n_fds, value v_fds, value v_dst_opt, value v_bufs) {
236+
CAMLparam3(v_fds, v_dst_opt, v_bufs);
217237
int n_bufs = Wosize_val(v_bufs);
238+
int n_fds = Int_val(v_n_fds);
218239
struct iovec iov[n_bufs];
219240
union sock_addr_union dst_addr;
241+
int controllen = n_fds > 0 ? CMSG_SPACE(sizeof(int) * n_fds) : 0;
242+
char cmsg[controllen];
220243
struct msghdr msg = {
221244
.msg_iov = iov,
222245
.msg_iovlen = n_bufs,
246+
.msg_control = n_fds > 0 ? cmsg : NULL,
247+
.msg_controllen = controllen,
223248
};
224249
ssize_t r;
225250

251+
memset(cmsg, 0, controllen);
252+
226253
if (Is_some(v_dst_opt)) {
227254
caml_unix_get_sockaddr(Some_val(v_dst_opt), &dst_addr, &msg.msg_namelen);
228255
msg.msg_name = &dst_addr;
229256
}
230257

231258
fill_iov(iov, v_bufs);
259+
fill_fds(&msg, n_fds, v_fds);
232260

233261
caml_enter_blocking_section();
234262
r = sendmsg(Int_val(v_fd), &msg, 0);
@@ -238,20 +266,49 @@ CAMLprim value caml_eio_posix_send_msg(value v_fd, value v_dst_opt, value v_bufs
238266
CAMLreturn(Val_long(r));
239267
}
240268

241-
CAMLprim value caml_eio_posix_recv_msg(value v_fd, value v_bufs) {
269+
static value get_msghdr_fds(struct msghdr *msg) {
270+
CAMLparam0();
271+
CAMLlocal2(v_list, v_cons);
272+
struct cmsghdr *cm;
273+
v_list = Val_int(0);
274+
for (cm = CMSG_FIRSTHDR(msg); cm; cm = CMSG_NXTHDR(msg, cm)) {
275+
if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS) {
276+
int *fds = (int *) CMSG_DATA(cm);
277+
int n_fds = (cm->cmsg_len - CMSG_LEN(0)) / sizeof(int);
278+
int i;
279+
for (i = n_fds - 1; i >= 0; i--) {
280+
value fd = Val_int(fds[i]);
281+
v_cons = caml_alloc_tuple(2);
282+
Store_field(v_cons, 0, fd);
283+
Store_field(v_cons, 1, v_list);
284+
v_list = v_cons;
285+
}
286+
}
287+
}
288+
CAMLreturn(v_list);
289+
}
290+
291+
CAMLprim value caml_eio_posix_recv_msg(value v_fd, value v_max_fds, value v_bufs) {
242292
CAMLparam1(v_bufs);
243293
CAMLlocal2(v_result, v_addr);
294+
int max_fds = Int_val(v_max_fds);
244295
int n_bufs = Wosize_val(v_bufs);
245296
struct iovec iov[n_bufs];
246297
union sock_addr_union source_addr;
298+
int controllen = max_fds > 0 ? CMSG_SPACE(sizeof(int) * max_fds) : 0;
299+
char cmsg[controllen];
247300
struct msghdr msg = {
248301
.msg_name = &source_addr,
249302
.msg_namelen = sizeof(source_addr),
250303
.msg_iov = iov,
251304
.msg_iovlen = n_bufs,
305+
.msg_control = max_fds > 0 ? cmsg : NULL,
306+
.msg_controllen = controllen,
252307
};
253308
ssize_t r;
254309

310+
memset(cmsg, 0, controllen);
311+
255312
fill_iov(iov, v_bufs);
256313

257314
caml_enter_blocking_section();
@@ -261,9 +318,10 @@ CAMLprim value caml_eio_posix_recv_msg(value v_fd, value v_bufs) {
261318

262319
v_addr = caml_unix_alloc_sockaddr(&source_addr, msg.msg_namelen, -1);
263320

264-
v_result = caml_alloc_tuple(2);
321+
v_result = caml_alloc_tuple(3);
265322
Store_field(v_result, 0, v_addr);
266323
Store_field(v_result, 1, Val_long(r));
324+
Store_field(v_result, 2, get_msghdr_fds(&msg));
267325

268326
CAMLreturn(v_result);
269327
}

0 commit comments

Comments
 (0)