Skip to content

Commit a74f633

Browse files
authored
Merge pull request #636 from http4s/http-3-client
Http 3 client
2 parents 1218b1d + 7046667 commit a74f633

File tree

9 files changed

+654
-74
lines changed

9 files changed

+654
-74
lines changed

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ lazy val client = project
9494
"org.http4s" %% "http4s-client" % http4sVersion,
9595
"io.netty" % "netty-codec-http2" % netty,
9696
"io.netty" % "netty-handler-proxy" % netty,
97+
"io.netty" % "netty-codec-http3" % netty,
9798
"org.http4s" %% "http4s-client-testkit" % http4sVersion % Test,
9899
"org.http4s" %% "http4s-ember-server" % http4sVersion % Test,
99100
"org.http4s" %% "http4s-dsl" % http4sVersion % Test,

client/src/main/scala/org/http4s/netty/client/EventLoopHolder.scala

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,36 +18,40 @@ package org.http4s.netty
1818
package client
1919

2020
import io.netty.bootstrap.Bootstrap
21+
import io.netty.channel.Channel
2122
import io.netty.channel.ChannelOption
2223
import io.netty.channel.MultiThreadIoEventLoopGroup
2324
import io.netty.channel.MultithreadEventLoopGroup
2425
import io.netty.channel.epoll.Epoll
26+
import io.netty.channel.epoll.EpollDatagramChannel
2527
import io.netty.channel.epoll.EpollIoHandler
2628
import io.netty.channel.epoll.EpollSocketChannel
2729
import io.netty.channel.kqueue.KQueue
30+
import io.netty.channel.kqueue.KQueueDatagramChannel
2831
import io.netty.channel.kqueue.KQueueIoHandler
2932
import io.netty.channel.kqueue.KQueueSocketChannel
3033
import io.netty.channel.nio.NioIoHandler
34+
import io.netty.channel.socket.DatagramChannel
3135
import io.netty.channel.socket.SocketChannel
36+
import io.netty.channel.socket.nio.NioDatagramChannel
3237
import io.netty.channel.socket.nio.NioSocketChannel
3338
import io.netty.channel.uring.IoUring
39+
import io.netty.channel.uring.IoUringDatagramChannel
3440
import io.netty.channel.uring.IoUringIoHandler
3541
import io.netty.channel.uring.IoUringSocketChannel
3642
import org.slf4j.Logger
3743
import org.slf4j.LoggerFactory
3844

39-
import scala.annotation.nowarn
4045
import scala.reflect.ClassTag
4146

42-
private[client] final case class EventLoopHolder[A <: SocketChannel](
47+
private[client] final case class EventLoopHolder[A <: Channel](
4348
eventLoop: MultithreadEventLoopGroup)(implicit classTag: ClassTag[A]) {
4449
def runtimeClass: Class[A] = classTag.runtimeClass.asInstanceOf[Class[A]]
4550

4651
def configure(bootstrap: Bootstrap): Bootstrap =
4752
bootstrap
4853
.group(eventLoop)
4954
.channel(runtimeClass)
50-
.option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
5155
.option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
5256
}
5357

@@ -66,7 +70,18 @@ private[client] object EventLoopHolder {
6670
throw new IllegalStateException("No native transport available"))
6771
}
6872

69-
@nowarn("cat=deprecation")
73+
def fromUdpTransport(
74+
transport: NettyTransport,
75+
eventLoopThreads: Int): EventLoopHolder[_ <: DatagramChannel] =
76+
transport match {
77+
case NettyTransport.Nio =>
78+
EventLoopHolder[NioDatagramChannel](
79+
new MultiThreadIoEventLoopGroup(eventLoopThreads, NioIoHandler.newFactory()))
80+
case n: NettyTransport.Native =>
81+
selectUdpTransport(n, eventLoopThreads).getOrElse(
82+
throw new IllegalStateException("No native transport available"))
83+
}
84+
7085
def selectTransport(
7186
native: NettyTransport.Native,
7287
eventLoopThreads: Int): Option[EventLoopHolder[_ <: SocketChannel]] =
@@ -86,7 +101,7 @@ private[client] object EventLoopHolder {
86101
Some(
87102
EventLoopHolder[KQueueSocketChannel](
88103
new MultiThreadIoEventLoopGroup(eventLoopThreads, KQueueIoHandler.newFactory())))
89-
case NettyTransport.Auto | NettyTransport.Native =>
104+
case NettyTransport.Auto =>
90105
selectTransport(NettyTransport.IOUring, eventLoopThreads)
91106
.orElse(selectTransport(NettyTransport.Epoll, eventLoopThreads))
92107
.orElse(selectTransport(NettyTransport.KQueue, eventLoopThreads))
@@ -98,4 +113,36 @@ private[client] object EventLoopHolder {
98113
}
99114
case _ => None
100115
}
116+
117+
def selectUdpTransport(
118+
native: NettyTransport.Native,
119+
eventLoopThreads: Int): Option[EventLoopHolder[_ <: DatagramChannel]] =
120+
native match {
121+
case NettyTransport.IOUring if IoUring.isAvailable =>
122+
logger.info("Using IOUring")
123+
Some(
124+
EventLoopHolder[IoUringDatagramChannel](
125+
new MultiThreadIoEventLoopGroup(eventLoopThreads, IoUringIoHandler.newFactory())))
126+
case NettyTransport.Epoll if Epoll.isAvailable =>
127+
logger.info("Using Epoll")
128+
Some(
129+
EventLoopHolder[EpollDatagramChannel](
130+
new MultiThreadIoEventLoopGroup(eventLoopThreads, EpollIoHandler.newFactory())))
131+
case NettyTransport.KQueue if KQueue.isAvailable =>
132+
logger.info("Using KQueue")
133+
Some(
134+
EventLoopHolder[KQueueDatagramChannel](
135+
new MultiThreadIoEventLoopGroup(eventLoopThreads, KQueueIoHandler.newFactory())))
136+
case NettyTransport.Auto =>
137+
selectUdpTransport(NettyTransport.IOUring, eventLoopThreads)
138+
.orElse(selectUdpTransport(NettyTransport.Epoll, eventLoopThreads))
139+
.orElse(selectUdpTransport(NettyTransport.KQueue, eventLoopThreads))
140+
.orElse {
141+
logger.info("Falling back to NIO EventLoopGroup")
142+
Some(
143+
EventLoopHolder[NioDatagramChannel](
144+
new MultiThreadIoEventLoopGroup(eventLoopThreads, NioIoHandler.newFactory())))
145+
}
146+
case _ => None
147+
}
101148
}

client/src/main/scala/org/http4s/netty/client/Http4sHandler.scala

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import io.netty.handler.codec.http2.Http2SettingsFrame
3232
import io.netty.handler.timeout.IdleState
3333
import io.netty.handler.timeout.IdleStateEvent
3434
import org.http4s._
35+
import org.http4s.client.RequestKey
3536
import org.http4s.netty.client.Http4sHandler.Http2GoAwayError
3637
import org.http4s.netty.client.Http4sHandler.logger
3738

@@ -76,7 +77,7 @@ private[netty] class Http4sHandler[F[_]](dispatcher: Dispatcher[F])(implicit F:
7677

7778
val headersFrame = new DefaultHttp2HeadersFrame(
7879
http2Headers,
79-
modelConversion.notAllowedWithBody.contains(request.method))
80+
NettyModelConversion.notAllowedWithBody.contains(request.method))
8081

8182
def endOfStream: F[Unit] = request.trailerHeaders.flatMap { headers =>
8283
val trail =
@@ -86,20 +87,21 @@ private[netty] class Http4sHandler[F[_]](dispatcher: Dispatcher[F])(implicit F:
8687
HttpConversionUtil.toHttp2Headers(modelConversion.toNettyHeaders(headers), false),
8788
true)
8889
}
89-
F.delay(writeInEventLoop(trail, channel, key))
90+
F.delay(Http4sHandler.writeInEventLoop(trail, channel, key.requestKey)(onException))
9091
}
9192

9293
val body = if (!headersFrame.isEndStream) {
9394
(request.body.chunks
9495
.evalMap(chunk =>
9596
F.delay(
96-
writeInEventLoop(
97+
Http4sHandler.writeInEventLoop(
9798
new DefaultHttp2DataFrame(NettyModelConversion.chunkToBytebuf(chunk), false),
9899
channel,
99-
key))) ++ fs2.Stream.eval(endOfStream)).compile.drain
100+
key.requestKey)(onException))) ++ fs2.Stream.eval(endOfStream)).compile.drain
100101
} else F.unit
101102

102-
F.delay(writeInEventLoop(headersFrame, channel, key)) >> body
103+
F.delay(
104+
Http4sHandler.writeInEventLoop(headersFrame, channel, key.requestKey)(onException)) >> body
103105
}
104106

105107
private[client] def dispatch(
@@ -121,37 +123,12 @@ private[netty] class Http4sHandler[F[_]](dispatcher: Dispatcher[F])(implicit F:
121123
.evalMap { nettyRequest =>
122124
F.async[Resource[F, Response[F]]] { cb =>
123125
promises.enqueue(cb)
124-
writeInEventLoop(nettyRequest, channel, key)
126+
Http4sHandler.writeInEventLoop(nettyRequest, channel, key.requestKey)(onException)
125127
F.pure(Some(F.unit))
126128
}
127129
}
128130
.flatMap(identity)
129131
)
130-
131-
private def writeInEventLoop(event: AnyRef, channel: Channel, key: Key) =
132-
if (channel.eventLoop().inEventLoop) {
133-
safedispatch(event, channel, key)
134-
} else {
135-
channel
136-
.eventLoop()
137-
.execute(() => safedispatch(event, channel, key))
138-
}
139-
140-
private def safedispatch(event: AnyRef, channel: Channel, key: Key): Unit = void {
141-
// always enqueue
142-
if (channel.isActive) {
143-
logger.trace(s"ch $channel: sending ${event} to $key")
144-
// The voidPromise lets us receive failed-write signals from the
145-
// exceptionCaught method.
146-
channel.writeAndFlush(event, channel.voidPromise)
147-
logger.trace(s"ch $channel: after ${event} to $key")
148-
} else {
149-
// make sure we call all enqueued promises
150-
logger.info(s"ch $channel: message dispatched by closed channel to destination $key.")
151-
onException(channel, new ClosedChannelException)
152-
}
153-
}
154-
155132
override def isSharable: Boolean = false
156133

157134
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = void {
@@ -264,4 +241,30 @@ private object Http4sHandler {
264241

265242
final case class Http2GoAwayError(error: Http2Error, lastStreamId: Int)
266243
extends Exception(s"http/2 recieved GoAway with ${error.name} and streamId=${lastStreamId}")
244+
245+
private[client] def writeInEventLoop(event: AnyRef, channel: Channel, key: RequestKey)(
246+
onException: (Channel, Throwable) => Unit) =
247+
if (channel.eventLoop().inEventLoop) {
248+
safedispatch(event, channel, key)(onException)
249+
} else {
250+
channel
251+
.eventLoop()
252+
.execute(() => safedispatch(event, channel, key)(onException))
253+
}
254+
255+
private def safedispatch(event: AnyRef, channel: Channel, key: RequestKey)(
256+
onException: (Channel, Throwable) => Unit): Unit = void {
257+
// always enqueue
258+
if (channel.isActive) {
259+
logger.trace(s"ch $channel: sending ${event} to $key")
260+
// The voidPromise lets us receive failed-write signals from the
261+
// exceptionCaught method.
262+
channel.writeAndFlush(event, channel.voidPromise)
263+
logger.trace(s"ch $channel: after ${event} to $key")
264+
} else {
265+
// make sure we call all enqueued promises
266+
logger.info(s"ch $channel: message dispatched by closed channel to destination $key.")
267+
onException(channel, new ClosedChannelException)
268+
}
269+
}
267270
}

client/src/main/scala/org/http4s/netty/client/NettyClientBuilder.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package client
2020
import cats.effect.Async
2121
import cats.effect.Resource
2222
import io.netty.bootstrap.Bootstrap
23+
import io.netty.channel.ChannelOption
2324
import org.http4s.Headers
2425
import org.http4s.client.Client
2526
import org.http4s.headers.`User-Agent`
@@ -122,6 +123,7 @@ class NettyClientBuilder[F[_]](
122123
Resource.make(F.delay {
123124
val bootstrap = new Bootstrap()
124125
EventLoopHolder.fromTransport(transport, eventLoopThreads).configure(bootstrap)
126+
bootstrap.option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
125127
nettyChannelOptions.foldLeft(bootstrap) { case (boot, (opt, value)) =>
126128
boot.option(opt, value)
127129
}

0 commit comments

Comments
 (0)