Skip to content

Commit 8c68173

Browse files
committed
Streaming support in Udash REST
1 parent c6274da commit 8c68173

File tree

24 files changed

+1019
-177
lines changed

24 files changed

+1019
-177
lines changed

project/Dependencies.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ object Dependencies {
114114
"javax.servlet" % "javax.servlet-api" % servletVersion,
115115
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion,
116116
"org.eclipse.jetty" % "jetty-server" % jettyVersion % Test,
117-
"org.eclipse.jetty.ee8" % "jetty-ee8-servlet" % jettyVersion % Test
117+
"org.eclipse.jetty.ee8" % "jetty-ee8-servlet" % jettyVersion % Test,
118118
))
119119

120120
val restSjsDeps = restCrossDeps

rest/.jvm/src/main/scala/io/udash/rest/RestServlet.scala

Lines changed: 99 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import io.udash.rest.raw._
99
import io.udash.utils.URLEncoder
1010
import monix.eval.Task
1111
import monix.execution.Scheduler
12+
import monix.reactive.Consumer
1213

1314
import java.io.ByteArrayOutputStream
1415
import java.util.concurrent.atomic.AtomicBoolean
@@ -21,6 +22,7 @@ object RestServlet {
2122
final val DefaultHandleTimeout = 30.seconds
2223
final val DefaultMaxPayloadSize = 16 * 1024 * 1024L // 16MB
2324
final val CookieHeader = "Cookie"
25+
private final val BufferSize = 8192
2426

2527
/**
2628
* Wraps an implementation of some REST API trait into a Java Servlet.
@@ -33,18 +35,17 @@ object RestServlet {
3335
@explicitGenerics def apply[RestApi: RawRest.AsRawRpc : RestMetadata](
3436
apiImpl: RestApi,
3537
handleTimeout: FiniteDuration = DefaultHandleTimeout,
36-
maxPayloadSize: Long = DefaultMaxPayloadSize
38+
maxPayloadSize: Long = DefaultMaxPayloadSize,
3739
)(implicit
3840
scheduler: Scheduler
39-
): RestServlet = new RestServlet(RawRest.asHandleRequest[RestApi](apiImpl), handleTimeout, maxPayloadSize)
40-
41-
private final val BufferSize = 8192
41+
): RestServlet =
42+
new RestServlet(RawRest.asHandleRequestWithStreaming[RestApi](apiImpl), handleTimeout, maxPayloadSize)
4243
}
4344

4445
class RestServlet(
45-
handleRequest: RawRest.HandleRequest,
46+
handleRequest: RawRest.HandleRequestWithStreaming,
4647
handleTimeout: FiniteDuration = DefaultHandleTimeout,
47-
maxPayloadSize: Long = DefaultMaxPayloadSize
48+
maxPayloadSize: Long = DefaultMaxPayloadSize,
4849
)(implicit
4950
scheduler: Scheduler
5051
) extends HttpServlet with LazyLogging {
@@ -66,9 +67,12 @@ class RestServlet(
6667

6768
// readRequest must execute in Jetty thread but we want exceptions to be handled uniformly, hence the Try
6869
val udashRequest = Try(readRequest(request))
69-
val cancelable = Task.defer(handleRequest(udashRequest.get)).executeAsync.runAsync {
70-
case Right(restResponse) =>
71-
completeWith(writeResponse(response, restResponse))
70+
val cancelable = Task.defer(handleRequest(udashRequest.get)).flatMap { rr =>
71+
Task(setResponseHeaders(response, rr.code, rr.headers)) >>
72+
writeResponseBody(response, rr)
73+
}.executeAsync.runAsync {
74+
case Right(_) =>
75+
asyncContext.complete()
7276
case Left(e: HttpErrorException) =>
7377
completeWith(writeResponse(response, e.toResponse))
7478
case Left(e) =>
@@ -88,6 +92,92 @@ class RestServlet(
8892
})
8993
}
9094

95+
private def setResponseHeaders(response: HttpServletResponse, code: Int, headers: IMapping[PlainValue]): Unit = {
96+
response.setStatus(code)
97+
headers.entries.foreach {
98+
case (name, PlainValue(value)) => response.addHeader(name, value)
99+
}
100+
}
101+
102+
private def writeNonEmptyBody(response: HttpServletResponse, body: HttpBody.NonEmpty): Unit = {
103+
val bytes = body.bytes
104+
response.setContentType(body.contentType)
105+
response.setContentLength(bytes.length)
106+
response.getOutputStream.write(bytes)
107+
}
108+
109+
private def writeNonEmptyStreamedBody(
110+
response: HttpServletResponse,
111+
stream: StreamedRestResponse,
112+
body: StreamedBody.NonEmpty,
113+
): Task[Unit] = Task.defer {
114+
body match {
115+
case single: StreamedBody.Single =>
116+
Task.eval(writeNonEmptyBody(response, single.body))
117+
case binary: StreamedBody.RawBinary =>
118+
// TODO streaming document no content length behaviour in relation to the client
119+
response.setContentType(binary.contentType)
120+
binary.content.bufferTumbling(stream.batchSize).consumeWith(Consumer.foreach { batch =>
121+
batch.foreach(e => response.getOutputStream.write(e))
122+
response.getOutputStream.flush()
123+
})
124+
case jsonList: StreamedBody.JsonList =>
125+
// TODO streaming document no content length behaviour in relation to the client
126+
response.setContentType(jsonList.contentType)
127+
response.getOutputStream.write("[".getBytes(jsonList.charset))
128+
jsonList.elements
129+
.bufferTumbling(stream.batchSize)
130+
.zipWithIndex
131+
.consumeWith(Consumer.foreach { case (batch, idx) =>
132+
val firstBatch = idx == 0
133+
if (firstBatch)
134+
batch.iterator.zipWithIndex.foreach { case (e, idx) =>
135+
if (idx != 0) {
136+
response.getOutputStream.write(",".getBytes(jsonList.charset))
137+
}
138+
response.getOutputStream.write(e.value.getBytes(jsonList.charset))
139+
}
140+
else
141+
batch.foreach { e =>
142+
response.getOutputStream.write(",".getBytes(jsonList.charset))
143+
response.getOutputStream.write(e.value.getBytes(jsonList.charset))
144+
}
145+
response.getOutputStream.flush()
146+
})
147+
.map(_ => response.getOutputStream.write("]".getBytes(jsonList.charset)))
148+
}
149+
}
150+
151+
private def writeResponseBody(response: HttpServletResponse, rr: AbstractRestResponse): Task[Unit] =
152+
rr match {
153+
case resp: RestResponse =>
154+
resp.body match {
155+
case HttpBody.Empty => Task.unit
156+
case neBody: HttpBody.NonEmpty => Task(writeNonEmptyBody(response, neBody))
157+
}
158+
case stream: StreamedRestResponse =>
159+
stream.body match {
160+
case StreamedBody.Empty => Task.unit
161+
case neBody: StreamedBody.NonEmpty => writeNonEmptyStreamedBody(response, stream, neBody)
162+
}
163+
}
164+
165+
private def writeResponse(response: HttpServletResponse, restResponse: RestResponse): Unit = {
166+
setResponseHeaders(response, restResponse.code, restResponse.headers)
167+
restResponse.body match {
168+
case HttpBody.Empty =>
169+
case neBody: HttpBody.NonEmpty => writeNonEmptyBody(response, neBody)
170+
}
171+
}
172+
173+
private def writeFailure(response: HttpServletResponse, message: Opt[String]): Unit = {
174+
response.setStatus(500)
175+
message.foreach { msg =>
176+
response.setContentType(s"text/plain;charset=utf-8")
177+
response.getWriter.write(msg)
178+
}
179+
}
180+
91181
private def readParameters(request: HttpServletRequest): RestParameters = {
92182
// can't use request.getPathInfo because it decodes the URL before we can split it
93183
val pathPrefix = request.getContextPath.orEmpty + request.getServletPath.orEmpty
@@ -162,28 +252,4 @@ class RestServlet(
162252
val body = readBody(request)
163253
RestRequest(method, parameters, body)
164254
}
165-
166-
private def writeResponse(response: HttpServletResponse, restResponse: RestResponse): Unit = {
167-
response.setStatus(restResponse.code)
168-
restResponse.headers.entries.foreach {
169-
case (name, PlainValue(value)) => response.addHeader(name, value)
170-
}
171-
restResponse.body match {
172-
case HttpBody.Empty =>
173-
case neBody: HttpBody.NonEmpty =>
174-
// TODO: can we improve performance by avoiding intermediate byte array for textual content?
175-
val bytes = neBody.bytes
176-
response.setContentType(neBody.contentType)
177-
response.setContentLength(bytes.length)
178-
response.getOutputStream.write(bytes)
179-
}
180-
}
181-
182-
private def writeFailure(response: HttpServletResponse, message: Opt[String]): Unit = {
183-
response.setStatus(500)
184-
message.foreach { msg =>
185-
response.setContentType(s"text/plain;charset=utf-8")
186-
response.getWriter.write(msg)
187-
}
188-
}
189255
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
{
2+
"openapi": "3.0.2",
3+
"info": {
4+
"title": "Streaming Test API",
5+
"version": "0.1",
6+
"description": "Some test REST API"
7+
},
8+
"paths": {
9+
"/jsonStream": {
10+
"get": {
11+
"operationId": "jsonStream",
12+
"responses": {
13+
"200": {
14+
"description": "Success",
15+
"content": {
16+
"application/json": {
17+
"schema": {
18+
"type": "array",
19+
"items": {
20+
"$ref": "#/components/schemas/RestEntity"
21+
}
22+
}
23+
}
24+
}
25+
}
26+
}
27+
}
28+
},
29+
"/simpleStream": {
30+
"get": {
31+
"operationId": "simpleStream",
32+
"responses": {
33+
"200": {
34+
"description": "Success",
35+
"content": {
36+
"application/json": {
37+
"schema": {
38+
"type": "array",
39+
"items": {
40+
"type": "string"
41+
}
42+
}
43+
}
44+
}
45+
}
46+
}
47+
}
48+
}
49+
},
50+
"servers": [
51+
{
52+
"url": "http://localhost"
53+
}
54+
],
55+
"components": {
56+
"schemas": {
57+
"RestEntity": {
58+
"type": "object",
59+
"description": "REST entity",
60+
"properties": {
61+
"id": {
62+
"description": "entity id",
63+
"allOf": [
64+
{
65+
"$ref": "#/components/schemas/RestEntityId"
66+
}
67+
]
68+
},
69+
"name": {
70+
"type": "string",
71+
"default": "anonymous"
72+
},
73+
"subentity": {
74+
"description": "recursive optional subentity",
75+
"nullable": true,
76+
"allOf": [
77+
{
78+
"$ref": "#/components/schemas/RestEntity"
79+
}
80+
],
81+
"default": null
82+
},
83+
"enumField": {
84+
"allOf": [
85+
{
86+
"$ref": "#/components/schemas/RestEntityEnumCustom"
87+
}
88+
],
89+
"default": "OptionOne"
90+
},
91+
"inlinedEnumField": {
92+
"type": "string",
93+
"enum": [
94+
"Option1",
95+
"Option2"
96+
],
97+
"default": "Option1"
98+
},
99+
"enumMap": {
100+
"type": "object",
101+
"additionalProperties": {
102+
"$ref": "#/components/schemas/RestEntityEnumCustom"
103+
},
104+
"default": {}
105+
}
106+
},
107+
"required": [
108+
"id"
109+
]
110+
},
111+
"RestEntityEnumCustom": {
112+
"type": "string",
113+
"description": "Example named enum",
114+
"enum": [
115+
"OptionOne",
116+
"OptionTwo"
117+
],
118+
"example": "OptionOne"
119+
},
120+
"RestEntityId": {
121+
"type": "string",
122+
"description": "Entity identifier"
123+
}
124+
}
125+
}
126+
}

rest/.jvm/src/test/scala/io/udash/rest/ServletBasedRestApiTest.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ abstract class ServletBasedRestApiTest extends RestApiTest with UsesHttpServer {
1414

1515
protected def setupServer(server: Server): Unit = {
1616
val servlet = new RestServlet(serverHandle, serverTimeout, maxPayloadSize)
17-
val holder = new ServletHolder(servlet)
17+
val streamingServlet = new RestServlet(streamingServerHandle, serverTimeout, maxPayloadSize)
1818
val handler = new ServletContextHandler()
19-
handler.addServlet(holder, "/api/*")
19+
handler.addServlet(new ServletHolder(servlet), "/api/*")
20+
handler.addServlet(new ServletHolder(streamingServlet), "/stream-api/*")
2021
server.setHandler(handler)
2122
}
2223
}

rest/.jvm/src/test/scala/io/udash/rest/SomeApi.scala

Lines changed: 0 additions & 26 deletions
This file was deleted.

rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package io.udash
22
package rest
33

4-
import io.udash.rest.raw.HttpErrorException
5-
import io.udash.rest.raw.RawRest.HandleRequest
4+
import io.udash.rest.raw.{HttpErrorException, RawRest}
65
import sttp.client3.{HttpClientFutureBackend, SttpBackend}
76

87
import java.net.http.HttpClient
@@ -23,7 +22,7 @@ trait SttpClientRestTest extends ServletBasedRestApiTest {
2322
.build()
2423
)
2524

26-
def clientHandle: HandleRequest =
25+
def clientHandle: RawRest.HandleRequest =
2726
SttpRestClient.asHandleRequest[Future](s"$baseUrl/api")
2827

2928
override protected def afterAll(): Unit = {

rest/.jvm/src/test/scala/io/udash/rest/examples/GenericApi.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ object GenericApi {
1717

1818
import openapi._
1919
implicit def openApiMetadata[T: RestSchema]: OpenApiMetadata[GenericApi[T]] = OpenApiMetadata.materialize
20-
}
20+
}

0 commit comments

Comments
 (0)