Skip to content

Commit 56f7db2

Browse files
committed
Handle empty observable result
1 parent 9c45195 commit 56f7db2

File tree

5 files changed

+68
-39
lines changed

5 files changed

+68
-39
lines changed

rest/.jvm/src/main/scala/io/udash/rest/RestServlet.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
package io.udash
22
package rest
33

4-
import com.avsystem.commons._
4+
import com.avsystem.commons.*
55
import com.avsystem.commons.annotation.explicitGenerics
66
import com.typesafe.scalalogging.LazyLogging
7-
import io.udash.rest.RestServlet._
8-
import io.udash.rest.raw._
7+
import io.udash.rest.RestServlet.*
8+
import io.udash.rest.raw.*
99
import io.udash.utils.URLEncoder
1010
import monix.eval.Task
1111
import monix.execution.Scheduler
12-
import monix.reactive.Consumer
12+
import monix.reactive.{Consumer, Observable}
1313

1414
import java.io.ByteArrayOutputStream
1515
import java.util.concurrent.atomic.AtomicBoolean
1616
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
1717
import javax.servlet.{AsyncEvent, AsyncListener}
1818
import scala.annotation.tailrec
19-
import scala.concurrent.duration._
19+
import scala.concurrent.duration.*
2020

2121
object RestServlet {
2222
final val DefaultHandleTimeout = 30.seconds
@@ -124,20 +124,21 @@ class RestServlet(
124124
case jsonList: StreamedBody.JsonList =>
125125
// TODO streaming document no content length behaviour in relation to the client
126126
response.setContentType(jsonList.contentType)
127-
response.getOutputStream.write("[".getBytes(jsonList.charset))
128127
jsonList.elements
129128
.bufferTumbling(stream.batchSize)
129+
.switchIfEmpty(Observable(Seq.empty))
130130
.zipWithIndex
131131
.consumeWith(Consumer.foreach { case (batch, idx) =>
132132
val firstBatch = idx == 0
133-
if (firstBatch)
133+
if (firstBatch) {
134+
response.getOutputStream.write("[".getBytes(jsonList.charset))
134135
batch.iterator.zipWithIndex.foreach { case (e, idx) =>
135136
if (idx != 0) {
136137
response.getOutputStream.write(",".getBytes(jsonList.charset))
137138
}
138139
response.getOutputStream.write(e.value.getBytes(jsonList.charset))
139140
}
140-
else
141+
} else
141142
batch.foreach { e =>
142143
response.getOutputStream.write(",".getBytes(jsonList.charset))
143144
response.getOutputStream.write(e.value.getBytes(jsonList.charset))

rest/.jvm/src/test/resources/StreamingRestTestApi.json

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,27 @@
66
"description": "Some test REST API"
77
},
88
"paths": {
9+
"/emptyStream": {
10+
"get": {
11+
"operationId": "emptyStream",
12+
"responses": {
13+
"200": {
14+
"description": "Success",
15+
"content": {
16+
"application/json": {
17+
"schema": {
18+
"type": "array",
19+
"items": {
20+
"type": "integer",
21+
"format": "int32"
22+
}
23+
}
24+
}
25+
}
26+
}
27+
}
28+
}
29+
},
930
"/jsonStream": {
1031
"get": {
1132
"operationId": "jsonStream",

rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -99,41 +99,40 @@ final class JettyRestClient(
9999
}
100100
}
101101

102-
override def onFailure(response: Response, failure: Throwable): Unit = {
103-
super.onFailure(response, failure)
104-
// TODO streaming error handling client-side ???
105-
}
106-
107102
override def onComplete(result: Result): Unit = {
108103
super.onComplete(result)
109-
val httpResp = result.getResponse
110-
val contentLength = httpResp.getHeaders.getLongField(HttpHeader.CONTENT_LENGTH)
111-
if (contentLength != -1) {
112-
val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt
113-
val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType)
114-
// TODO streaming client-side handle errors ?
115-
val rawBody = getInputStream.readAllBytes()
116-
val body = (contentTypeOpt, charsetOpt) match {
117-
case (Opt(contentType), Opt(charset)) =>
118-
StreamedBody.fromHttpBody(
119-
HttpBody.textual(
120-
content = new String(rawBody, charset),
121-
mediaType = MimeTypes.getContentTypeWithoutCharset(contentType),
122-
charset = charset,
104+
if (result.isSucceeded) {
105+
val httpResp = result.getResponse
106+
val contentLength = httpResp.getHeaders.getLongField(HttpHeader.CONTENT_LENGTH)
107+
if (contentLength != -1) {
108+
val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt
109+
val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType)
110+
// TODO streaming client-side handle errors ?
111+
val rawBody = getInputStream.readAllBytes()
112+
val body = (contentTypeOpt, charsetOpt) match {
113+
case (Opt(contentType), Opt(charset)) =>
114+
StreamedBody.fromHttpBody(
115+
HttpBody.textual(
116+
content = new String(rawBody, charset),
117+
mediaType = MimeTypes.getContentTypeWithoutCharset(contentType),
118+
charset = charset,
119+
)
123120
)
124-
)
125-
case (Opt(contentType), Opt.Empty) =>
126-
StreamedBody.fromHttpBody(HttpBody.binary(rawBody, contentType))
127-
case _ =>
128-
StreamedBody.Empty
121+
case (Opt(contentType), Opt.Empty) =>
122+
StreamedBody.fromHttpBody(HttpBody.binary(rawBody, contentType))
123+
case _ =>
124+
StreamedBody.Empty
125+
}
126+
val restResponse = StreamedRestResponse(
127+
code = httpResp.getStatus,
128+
headers = parseHeaders(httpResp),
129+
body = body,
130+
batchSize = 1,
131+
)
132+
callback(Success(restResponse))
129133
}
130-
val restResponse = StreamedRestResponse(
131-
code = httpResp.getStatus,
132-
headers = parseHeaders(httpResp),
133-
body = body,
134-
batchSize = 1,
135-
)
136-
callback(Success(restResponse))
134+
} else {
135+
callback(Failure(result.getFailure))
137136
}
138137
}
139138
}

rest/src/test/scala/io/udash/rest/RestApiTest.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ trait RestApiTestScenarios extends RestApiTest {
148148
// TODO streaming MORE tests: cancellation, timeouts, errors, errors after sending a few elements, custom format, slow source observable
149149
trait StreamingRestApiTestScenarios extends RestApiTest {
150150

151+
"empty GET stream" in {
152+
testStream(_.emptyStream)
153+
}
154+
151155
"trivial GET stream" in {
152156
testStream(_.simpleStream)
153157
}

rest/src/test/scala/io/udash/rest/StreamingRestTestApi.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import monix.execution.Scheduler
55
import monix.reactive.Observable
66

77
trait StreamingRestTestApi {
8+
@GET def emptyStream: Observable[Int]
9+
810
@GET def simpleStream: Observable[String]
911

1012
@GET def jsonStream: Observable[RestEntity]
@@ -14,6 +16,8 @@ object StreamingRestTestApi extends DefaultRestApiCompanion[StreamingRestTestApi
1416
import Scheduler.Implicits.global
1517

1618
final class Impl extends StreamingRestTestApi {
19+
override def emptyStream: Observable[Int] = Observable.empty
20+
1721
override def simpleStream: Observable[String] = Observable("a", "b", "c")
1822

1923
override def jsonStream: Observable[RestEntity] = Observable(

0 commit comments

Comments
 (0)