11package io .udash
22package rest
33
4- import com .avsystem .commons .rpc .AsRawReal
5- import com .avsystem .commons .serialization .json .JsonStringOutput
4+ import com .avsystem .commons .rpc .{AsRaw , AsRawReal , AsReal }
65import io .udash .rest .openapi .RestSchema
76import io .udash .rest .raw .{HttpErrorException , JsonValue , StreamedBody }
87import monix .eval .Task
98import monix .reactive .Observable
109
1110import scala .concurrent .duration .*
1211
13- case class DataStream (source : Observable [Int ], metadata : Map [String , String ])
12+ final case class DataStream (source : Observable [Int ], metadata : Map [String , String ])
13+
14+ object DataStream extends GenCodecRestImplicits {
15+ implicit def schema : RestSchema [DataStream ] =
16+ RestSchema .create(res => RestSchema .seqSchema[Seq , Int ].createSchema(res), " DataStream" )
1417
15- object DataStream {
16- implicit def schema : RestSchema [DataStream ] = ???
1718 implicit def dataStreamAsRawReal : AsRawReal [StreamedBody , DataStream ] =
1819 AsRawReal .create(
19- stream => StreamedBody .JsonList (stream.source.map(i => JsonValue ( JsonStringOutput .write(i)) )),
20- {
21- case StreamedBody . JsonList (e, c) => DataStream (e.map(_.value.toInt), Map .empty )
22- case _ => ???
23- }
20+ stream => StreamedBody .JsonList (stream.source.map(AsRaw [ JsonValue , Int ].asRaw )),
21+ rawBody => {
22+ val list = StreamedBody .castOrFail[ StreamedBody . JsonList ](rawBody )
23+ DataStream (list.elements.map( AsReal [ JsonValue , Int ].asReal), Map .empty)
24+ },
2425 )
2526}
2627
@@ -36,8 +37,8 @@ trait StreamingRestTestApi {
3637 @ GET def delayedStream (@ Query size : Int , @ Query delayMillis : Long ): Observable [Int ]
3738
3839 @ GET def delayedStreamTask (@ Query size : Int , @ Query delayMillis : Long ): Task [Observable [Int ]]
39- @ GET def customStreamTask (@ Query size : Int ): Task [DataStream ]
4040
41+ @ GET def customStreamTask (@ Query size : Int ): Task [DataStream ]
4142}
4243object StreamingRestTestApi extends DefaultRestApiCompanion [StreamingRestTestApi ] {
4344
@@ -64,11 +65,10 @@ object StreamingRestTestApi extends DefaultRestApiCompanion[StreamingRestTestApi
6465 else throw HttpErrorException .Streaming
6566 }
6667
67- override def delayedStream (size : Int , delayMillis : Long ): Observable [Int ] = {
68+ override def delayedStream (size : Int , delayMillis : Long ): Observable [Int ] =
6869 Observable .fromIterable(Range (0 , size))
6970 .zip(Observable .intervalAtFixedRate(delayMillis.millis, delayMillis.millis))
7071 .map(_._1)
71- }
7272
7373 override def delayedStreamTask (size : Int , delayMillis : Long ): Task [Observable [Int ]] =
7474 Task .delay(delayedStream(size, delayMillis))
0 commit comments