|
| 1 | +// {cat=Server Sent Events; effects=Direct; server=Netty}: Describe and implement an endpoint which emits SSE |
| 2 | + |
| 3 | +//> using dep com.softwaremill.sttp.tapir::tapir-core:1.12.3 |
| 4 | +//> using dep com.softwaremill.sttp.tapir::tapir-jsoniter-scala:1.12.3 |
| 5 | +//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-sync:1.12.3 |
| 6 | +//> using dep ch.qos.logback:logback-classic:1.5.8 |
| 7 | +//> using dep com.github.plokhotnyuk.jsoniter-scala::jsoniter-scala-macros:2.30.15 |
| 8 | + |
| 9 | +package sttp.tapir.examples.sse |
| 10 | + |
| 11 | +import ox.flow.Flow |
| 12 | +import sttp.model.sse.ServerSentEvent |
| 13 | +import sttp.tapir.* |
| 14 | +import sttp.tapir.json.jsoniter.* |
| 15 | +import sttp.tapir.server.netty.sync.{NettySyncServer, serverSentEventsBody} |
| 16 | + |
| 17 | +import scala.concurrent.duration.* |
| 18 | + |
| 19 | +import com.github.plokhotnyuk.jsoniter_scala.core.* |
| 20 | +import com.github.plokhotnyuk.jsoniter_scala.macros.* |
| 21 | + |
| 22 | +case class CompletionResponse(text: String) derives ConfiguredJsonValueCodec, Schema |
| 23 | + |
| 24 | +// the endpoint's successful response body can be either SSE, or JSON |
| 25 | +val sseOrJsonEndpoint = infallibleEndpoint.get |
| 26 | + .in("chat" / "completions") |
| 27 | + .in(query[Boolean]("stream")) |
| 28 | + .out( |
| 29 | + oneOf( |
| 30 | + // see the note: https://tapir.softwaremill.com/en/latest/endpoint/oneof.html#oneof-and-non-blocking-streaming |
| 31 | + oneOfVariantValueMatcher(serverSentEventsBody.toEndpointIO.map(Left(_))(_.value)) { case Left(_) => true }, |
| 32 | + oneOfVariantValueMatcher(jsonBody[CompletionResponse].map(Right(_))(_.value)) { case Right(_) => true } |
| 33 | + ) |
| 34 | + ) |
| 35 | + |
| 36 | +// the full type of the expected body of `sseOrJsonEndpoint` is |
| 37 | +// Either[Nothing, Either[Flow[ServerSentEvent], CompletionResponse]], |
| 38 | +// but we're using `.handleSuccess` to only handle the successful case |
| 39 | +val sseOrJsonServerEndpoint = |
| 40 | + sseOrJsonEndpoint.handleSuccess: stream => |
| 41 | + if stream then |
| 42 | + Left( |
| 43 | + Flow |
| 44 | + .tick(1.second) // emit a new event every second |
| 45 | + .map(_ => s"Event") |
| 46 | + .take(5) |
| 47 | + .map(event => ServerSentEvent(data = Some(event))) |
| 48 | + ) |
| 49 | + else Right(CompletionResponse("Hello, world!")) |
| 50 | + |
| 51 | +@main def sseOrJsonNettySyncServer(): Unit = |
| 52 | + NettySyncServer() |
| 53 | + .host("0.0.0.0") |
| 54 | + .port(8080) |
| 55 | + .addEndpoints(List(sseOrJsonServerEndpoint)) |
| 56 | + .startAndWait() |
0 commit comments