Skip to content

Commit f947072

Browse files
committed
Add streaming support testing to REST API with new endpoints and error handling
1 parent 3ee14b4 commit f947072

File tree

9 files changed

+564
-26
lines changed

9 files changed

+564
-26
lines changed

rest/.jvm/src/test/resources/StreamingRestTestApi.json

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,49 @@
2424
}
2525
}
2626
},
27+
"/delayedStream": {
28+
"get": {
29+
"operationId": "delayedStream",
30+
"parameters": [
31+
{
32+
"name": "size",
33+
"in": "query",
34+
"required": true,
35+
"explode": false,
36+
"schema": {
37+
"type": "integer",
38+
"format": "int32"
39+
}
40+
},
41+
{
42+
"name": "delayMillis",
43+
"in": "query",
44+
"required": true,
45+
"explode": false,
46+
"schema": {
47+
"type": "integer",
48+
"format": "int64"
49+
}
50+
}
51+
],
52+
"responses": {
53+
"200": {
54+
"description": "Success",
55+
"content": {
56+
"application/json": {
57+
"schema": {
58+
"type": "array",
59+
"items": {
60+
"type": "integer",
61+
"format": "int32"
62+
}
63+
}
64+
}
65+
}
66+
}
67+
}
68+
}
69+
},
2770
"/errorStream": {
2871
"post": {
2972
"operationId": "errorStream",

rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ import scala.concurrent.duration.*
100100
Observable
101101
.fromIterator(Task.eval(input.readList().iterator(_.asInstanceOf[JsonStringInput].readRawJson())))
102102
.map(JsonValue(_))
103-
},
103+
}.onErrorFallbackTo(Observable.raiseError(HttpErrorException.Streaming)),
104104
charset = charset,
105105
)
106106
}

rest/src/main/scala/io/udash/rest/companions.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,7 @@ abstract class RestServerOpenApiImplCompanion[Implicits, Real](protected val imp
150150

151151
final def asHandleRequest(real: Real): RawRest.HandleRequest =
152152
RawRest.asHandleRequest(real)
153+
154+
final def asHandleRequestWithStreaming(real: Real): RawRest.HandleRequestWithStreaming =
155+
RawRest.asHandleRequestWithStreaming(real)
153156
}

rest/src/main/scala/io/udash/rest/raw/RestRequest.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package io.udash
22
package rest
33
package raw
44

5-
import com.avsystem.commons.meta._
5+
import com.avsystem.commons.meta.*
66
import com.avsystem.commons.misc.{AbstractValueEnum, AbstractValueEnumCompanion, EnumCtx}
7-
import com.avsystem.commons.rpc._
7+
import com.avsystem.commons.rpc.*
88

99
import scala.util.control.NoStackTrace
1010

@@ -56,6 +56,7 @@ case class HttpErrorException(code: Int, payload: HttpBody = HttpBody.Empty, cau
5656
object HttpErrorException {
5757
def plain(code: Int, message: String, cause: Throwable = null): HttpErrorException =
5858
HttpErrorException(code, HttpBody.plain(message), cause)
59+
val Streaming: HttpErrorException = plain(400, "HTTP stream failure")
5960
}
6061

6162
final case class RestRequest(method: HttpMethod, parameters: RestParameters, body: HttpBody) {

rest/src/test/scala/io/udash/rest/RestApiTest.scala

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ package rest
44
import cats.implicits.catsSyntaxTuple2Semigroupal
55
import com.avsystem.commons.*
66
import com.avsystem.commons.misc.ScalaDurationExtensions.durationIntOps
7-
import io.udash.rest.raw.RawRest
7+
import io.udash.rest.raw.{HttpErrorException, RawRest}
88
import io.udash.testing.AsyncUdashSharedTest
99
import monix.eval.Task
1010
import monix.execution.Scheduler
@@ -13,6 +13,7 @@ import org.scalactic.source.Position
1313
import org.scalatest.time.{Millis, Seconds, Span}
1414
import org.scalatest.{Assertion, BeforeAndAfterEach}
1515

16+
import scala.concurrent.TimeoutException
1617
import scala.concurrent.duration.FiniteDuration
1718

1819
abstract class RestApiTest extends AsyncUdashSharedTest with BeforeAndAfterEach {
@@ -28,7 +29,7 @@ abstract class RestApiTest extends AsyncUdashSharedTest with BeforeAndAfterEach
2829

2930
override protected def beforeEach(): Unit = {
3031
super.beforeEach()
31-
impl.resetCounter()
32+
impl.resetCounter() // Reset non-streaming counter
3233
}
3334

3435
final val serverHandle: RawRest.HandleRequest =
@@ -168,8 +169,29 @@ trait StreamingRestApiTestScenarios extends RestApiTest {
168169
testStream(_.errorStream(immediate = true))
169170
}
170171

171-
// TODO streaming - does not work on client side
172-
"mid-stream error" ignore {
172+
"mid-stream error" in {
173173
testStream(_.errorStream(immediate = false))
174174
}
175+
176+
"slow source stream" in {
177+
testStream(_.delayedStream(size = 3, delayMillis = 100))
178+
}
179+
180+
"client-side timeout on slow stream" in {
181+
val streamTask = streamingProxy
182+
.delayedStream(size = 10, delayMillis = 200)
183+
.toListL
184+
185+
val timeoutTask = streamTask.timeout(500.millis).materialize
186+
187+
timeoutTask.runToFuture.map { result =>
188+
assert(result.isFailure, "Stream should have failed due to timeout")
189+
result match {
190+
case Failure(ex) =>
191+
assert(ex.isInstanceOf[TimeoutException], s"Expected TimeoutException, but got $ex")
192+
succeed
193+
case Success(_) => fail("Stream succeeded unexpectedly despite timeout")
194+
}
195+
}
196+
}
175197
}

rest/src/test/scala/io/udash/rest/SomeServerApiImpl.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,31 @@
11
package io.udash.rest
22

3+
import monix.reactive.Observable
4+
35
import scala.concurrent.Future
46

57
final class SomeServerApiImpl {
68
@GET
79
def thingy(param: Int): Future[String] = Future.successful((param - 1).toString)
810

11+
@GET
12+
def streamingNumbers(count: Int): Observable[Int] =
13+
Observable.fromIterable(1 to count)
14+
15+
@POST
16+
def streamEcho(values: List[Int]): Observable[Int] =
17+
Observable.fromIterable(values)
18+
19+
@GET
20+
def streamBinary(chunkSize: Int): Observable[Array[Byte]] = {
21+
val content = "HelloWorld".getBytes
22+
Observable.fromIterable(content.grouped(chunkSize).toSeq)
23+
}
24+
25+
@GET
26+
def streamEmpty(): Observable[Array[Byte]] =
27+
Observable.empty
28+
929
val subapi = new SomeServerSubApiImpl
1030
}
1131
object SomeServerApiImpl extends DefaultRestServerApiImplCompanion[SomeServerApiImpl]

rest/src/test/scala/io/udash/rest/StreamingRestTestApi.scala

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package io.udash
22
package rest
33

44
import io.udash.rest.raw.HttpErrorException
5-
import monix.execution.Scheduler
5+
import monix.eval.Task
66
import monix.reactive.Observable
77

8-
import scala.concurrent.duration._
8+
import java.util.concurrent.atomic.AtomicBoolean
9+
import scala.concurrent.duration.*
910

1011
trait StreamingRestTestApi {
1112
@GET def simpleStream(size: Int): Observable[Int]
@@ -15,11 +16,11 @@ trait StreamingRestTestApi {
1516
@POST def binaryStream(): Observable[Array[Byte]]
1617

1718
@POST def errorStream(@Query immediate: Boolean): Observable[RestEntity]
19+
20+
@GET def delayedStream(@Query size: Int, @Query delayMillis: Long): Observable[Int]
1821
}
1922
object StreamingRestTestApi extends DefaultRestApiCompanion[StreamingRestTestApi] {
2023

21-
import Scheduler.Implicits.global
22-
2324
final class Impl extends StreamingRestTestApi {
2425

2526
override def simpleStream(size: Int): Observable[Int] =
@@ -40,7 +41,14 @@ object StreamingRestTestApi extends DefaultRestApiCompanion[StreamingRestTestApi
4041
else
4142
Observable.fromIterable(Range(0, 3)).map { i =>
4243
if (i < 2) RestEntity(RestEntityId(i.toString), "first")
43-
else throw HttpErrorException.plain(400, "later bad")
44-
}
44+
else throw HttpErrorException.Streaming
45+
}
46+
47+
48+
override def delayedStream(size: Int, delayMillis: Long): Observable[Int] = {
49+
Observable.fromIterable(Range(0, size))
50+
.zip(Observable.intervalAtFixedRate(delayMillis.millis, delayMillis.millis))
51+
.map(_._1)
52+
}
4553
}
46-
}
54+
}

0 commit comments

Comments
 (0)