Skip to content

Commit 2ec7b01

Browse files
committed
hostnet: switch Unix domain socket I/O to direct style with pthreads
Two threads per connection: this assumes a relatively low number of connections. Signed-off-by: David Scott <[email protected]>
1 parent 0f8ca93 commit 2ec7b01

File tree

6 files changed

+439
-1
lines changed

6 files changed

+439
-1
lines changed

src/hostnet/constants.ml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
(* IP datagram (65535) - IP header(20) - UDP header(8) *)
22
let max_udp_length = 65507
3+
4+
let mib = 1024 * 1024

src/hostnet/forwards.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ module Tcp = struct
274274
end
275275

276276
module Unix = struct
277-
module FLOW = Host.Sockets.Stream.Unix
277+
module FLOW = Host_unix_stream
278278
module Remote = Read_some (FLOW)
279279
module Handshake = Handshake (Remote)
280280

src/hostnet/host_unix_stream.ml

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
let src =
2+
let src =
3+
Logs.Src.create "host_unix_stream"
4+
~doc:"Host Unix SOCK_STREAM implementation"
5+
in
6+
Logs.Src.set_level src (Some Logs.Info);
7+
src
8+
9+
module Log = (val Logs.src_log src : Logs.LOG)
10+
11+
module Sock_Stream = UnixThreadsIO.Make (struct
12+
let send = Writev.writev
13+
let recv = Read.read
14+
end)
15+
16+
type flow = Sock_Stream.flow
17+
type error = [ `Closed | `Msg of string ]
18+
19+
let pp_error ppf = function
20+
| `Closed -> Fmt.string ppf "Closed"
21+
| `Msg m -> Fmt.string ppf m
22+
23+
type write_error = error
24+
25+
let pp_write_error = pp_error
26+
27+
open Lwt.Infix
28+
29+
let read t =
30+
Sock_Stream.recv t >>= fun buf ->
31+
let n = Cstruct.length buf in
32+
if n = 0 then Lwt.return @@ Ok `Eof else Lwt.return @@ Ok (`Data buf)
33+
34+
let write t buf =
35+
Lwt.catch
36+
(fun () -> Sock_Stream.send t [ buf ] >>= fun () -> Lwt.return @@ Ok ())
37+
(fun e -> Lwt.return (Error (`Msg (Printexc.to_string e))))
38+
39+
let writev t bufs =
40+
Lwt.catch
41+
(fun () -> Sock_Stream.send t bufs >>= fun () -> Lwt.return @@ Ok ())
42+
(fun e -> Lwt.return (Error (`Msg (Printexc.to_string e))))
43+
44+
let close t =
45+
Sock_Stream.close t;
46+
Lwt.return_unit
47+
48+
type address = string
49+
50+
(* For low-frequency tasks like binding a listening socket, we fork a pthread for one request. *)
51+
let run_in_pthread f =
52+
let t, u = Lwt.task () in
53+
let (_ : Thread.t) =
54+
Thread.create
55+
(fun () ->
56+
try
57+
let result = f () in
58+
Luv_lwt.in_lwt_async (fun () -> Lwt.wakeup_later u result)
59+
with e -> Luv_lwt.in_lwt_async (fun () -> Lwt.wakeup_exn u e))
60+
()
61+
in
62+
t
63+
64+
let connect ?read_buffer_size:_ address =
65+
let open Lwt.Infix in
66+
run_in_pthread (fun () ->
67+
let s = Unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
68+
try
69+
Unix.connect s (Unix.ADDR_UNIX address);
70+
Ok s
71+
with e ->
72+
Unix.close s;
73+
Error e)
74+
>>= function
75+
| Ok fd -> Lwt.return (Ok (Sock_Stream.of_bound_fd fd))
76+
| Error e -> Lwt.return (Error (`Msg (Printexc.to_string e)))
77+
78+
let shutdown_write t = Sock_Stream.shutdown_write t; Lwt.return_unit
79+
let shutdown_read t = Sock_Stream.shutdown_read t; Lwt.return_unit

src/hostnet/host_unix_stream.mli

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include Sig.FLOW_CLIENT with type address = string

0 commit comments

Comments
 (0)