Skip to content

Commit 666371f

Browse files
committed
Add unit tests for advanced streaming types and non-streaming implementation
1 parent 06adaae commit 666371f

File tree

3 files changed

+70
-5
lines changed

3 files changed

+70
-5
lines changed

guide/guide/.js/src/main/assets/pages/rest.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,9 +1172,6 @@ object DataStream {
11721172
}
11731173

11741174
trait CustomStreamingApi {
1175-
/** Returns a custom streaming type */
1176-
def getDataStream(query: String): DataStream[SearchResult]
1177-
11781175
/** Returns a Task that produces a custom streaming type */
11791176
def prepareAndStreamData(id: String): Task[DataStream[DataPoint]]
11801177
}

rest/src/test/scala/io/udash/rest/RestApiTest.scala

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ package rest
44
import cats.implicits.catsSyntaxTuple2Semigroupal
55
import com.avsystem.commons.*
66
import com.avsystem.commons.misc.ScalaDurationExtensions.durationIntOps
7-
import io.udash.rest.raw.{HttpErrorException, RawRest}
7+
import io.udash.rest.raw.RawRest
88
import io.udash.testing.AsyncUdashSharedTest
99
import monix.eval.Task
1010
import monix.execution.Scheduler
@@ -194,4 +194,40 @@ trait StreamingRestApiTestScenarios extends RestApiTest {
194194
}
195195
}
196196
}
197+
198+
"streaming with non-streaming client" in {
199+
val standardProxy = RawRest.fromHandleRequest[StreamingRestTestApi](clientHandle)
200+
standardProxy.simpleStream(3).toListL.materialize.runToFuture.map {
201+
case Failure(exception: UnsupportedOperationException) =>
202+
assert(exception.getMessage == "Streaming unsupported by the client")
203+
case Failure(otherException) =>
204+
fail(s"Expected UnsupportedOperationException but got ${otherException.getClass.getName}: ${otherException.getMessage}")
205+
case Success(_) =>
206+
fail("Expected UnsupportedOperationException but operation succeeded")
207+
}
208+
}
209+
210+
"task of observable stream" in {
211+
val testTask = for {
212+
proxyResults <- streamingProxy.delayedStreamTask(3, 50).flatMap(_.toListL)
213+
implResults <- streamingImpl.delayedStreamTask(3, 50).flatMap(_.toListL)
214+
} yield {
215+
assert(proxyResults.map(mkDeep) == implResults.map(mkDeep))
216+
}
217+
testTask.runToFuture
218+
}
219+
220+
"custom stream task" in {
221+
val testTask = for {
222+
proxyResults <- streamingProxy.customStreamTask(3)
223+
implResults <- streamingImpl.customStreamTask(3)
224+
proxyObs <- proxyResults.source.toListL
225+
implObs <- implResults.source.toListL
226+
} yield {
227+
assert(proxyResults.metadata == implResults.metadata)
228+
assert(proxyObs == implObs)
229+
}
230+
testTask.runToFuture
231+
}
232+
197233
}

rest/src/test/scala/io/udash/rest/StreamingRestTestApi.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,29 @@
11
package io.udash
22
package rest
33

4-
import io.udash.rest.raw.HttpErrorException
4+
import com.avsystem.commons.rpc.AsRawReal
5+
import com.avsystem.commons.serialization.json.JsonStringOutput
6+
import io.udash.rest.openapi.RestSchema
7+
import io.udash.rest.raw.{HttpErrorException, JsonValue, StreamedBody}
8+
import monix.eval.Task
59
import monix.reactive.Observable
610

711
import scala.concurrent.duration.*
812

13+
case class DataStream(source: Observable[Int], metadata: Map[String, String])
14+
15+
object DataStream {
16+
implicit def schema: RestSchema[DataStream] = ???
17+
implicit def dataStreamAsRawReal: AsRawReal[StreamedBody, DataStream] =
18+
AsRawReal.create(
19+
stream => StreamedBody.JsonList(stream.source.map(i => JsonValue(JsonStringOutput.write(i)))),
20+
{
21+
case StreamedBody.JsonList(e, c) => DataStream(e.map(_.value.toInt), Map.empty)
22+
case _ => ???
23+
}
24+
)
25+
}
26+
927
trait StreamingRestTestApi {
1028
@GET def simpleStream(size: Int): Observable[Int]
1129

@@ -16,6 +34,10 @@ trait StreamingRestTestApi {
1634
@POST def errorStream(@Query immediate: Boolean): Observable[RestEntity]
1735

1836
@GET def delayedStream(@Query size: Int, @Query delayMillis: Long): Observable[Int]
37+
38+
@GET def delayedStreamTask(@Query size: Int, @Query delayMillis: Long): Task[Observable[Int]]
39+
@GET def customStreamTask(@Query size: Int): Task[DataStream]
40+
1941
}
2042
object StreamingRestTestApi extends DefaultRestApiCompanion[StreamingRestTestApi] {
2143

@@ -47,5 +69,15 @@ object StreamingRestTestApi extends DefaultRestApiCompanion[StreamingRestTestApi
4769
.zip(Observable.intervalAtFixedRate(delayMillis.millis, delayMillis.millis))
4870
.map(_._1)
4971
}
72+
73+
override def delayedStreamTask(size: Int, delayMillis: Long): Task[Observable[Int]] =
74+
Task.delay(delayedStream(size, delayMillis))
75+
76+
override def customStreamTask(size: Int): Task[DataStream] = Task {
77+
DataStream(
78+
Observable.fromIterable(Range(0, size)),
79+
Map.empty
80+
)
81+
}
5082
}
5183
}

0 commit comments

Comments
 (0)