Skip to content

Commit bb3a1f1

Browse files
PawelStroinskiDerGuteMoritz
authored andcommitted
Extend listen-socket to epoll and kqueue transports
The `existing-channel` option added in #748 works only with NIO transport (Aleph default). When trying to use it with epoll, we would get a Netty error: `incompatible event loop type: io.netty.channel.epoll.EpollEventLoop`. This is not surprising, since the server-channel class to use depends on the transport, as per [`transport-server-channel-class`](https://github.com/clj-commons/aleph/blob/5a0c6976aa42f30ca3786cef64f9ba70cf037ae1/src/aleph/netty.clj#L1327) and the `existing-channel` handling was always creating a NIO server-channel regardless of transport. It’s not difficult to support most of the other transports. We just have to use the server-channel class as per the aforementioned fn. Their constructors in case of epoll and kqueue take an FD number rather than Java channel. There is no suitable constructor for io-uring, so it is not supported. The validation has been updated accordingly. The full integration test is still limited to NIO, because obtaining an FD of a server-socket opened in Java requires deep reflection (the `--add-opens` flag), which would add some complexity (perhaps a dedicated lein profile) just to test this. The validation test checks that the FD route is attempted for the other transports, but it stops at `Bad file descriptor` error. The option is being renamed from `existing-channel` to `listen-socket`, because this seems more fitting when we are accepting not just channels but also FDs. The `existing-channel` was merged very recently and hasn’t yet been released in a maven version.
1 parent 5a0c697 commit bb3a1f1

File tree

6 files changed

+103
-50
lines changed

6 files changed

+103
-50
lines changed

src/aleph/http.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
| --- | --- |
5252
| `port` | The port the server will bind to. If `0`, the server will bind to a random port. |
5353
| `socket-address` | A `java.net.SocketAddress` specifying both the port and interface to bind to. |
54-
| `existing-channel` | A pre-bound `java.nio.channels.ServerSocketChannel` for the server to use rather than opening and binding its own server-socket. It won't be closed by the server. Possibly obtained from `System/inheritedChannel`. |
54+
| `listen-socket` | A pre-bound server-socket as a `java.nio.channels.ServerSocketChannel` (NIO transport) or a `long` file descriptor (epoll and kqueue transports) for the server to use rather than opening and binding its own server-socket. It won't be closed by the server. Possibly obtained from `System/inheritedChannel`. |
5555
| `bootstrap-transform` | A function that takes an `io.netty.bootstrap.ServerBootstrap` object, which represents the server, and modifies it. |
5656
| `http-versions` | An optional vector of allowable HTTP versions to negotiate via ALPN, in preference order. Defaults to `[:http1]`. |
5757
| `ssl-context` | An `io.netty.handler.ssl.SslContext` object or a map of SSL context options (see `aleph.netty/ssl-server-context` for more details) if an SSL connection is desired. When passing an `io.netty.handler.ssl.SslContext` object, it must have an ALPN config matching the `http-versions` option (see `aleph.netty/ssl-server-context` and `aleph.netty/application-protocol-config`). If only HTTP/1.1 is desired, ALPN config is optional.

src/aleph/http/server.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,7 @@
737737
[handler
738738
{:keys [port
739739
socket-address
740-
existing-channel
740+
listen-socket
741741
executor
742742
http-versions
743743
ssl-context
@@ -789,8 +789,8 @@
789789
:bootstrap-transform bootstrap-transform
790790
:socket-address (cond
791791
socket-address socket-address
792-
(nil? existing-channel) (InetSocketAddress. port))
793-
:existing-channel existing-channel
792+
(nil? listen-socket) (InetSocketAddress. port))
793+
:listen-socket listen-socket
794794
:on-close (when (and shutdown-executor?
795795
(or (instance? ExecutorService executor)
796796
(instance? ExecutorService continue-executor)))

src/aleph/netty.clj

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,6 +1331,39 @@
13311331
:io-uring IOUringServerSocketChannel
13321332
:nio NioServerSocketChannel))
13331333

1334+
(defn- wrapping-channel-factory
1335+
^ChannelFactory [listen-socket transport]
1336+
(proxy [ChannelFactory] []
1337+
(newChannel []
1338+
(case transport
1339+
:epoll (EpollServerSocketChannel. ^long listen-socket)
1340+
:kqueue (KQueueServerSocketChannel. ^long listen-socket)
1341+
:nio (NioServerSocketChannel. ^java.nio.channels.ServerSocketChannel listen-socket)))))
1342+
1343+
(defn- validate-listen-socket
1344+
[listen-socket transport]
1345+
(when (some? listen-socket)
1346+
(case transport
1347+
(:epoll :kqueue) (when-not (int? listen-socket)
1348+
(throw (IllegalArgumentException.
1349+
(str "With epoll and kqueue transports, only a numeric file descriptor "
1350+
"is supported as listen-socket, but received: "
1351+
(pr-str listen-socket)))))
1352+
1353+
:nio (cond (not (instance? java.nio.channels.ServerSocketChannel listen-socket))
1354+
(throw (IllegalArgumentException.
1355+
(str "With NIO transport, only a java.nio.channels.ServerSocketChannel "
1356+
"is supported as listen-socket, but received: "
1357+
(pr-str listen-socket))))
1358+
1359+
(nil? (.getLocalAddress ^java.nio.channels.ServerSocketChannel listen-socket))
1360+
(throw (IllegalArgumentException.
1361+
(str "The listen-socket is not bound: " (pr-str listen-socket)))))
1362+
1363+
(throw (IllegalArgumentException.
1364+
(str "The listen-socket option is not supported with this transport: "
1365+
(pr-str transport)))))))
1366+
13341367
(defn ^:no-doc convert-address-types [address-types]
13351368
(case address-types
13361369
:ipv4-only ResolvedAddressTypes/IPV4_ONLY
@@ -1668,22 +1701,6 @@
16681701
(.addFirst pipeline "channel-tracker" ^ChannelHandler (channel-tracking-handler chan-group))
16691702
(pipeline-builder pipeline)))
16701703

1671-
(defn- validate-existing-channel
1672-
[existing-channel]
1673-
(when (some? existing-channel)
1674-
(when-not (instance? java.nio.channels.ServerSocketChannel existing-channel)
1675-
(throw (IllegalArgumentException.
1676-
(str "The existing-channel type is not supported: " (pr-str existing-channel)))))
1677-
(when (nil? (.getLocalAddress ^java.nio.channels.ServerSocketChannel existing-channel))
1678-
(throw (IllegalArgumentException.
1679-
(str "The existing-channel is not bound: " (pr-str existing-channel)))))))
1680-
1681-
(defn- wrapping-channel-factory
1682-
^ChannelFactory [^java.nio.channels.ServerSocketChannel channel]
1683-
(proxy [ChannelFactory] []
1684-
(newChannel []
1685-
(NioServerSocketChannel. channel))))
1686-
16871704
(defn ^:no-doc start-server
16881705
([pipeline-builder
16891706
ssl-context
@@ -1702,13 +1719,13 @@
17021719
bootstrap-transform
17031720
on-close
17041721
^SocketAddress socket-address
1705-
existing-channel
1722+
listen-socket
17061723
transport
17071724
shutdown-timeout]
17081725
:or {shutdown-timeout default-shutdown-timeout}
17091726
:as opts}]
17101727
(ensure-transport-available! transport)
1711-
(validate-existing-channel existing-channel)
1728+
(validate-listen-socket listen-socket transport)
17121729
(let [num-cores (.availableProcessors (Runtime/getRuntime))
17131730
num-threads (* 2 num-cores)
17141731
thread-factory (enumerating-thread-factory "aleph-server-pool" false)
@@ -1734,16 +1751,16 @@
17341751
(.option ChannelOption/SO_REUSEADDR true)
17351752
(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE)
17361753
(.group group)
1737-
(cond-> (nil? existing-channel) (.channel channel-class))
1738-
(cond-> (some? existing-channel) (.channelFactory (wrapping-channel-factory existing-channel)))
1754+
(cond-> (nil? listen-socket) (.channel channel-class))
1755+
(cond-> (some? listen-socket) (.channelFactory (wrapping-channel-factory listen-socket transport)))
17391756
;;TODO: add a server (.handler) call to the bootstrap, for logging or something
17401757
(.childHandler (pipeline-initializer pipeline-builder))
17411758
(.childOption ChannelOption/SO_REUSEADDR true)
17421759
(.childOption ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE)
17431760
bootstrap-transform)
17441761

17451762
^ServerSocketChannel
1746-
ch (-> (if (nil? existing-channel)
1763+
ch (-> (if (nil? listen-socket)
17471764
(.bind b socket-address)
17481765
(.register b))
17491766
.sync
@@ -1755,7 +1772,7 @@
17551772
(when (compare-and-set! closed? false true)
17561773
;; This is the three step closing sequence:
17571774
;; 1. Stop listening to incoming requests
1758-
(if (nil? existing-channel)
1775+
(if (nil? listen-socket)
17591776
(-> ch .close .sync)
17601777
(-> ch .deregister .sync))
17611778
(-> (if (pos? shutdown-timeout)
@@ -1785,7 +1802,7 @@
17851802
(port [_]
17861803
(-> ch .localAddress .getPort))
17871804
(wait-for-close [_]
1788-
(when (nil? existing-channel)
1805+
(when (nil? listen-socket)
17891806
(-> ch .closeFuture .await))
17901807
(-> group .terminationFuture .await)
17911808
nil)))

src/aleph/tcp.clj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,15 @@
7474
| --- | ---
7575
| `port` | the port the server will bind to. If `0`, the server will bind to a random port.
7676
| `socket-address` | a `java.net.SocketAddress` specifying both the port and interface to bind to.
77-
| `existing-channel` | a pre-bound `java.nio.channels.ServerSocketChannel` for the server to use rather than opening and binding its own server-socket. It won't be closed by the server. Possibly obtained from `System/inheritedChannel`. |
77+
| `listen-socket` | a pre-bound server-socket as a `java.nio.channels.ServerSocketChannel` (NIO transport) or a `long` file descriptor (epoll and kqueue transports) for the server to use rather than opening and binding its own server-socket. It won't be closed by the server. Possibly obtained from `System/inheritedChannel`.
7878
| `ssl-context` | an `io.netty.handler.ssl.SslContext` object or a map of SSL context options (see `aleph.netty/ssl-server-context` for more details). If given, the server will only accept SSL connections and call the handler once the SSL session has been successfully established. If a self-signed certificate is all that's required, `(aleph.netty/self-signed-ssl-context)` will suffice.
7979
| `bootstrap-transform` | a function that takes an `io.netty.bootstrap.ServerBootstrap` object, which represents the server, and modifies it.
8080
| `pipeline-transform` | a function that takes an `io.netty.channel.ChannelPipeline` object, which represents a connection, and modifies it.
8181
| `raw-stream?` | if true, messages from the stream will be `io.netty.buffer.ByteBuf` objects rather than byte-arrays. This will minimize copying, but means that care must be taken with Netty's buffer reference counting. Only recommended for advanced users.
8282
| `shutdown-timeout` | interval in seconds within which in-flight requests must be processed, defaults to 15 seconds. A value of 0 bypasses waiting entirely.
8383
| `transport` | the transport to use, one of `:nio`, `:epoll`, `:kqueue` or `:io-uring` (defaults to `:nio`)."
8484
[handler
85-
{:keys [port socket-address existing-channel ssl-context bootstrap-transform pipeline-transform epoll?
85+
{:keys [port socket-address listen-socket ssl-context bootstrap-transform pipeline-transform epoll?
8686
shutdown-timeout transport]
8787
:or {bootstrap-transform identity
8888
pipeline-transform identity
@@ -104,8 +104,8 @@
104104
:bootstrap-transform bootstrap-transform
105105
:socket-address (cond
106106
socket-address socket-address
107-
(nil? existing-channel) (InetSocketAddress. port))
108-
:existing-channel existing-channel
107+
(nil? listen-socket) (InetSocketAddress. port))
108+
:listen-socket listen-socket
109109
:transport (netty/determine-transport transport epoll?)
110110
:shutdown-timeout shutdown-timeout})))
111111

test/aleph/http_test.clj

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@
3030
IPool
3131
Pool)
3232
(io.netty.channel
33-
ChannelHandlerContext
34-
ChannelOutboundHandlerAdapter
35-
ChannelPipeline
36-
ChannelPromise)
33+
ChannelException
34+
ChannelHandlerContext
35+
ChannelOutboundHandlerAdapter
36+
ChannelPipeline
37+
ChannelPromise)
3738
(io.netty.handler.codec TooLongFrameException)
3839
(io.netty.handler.codec.compression
3940
CompressionOptions
@@ -1774,15 +1775,50 @@
17741775
(catch Exception e
17751776
(is (instance? ProxyConnectException e)))))))))
17761777

1777-
(deftest test-existing-channel
1778-
(testing "validation"
1779-
(is (thrown-with-msg? Exception #"existing-channel"
1780-
(with-http-servers basic-handler {:existing-channel "a string"})))
1781-
(with-open [unbound-channel (ServerSocketChannel/open)]
1782-
(is (thrown-with-msg? Exception #"existing-channel"
1783-
(with-http-servers basic-handler {:existing-channel unbound-channel})))))
1784-
1785-
(testing "with a bound server-socket channel"
1778+
(deftest test-listen-socket
1779+
(testing "validation when transport is"
1780+
(let [file-descriptor Integer/MAX_VALUE]
1781+
(testing "nio"
1782+
(is (thrown-with-msg? IllegalArgumentException #"listen-socket"
1783+
(with-http-servers basic-handler {:listen-socket file-descriptor})))
1784+
(with-open [unbound-channel (ServerSocketChannel/open)]
1785+
(is (thrown-with-msg? IllegalArgumentException #"listen-socket"
1786+
(with-http-servers basic-handler {:listen-socket unbound-channel})))))
1787+
1788+
(testing "epoll"
1789+
(when (netty/epoll-available?)
1790+
(testing "we should pass our validation and get an error due to the fake file descriptor"
1791+
(is (thrown-with-msg? ChannelException #"Bad file descriptor"
1792+
(with-http-servers basic-handler {:transport :epoll
1793+
:listen-socket file-descriptor}))))
1794+
(with-open [channel (bound-channel port)]
1795+
(is (thrown-with-msg? IllegalArgumentException #"listen-socket"
1796+
(with-http-servers basic-handler {:transport :epoll
1797+
:listen-socket channel}))))))
1798+
1799+
(testing "kqueue"
1800+
(when (netty/kqueue-available?)
1801+
(testing "we should pass our validation and get an error due to the fake file descriptor"
1802+
(is (thrown-with-msg? ChannelException #"Bad file descriptor"
1803+
(with-http-servers basic-handler {:transport :kqueue
1804+
:listen-socket file-descriptor}))))
1805+
(with-open [channel (bound-channel port)]
1806+
(is (thrown-with-msg? IllegalArgumentException #"listen-socket"
1807+
(with-http-servers basic-handler {:transport :kqueue
1808+
:listen-socket channel}))))))
1809+
1810+
(testing "io-uring"
1811+
(when (netty/io-uring-available?)
1812+
(testing "which is not supported"
1813+
(is (thrown-with-msg? IllegalArgumentException #"listen-socket"
1814+
(with-http-servers basic-handler {:transport :io-uring
1815+
:listen-socket file-descriptor}))))
1816+
(with-open [channel (bound-channel port)]
1817+
(is (thrown-with-msg? IllegalArgumentException #"listen-socket"
1818+
(with-http-servers basic-handler {:transport :io-uring
1819+
:listen-socket channel}))))))))
1820+
1821+
(testing "with nio transport and a bound server-socket channel"
17861822
(testing "- unknown to the server"
17871823
(with-open [_ (bound-channel port)]
17881824
;; The port is already bound by a server-socket channel, but
@@ -1796,16 +1832,16 @@
17961832
;; This time, we shouldn't get a BindException, because we are
17971833
;; telling Aleph to use an existing server-socket channel,
17981834
;; which should be already bound, so Aleph doesn't try to bind.
1799-
(with-http-servers basic-handler {:existing-channel channel}
1835+
(with-http-servers basic-handler {:listen-socket channel}
18001836
(is (= 200 (:status @(http-get "/string")))))
18011837
;; The existing channel should not be closed on a server shutdown,
18021838
;; because that channel is not owned by the server.
18031839
(is (.isOpen channel)))))
18041840

1805-
(testing "the :port option is not required when :existing-channel is passed"
1841+
(testing "the :port option is not required when :listen-socket is passed"
18061842
(with-redefs [http-server-options (dissoc http-server-options :port)]
18071843
(with-open [channel (bound-channel port)]
1808-
(with-http1-server basic-handler {:existing-channel channel}
1844+
(with-http1-server basic-handler {:listen-socket channel}
18091845
(is (= 200 (:status @(http-get "/string")))))))))
18101846

18111847
(deftest ^:leak test-leak-in-raw-stream-handler

test/aleph/tcp_test.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@
5656
(catch Exception _
5757
(is (not (netty/io-uring-available?)))))))
5858

59-
(deftest test-existing-channel
60-
(testing "the :port option is not required when :existing-channel is passed"
59+
(deftest test-listen-socket
60+
(testing "the :port option is not required when :listen-socket is passed"
6161
(let [port 8083]
6262
(with-open [channel (bound-channel port)]
63-
(with-server (tcp/start-server echo-handler {:existing-channel channel :shutdown-timeout 0})
63+
(with-server (tcp/start-server echo-handler {:listen-socket channel :shutdown-timeout 0})
6464
(let [c @(tcp/client {:host "localhost" :port port})]
6565
(s/put! c "foo")
6666
(is (= "foo" (bs/to-string @(s/take! c))))))))))

0 commit comments

Comments
 (0)