Skip to content

Commit 0a69509

Browse files
committed
Adjust streaming tests
1 parent 1943dd6 commit 0a69509

File tree

7 files changed

+136
-67
lines changed

7 files changed

+136
-67
lines changed

rest/.jvm/src/test/resources/StreamingRestTestApi.json

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
}
2525
}
2626
},
27-
"/customStreamTask": {
27+
"/customStream": {
2828
"get": {
29-
"operationId": "customStreamTask",
29+
"operationId": "customStream",
3030
"parameters": [
3131
{
3232
"name": "size",
@@ -45,17 +45,17 @@
4545
"content": {
4646
"application/json": {
4747
"schema": {
48-
"$ref": "#/components/schemas/DataStream"
48+
"$ref": "#/components/schemas/CustomStream"
4949
}
5050
}
5151
}
5252
}
5353
}
5454
}
5555
},
56-
"/delayedStream": {
56+
"/customStreamTask": {
5757
"get": {
58-
"operationId": "delayedStream",
58+
"operationId": "customStreamTask",
5959
"parameters": [
6060
{
6161
"name": "size",
@@ -66,16 +66,6 @@
6666
"type": "integer",
6767
"format": "int32"
6868
}
69-
},
70-
{
71-
"name": "delayMillis",
72-
"in": "query",
73-
"required": true,
74-
"explode": false,
75-
"schema": {
76-
"type": "integer",
77-
"format": "int64"
78-
}
7969
}
8070
],
8171
"responses": {
@@ -84,21 +74,17 @@
8474
"content": {
8575
"application/json": {
8676
"schema": {
87-
"type": "array",
88-
"items": {
89-
"type": "integer",
90-
"format": "int32"
91-
}
77+
"$ref": "#/components/schemas/DataStream"
9278
}
9379
}
9480
}
9581
}
9682
}
9783
}
9884
},
99-
"/delayedStreamTask": {
85+
"/delayedStream": {
10086
"get": {
101-
"operationId": "delayedStreamTask",
87+
"operationId": "delayedStream",
10288
"parameters": [
10389
{
10490
"name": "size",
@@ -222,6 +208,39 @@
222208
}
223209
}
224210
}
211+
},
212+
"/streamTask": {
213+
"get": {
214+
"operationId": "streamTask",
215+
"parameters": [
216+
{
217+
"name": "size",
218+
"in": "query",
219+
"required": true,
220+
"explode": false,
221+
"schema": {
222+
"type": "integer",
223+
"format": "int32"
224+
}
225+
}
226+
],
227+
"responses": {
228+
"200": {
229+
"description": "Success",
230+
"content": {
231+
"application/json": {
232+
"schema": {
233+
"type": "array",
234+
"items": {
235+
"type": "integer",
236+
"format": "int32"
237+
}
238+
}
239+
}
240+
}
241+
}
242+
}
243+
}
225244
}
226245
},
227246
"servers": [
@@ -231,6 +250,13 @@
231250
],
232251
"components": {
233252
"schemas": {
253+
"CustomStream": {
254+
"type": "array",
255+
"items": {
256+
"type": "integer",
257+
"format": "int32"
258+
}
259+
},
234260
"DataStream": {
235261
"type": "array",
236262
"items": {

rest/.jvm/src/test/scala/io/udash/rest/openapi/OpenApiGenerationTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import scala.io.Source
88
import org.scalatest.funsuite.AnyFunSuite
99

1010
class OpenApiGenerationTest extends AnyFunSuite {
11+
1112
test("openapi for RestTestApi") {
1213
val openapi = RestTestApi.openapiMetadata.openapi(
1314
Info("Test API", "0.1", description = "Some test REST API"),

rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ package rest.jetty
44
import com.avsystem.commons.*
55
import com.avsystem.commons.annotation.explicitGenerics
66
import com.avsystem.commons.serialization.json.{JsonReader, JsonStringInput}
7-
import io.udash.rest.jetty.JettyRestClient.unsupportedContentTypeError
87
import io.udash.rest.raw.*
9-
import io.udash.rest.raw.HttpErrorException.plain
108
import io.udash.rest.util.Utils
119
import io.udash.utils.URLEncoder
1210
import monix.eval.Task
@@ -103,13 +101,13 @@ final class JettyRestClient(
103101
Observable
104102
.fromIterator(Task.eval(input.readList().iterator(_.asInstanceOf[JsonStringInput].readRawJson())))
105103
.map(JsonValue(_))
106-
}.onErrorFallbackTo(Observable.raiseError(HttpErrorException.Streaming)),
104+
}.onErrorFallbackTo(Observable.raiseError(JettyRestClient.Streaming)),
107105
charset = charset,
108106
)
109107
}
110108
bodyOpt.mapOr(
111109
{
112-
callback(Failure(unsupportedContentTypeError(contentTypeOpt)))
110+
callback(Failure(JettyRestClient.unsupportedContentTypeError(contentTypeOpt)))
113111
},
114112
body => {
115113
this.collectToBuffer = false
@@ -265,10 +263,14 @@ final class JettyRestClient(
265263
object JettyRestClient {
266264
final val DefaultMaxResponseLength = 2 * 1024 * 1024
267265
final val DefaultTimeout = 10.seconds
268-
final val Streaming: HttpErrorException = plain(400, "HTTP stream failure")
266+
final val Streaming = HttpErrorException.plain(400, "HTTP stream failure")
269267

270-
def unsupportedContentTypeError(contentType: Opt[String]): HttpErrorException =
271-
plain(400, s"Unsupported streaming Content-Type = ${contentType.getOrElse("null")}", new UnsupportedOperationException())
268+
private def unsupportedContentTypeError(contentType: Opt[String]): HttpErrorException =
269+
HttpErrorException.plain(
270+
code = 400,
271+
message = s"Unsupported streaming Content-Type${contentType.mapOr("", c => s" = $c")}",
272+
cause = new UnsupportedOperationException,
273+
)
272274

273275
@explicitGenerics
274276
def apply[RestApi: RawRest.AsRealRpc : RestMetadata](

rest/src/main/scala/io/udash/rest/raw/RestRequest.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ case class HttpErrorException(code: Int, payload: HttpBody = HttpBody.Empty, cau
5656
object HttpErrorException {
5757
def plain(code: Int, message: String, cause: Throwable = null): HttpErrorException =
5858
HttpErrorException(code, HttpBody.plain(message), cause)
59-
val Streaming: HttpErrorException = plain(400, "HTTP stream failure")
6059
}
6160

6261
final case class RestRequest(method: HttpMethod, parameters: RestParameters, body: HttpBody) {

rest/src/test/scala/io/udash/rest/RestApiTest.scala

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,42 @@ trait StreamingRestApiTestScenarios extends RestApiTest {
165165
testStream(_.binaryStream())
166166
}
167167

168+
"task of observable stream" in {
169+
val testTask = for {
170+
proxyResults <- streamingProxy.streamTask(size = 3).flatMap(_.toListL)
171+
implResults <- streamingImpl.streamTask(size = 3).flatMap(_.toListL)
172+
} yield {
173+
assert(proxyResults.map(mkDeep) == implResults.map(mkDeep))
174+
}
175+
testTask.runToFuture
176+
}
177+
178+
"custom stream task" in {
179+
val testTask = for {
180+
proxyResults <- streamingProxy.customStreamTask(3)
181+
implResults <- streamingImpl.customStreamTask(3)
182+
proxyObs <- proxyResults.source.toListL
183+
implObs <- implResults.source.toListL
184+
} yield {
185+
assert(proxyResults.metadata == implResults.metadata)
186+
assert(proxyObs == implObs)
187+
}
188+
testTask.runToFuture
189+
}
190+
191+
"custom stream" in {
192+
val testTask = for {
193+
proxyResults <- streamingProxy.customStream(3)
194+
implResults <- streamingImpl.customStream(3)
195+
proxyObs <- proxyResults.source.toListL
196+
implObs <- implResults.source.toListL
197+
} yield {
198+
assert(proxyResults.code == implResults.code)
199+
assert(proxyObs == implObs)
200+
}
201+
testTask.runToFuture
202+
}
203+
168204
"immediate stream error" in {
169205
testStream(_.errorStream(immediate = true))
170206
}
@@ -184,15 +220,16 @@ trait StreamingRestApiTestScenarios extends RestApiTest {
184220

185221
val timeoutTask = streamTask.timeout(500.millis).materialize
186222

187-
timeoutTask.runToFuture.map { result =>
223+
timeoutTask.map { result =>
188224
assert(result.isFailure, "Stream should have failed due to timeout")
189225
result match {
190226
case Failure(ex) =>
191227
assert(ex.isInstanceOf[TimeoutException], s"Expected TimeoutException, but got $ex")
192228
succeed
193-
case Success(_) => fail("Stream succeeded unexpectedly despite timeout")
229+
case Success(_) =>
230+
fail("Stream succeeded unexpectedly despite timeout")
194231
}
195-
}
232+
}.runToFuture
196233
}
197234

198235
"streaming with non-streaming client" in {
@@ -206,28 +243,4 @@ trait StreamingRestApiTestScenarios extends RestApiTest {
206243
fail("Expected UnsupportedOperationException but operation succeeded")
207244
}
208245
}
209-
210-
"task of observable stream" in {
211-
val testTask = for {
212-
proxyResults <- streamingProxy.delayedStreamTask(3, 50).flatMap(_.toListL)
213-
implResults <- streamingImpl.delayedStreamTask(3, 50).flatMap(_.toListL)
214-
} yield {
215-
assert(proxyResults.map(mkDeep) == implResults.map(mkDeep))
216-
}
217-
testTask.runToFuture
218-
}
219-
220-
"custom stream task" in {
221-
val testTask = for {
222-
proxyResults <- streamingProxy.customStreamTask(3)
223-
implResults <- streamingImpl.customStreamTask(3)
224-
proxyObs <- proxyResults.source.toListL
225-
implObs <- implResults.source.toListL
226-
} yield {
227-
assert(proxyResults.metadata == implResults.metadata)
228-
assert(proxyObs == implObs)
229-
}
230-
testTask.runToFuture
231-
}
232-
233246
}

rest/src/test/scala/io/udash/rest/StreamingRestTestApi.scala

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package rest
33

44
import com.avsystem.commons.rpc.{AsRaw, AsRawReal, AsReal}
55
import io.udash.rest.openapi.RestSchema
6-
import io.udash.rest.raw.{HttpErrorException, JsonValue, StreamedBody}
6+
import io.udash.rest.raw.*
77
import monix.eval.Task
88
import monix.reactive.Observable
99

@@ -15,7 +15,7 @@ object DataStream extends GenCodecRestImplicits {
1515
implicit def schema: RestSchema[DataStream] =
1616
RestSchema.create(res => RestSchema.seqSchema[Seq, Int].createSchema(res), "DataStream")
1717

18-
implicit def dataStreamAsRawReal: AsRawReal[StreamedBody, DataStream] =
18+
implicit val dataStreamAsRawReal: AsRawReal[StreamedBody, DataStream] =
1919
AsRawReal.create(
2020
stream => StreamedBody.JsonList(stream.source.map(AsRaw[JsonValue, Int].asRaw)),
2121
rawBody => {
@@ -25,20 +25,42 @@ object DataStream extends GenCodecRestImplicits {
2525
)
2626
}
2727

28+
final case class CustomStream(source: Observable[Int], code: Int)
29+
object CustomStream extends GenCodecRestImplicits {
30+
implicit def schema: RestSchema[CustomStream] =
31+
RestSchema.create(res => RestSchema.seqSchema[Seq, Int].createSchema(res), "CustomStream")
32+
33+
implicit val customStreamAsRawReal: AsRawReal[StreamedRestResponse, CustomStream] =
34+
AsRawReal.create(
35+
stream => StreamedRestResponse(
36+
code = stream.code,
37+
headers = IMapping.empty,
38+
body = StreamedBody.JsonList(stream.source.map(AsRaw[JsonValue, Int].asRaw)),
39+
),
40+
rawResponse => {
41+
val list = StreamedBody.castOrFail[StreamedBody.JsonList](rawResponse.body)
42+
CustomStream(list.elements.map(AsReal[JsonValue, Int].asReal), rawResponse.code)
43+
},
44+
)
45+
}
46+
2847
trait StreamingRestTestApi {
2948
@GET def simpleStream(size: Int): Observable[Int]
3049

3150
@GET def jsonStream: Observable[RestEntity]
3251

3352
@POST def binaryStream(): Observable[Array[Byte]]
3453

54+
@streamingResponseBatchSize(3)
3555
@POST def errorStream(@Query immediate: Boolean): Observable[RestEntity]
3656

3757
@GET def delayedStream(@Query size: Int, @Query delayMillis: Long): Observable[Int]
3858

39-
@GET def delayedStreamTask(@Query size: Int, @Query delayMillis: Long): Task[Observable[Int]]
59+
@GET def streamTask(@Query size: Int): Task[Observable[Int]]
4060

4161
@GET def customStreamTask(@Query size: Int): Task[DataStream]
62+
63+
@GET def customStream(@Query size: Int): Task[CustomStream]
4264
}
4365
object StreamingRestTestApi extends DefaultRestApiCompanion[StreamingRestTestApi] {
4466

@@ -62,22 +84,29 @@ object StreamingRestTestApi extends DefaultRestApiCompanion[StreamingRestTestApi
6284
else
6385
Observable.fromIterable(Range(0, 3)).map { i =>
6486
if (i < 2) RestEntity(RestEntityId(i.toString), "first")
65-
else throw HttpErrorException.Streaming
87+
else throw HttpErrorException.plain(400, "bad stream")
6688
}
6789

6890
override def delayedStream(size: Int, delayMillis: Long): Observable[Int] =
6991
Observable.fromIterable(Range(0, size))
7092
.zip(Observable.intervalAtFixedRate(delayMillis.millis, delayMillis.millis))
7193
.map(_._1)
7294

73-
override def delayedStreamTask(size: Int, delayMillis: Long): Task[Observable[Int]] =
74-
Task.delay(delayedStream(size, delayMillis))
95+
override def streamTask(size: Int): Task[Observable[Int]] =
96+
Task.eval(Observable.fromIterable(Range(0, size)))
7597

7698
override def customStreamTask(size: Int): Task[DataStream] = Task {
7799
DataStream(
78-
Observable.fromIterable(Range(0, size)),
79-
Map.empty
100+
source = Observable.fromIterable(Range(0, size)),
101+
metadata = Map.empty
102+
)
103+
}
104+
105+
override def customStream(size: Int): Task[CustomStream] = Task {
106+
CustomStream(
107+
source = Observable.fromIterable(Range(0, size)),
108+
code = 200,
80109
)
81110
}
82111
}
83-
}
112+
}

rest/src/test/scala/io/udash/rest/raw/ServerImplApiTest.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,4 +368,3 @@ class ServerImplApiTest extends AnyFunSuite with ScalaFutures {
368368
assertRawExchange(request, response)
369369
}
370370
}
371-

0 commit comments

Comments
 (0)