Skip to content

Commit 3886728

Browse files
authored
Use the current Ox scope when running a Netty sync-server to create threads (forks) (#4752)
Closes #4747
1 parent 4581b26 commit 3886728

File tree

4 files changed

+42
-32
lines changed

4 files changed

+42
-32
lines changed

perf-tests/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ perfTests/runMain sttp.tapir.perf.apis.ServerRunner http4s.TapirMulti
1212
Run it without server name to see a list of all available names.
1313
Exception: If you're testing `NettySyncServer` (tapir-server-netty-sync), its server runner is located elsewhere:
1414
```
15-
nettyServerSync3/Test/runMain sttp.tapir.netty.sync.perf.NettySyncServerRunner
15+
nettyServerSync3/Test/runMain sttp.tapir.server.netty.sync.perf.NettySyncServerRunner
1616
```
17-
This is caused by `perf-tests` using Scala 2.13 forced by Gatling, while `NettySyncServer` is written excluisively for Scala 3.
17+
This is caused by `perf-tests` using Scala 2.13 forced by Gatling, while `NettySyncServer` is written exclusively for Scala 3.
1818

1919
## Configuring and running simulations
2020

@@ -37,7 +37,7 @@ If not set, default values will be used (see `sttp.tapir.perf.CommonSimulations`
3737

3838
## Profiling
3939

40-
To atach the profiler to a running server, it is recommended to use [async-profiler](https://github.com/async-profiler/async-profiler).
40+
To attach the profiler to a running server, it is recommended to use [async-profiler](https://github.com/async-profiler/async-profiler).
4141
Start the profiler by calling:
4242
```
4343
asprof -e cpu,alloc,lock -f profile.jfr <PID>

server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/NettySyncServer.scala

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,17 @@ import sttp.tapir.server.netty.{NettyConfig, NettyResponse, Route}
1616
import java.net.{InetSocketAddress, SocketAddress}
1717
import java.nio.file.Path
1818
import java.util.concurrent.atomic.AtomicBoolean
19-
import java.util.concurrent.{Executors, Future as JFuture}
2019
import scala.concurrent.duration.FiniteDuration
2120
import scala.concurrent.{Future, Promise}
2221
import scala.util.control.NonFatal
22+
import java.util.concurrent.atomic.AtomicReference
2323

2424
case class NettySyncServer(
2525
serverEndpoints: Vector[ServerEndpoint[OxStreams & WebSockets, Identity]],
2626
otherRoutes: Vector[IdRoute],
2727
options: NettySyncServerOptions,
2828
config: NettyConfig
2929
):
30-
private val executor = Executors.newVirtualThreadPerTaskExecutor()
3130
private val logger = LoggerFactory.getLogger(getClass.getName)
3231

3332
def addEndpoint(se: ServerEndpoint[OxStreams & WebSockets, Identity]): NettySyncServer = addEndpoints(List(se))
@@ -93,11 +92,6 @@ case class NettySyncServer(
9392
never
9493
}
9594

96-
private[netty] def start(route: Route[Identity]): NettySyncServerBinding =
97-
startUsingSocketOverride[InetSocketAddress](route, None) match
98-
case (socket, stop) =>
99-
NettySyncServerBinding(socket, stop)
100-
10195
def startUsingDomainSocket(path: Path)(using Ox): NettySyncDomainSocketBinding =
10296
startUsingSocketOverride(Some(new DomainSocketAddress(path.toFile)), inScopeRunner()) match
10397
case (socket, stop) =>
@@ -109,28 +103,52 @@ case class NettySyncServer(
109103
): (SA, () => Unit) =
110104
val endpointRoute = NettySyncServerInterpreter(options).toRoute(serverEndpoints.toList, inScopeRunner)
111105
val route = Route.combine(endpointRoute +: otherRoutes)
112-
startUsingSocketOverride(route, socketOverride)
106+
startUsingSocketOverride(route, socketOverride, inScopeRunner)
113107

114-
private def startUsingSocketOverride[SA <: SocketAddress](route: Route[Identity], socketOverride: Option[SA]): (SA, () => Unit) =
108+
private def startUsingSocketOverride[SA <: SocketAddress](
109+
route: Route[Identity],
110+
socketOverride: Option[SA],
111+
inScopeRunner: InScopeRunner
112+
): (SA, () => Unit) =
115113
val eventLoopGroup = config.eventLoopConfig.initEventLoopGroup()
116114

117115
def unsafeRunF(
118116
callToExecute: () => Identity[ServerResponse[NettyResponse]]
119117
): (Future[ServerResponse[NettyResponse]], () => Future[Unit]) =
120118
val scalaPromise = Promise[ServerResponse[NettyResponse]]()
121-
val jFuture: JFuture[?] = executor.submit(new Runnable {
122-
override def run(): Unit = try {
123-
val result = callToExecute()
124-
scalaPromise.success(result)
125-
} catch {
126-
case NonFatal(e) => scalaPromise.failure(e)
127-
}
128-
})
119+
120+
// used to cancel the fork, possible states:
121+
// - Cancelled: needed if cancellation happens after the fork starts, but before it is set in the atomic reference
122+
// - CancellableFork: used when cancellation using interruption is enabled
123+
// - None: means that the fork was not set yet
124+
object Cancelled
125+
val runningFork = new AtomicReference[None.type | CancellableFork[Unit] | Cancelled.type](None)
126+
// #4747: we are creating forks using the concurrency scope within which the netty server is running
127+
// however, this is called on a Netty-managed thread, hence we need to use the inScopeRunner
128+
inScopeRunner.async {
129+
def run(): Unit =
130+
if runningFork.get() != Cancelled then
131+
try scalaPromise.success(callToExecute())
132+
catch case NonFatal(e) => scalaPromise.failure(e)
133+
134+
if options.interruptServerLogicWhenRequestCancelled then
135+
val forked = forkCancellable(run())
136+
// we only update the state if it's not cancelled already
137+
runningFork.getAndAccumulate(forked, (cur, giv) => if cur != Cancelled then giv else cur) match {
138+
case None => // common "happy path" case
139+
case _: CancellableFork[Unit] => throw new IllegalStateException("Another fork was already set")
140+
case Cancelled => forked.cancelNow() // cancellation happened before the fork was set
141+
}
142+
else forkDiscard(run())
143+
}
129144

130145
(
131146
scalaPromise.future,
132147
() => {
133-
jFuture.cancel(options.interruptServerLogicWhenRequestCancelled)
148+
runningFork.getAndSet(Cancelled) match {
149+
case fork: CancellableFork[Unit] => fork.cancelNow()
150+
case _ => // skip - fork not yet set
151+
}
134152
Future.unit
135153
}
136154
)

server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/NettySyncTestServerInterpreter.scala

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,7 @@ class NettySyncTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)
3232
override def server(
3333
routes: NonEmptyList[IdRoute],
3434
gracefulShutdownTimeout: Option[FiniteDuration] = None
35-
): Resource[IO, Port] = {
36-
val config =
37-
NettyConfig.default.eventLoopGroup(eventLoopGroup).randomPort.withDontShutdownEventLoopGroupOnClose.noGracefulShutdown
38-
val customizedConfig = gracefulShutdownTimeout.map(config.withGracefulShutdownTimeout).getOrElse(config)
39-
val options = NettySyncServerOptions.default
40-
val bind = IO.blocking(NettySyncServer(options, customizedConfig).start(Route.combine(routes.toList)))
41-
42-
Resource.make(bind)(server => IO.blocking(server.stop())).map(_.port)
43-
}
35+
): Resource[IO, Port] = throw new UnsupportedOperationException // instead, scoped* variants are used by NettySyncCreateServerTest
4436

4537
def scopedServerWithRoutesStop(
4638
routes: NonEmptyList[IdRoute],
@@ -50,7 +42,7 @@ class NettySyncTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)
5042
NettyConfig.default.eventLoopGroup(eventLoopGroup).randomPort.withDontShutdownEventLoopGroupOnClose.noGracefulShutdown
5143
val customizedConfig = gracefulShutdownTimeout.map(config.withGracefulShutdownTimeout).getOrElse(config)
5244
val options = NettySyncServerOptions.default
53-
useInScope(NettySyncServer(options, customizedConfig).start(Route.combine(routes.toList)))(_.stop())
45+
useInScope(NettySyncServer(options, customizedConfig).addRoute(Route.combine(routes.toList)).start())(_.stop())
5446

5547
def scopedServerWithInterceptorsStop(
5648
endpoint: ServerEndpoint[OxStreams with WebSockets, Identity],

server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/perf/NettySyncServerRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ object NettySyncServerRunner {
7474
)
7575
val wsServerEndpoint = wsEndpoint.handleSuccess(_ => wsPipe)
7676

77-
val endpoints = genEndpointsId(1)
77+
val endpoints = genEndpointsId(128)
7878

7979
def main(args: Array[String]): Unit = {
8080
val declaredPort = 8080

0 commit comments

Comments
 (0)