Skip to content

Commit ce3711e

Browse files
committed
address mr comments
1 parent 77fe745 commit ce3711e

File tree

6 files changed

+106
-29
lines changed

6 files changed

+106
-29
lines changed

rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ package rest.jetty
44
import com.avsystem.commons.*
55
import com.avsystem.commons.annotation.explicitGenerics
66
import com.avsystem.commons.serialization.json.{JsonReader, JsonStringInput}
7+
import io.udash.rest.jetty.JettyRestClient.unsupportedContentTypeError
78
import io.udash.rest.raw.*
9+
import io.udash.rest.raw.HttpErrorException.plain
810
import io.udash.rest.util.Utils
911
import io.udash.utils.URLEncoder
1012
import monix.eval.Task
11-
import monix.execution.{Callback, Scheduler}
13+
import monix.execution.{Ack, Callback, Scheduler}
1214
import monix.reactive.Observable
1315
import monix.reactive.OverflowStrategy.Unbounded
1416
import monix.reactive.subjects.{ConcurrentSubject, PublishToOneSubject}
@@ -28,9 +30,9 @@ import scala.concurrent.duration.*
2830
* response body in memory. This client activates streaming mode *only* when the server's
2931
* response headers *do not* include a `Content-Length`.
3032
*
31-
* @param client The configured Jetty `HttpClient` instance.
33+
* @param client The configured Jetty `HttpClient` instance.
3234
* @param defaultMaxResponseLength Default maximum size (in bytes) for buffering non-streamed responses.
33-
* @param defaultTimeout Default timeout for requests.
35+
* @param defaultTimeout Default timeout for requests.
3436
*/
3537
final class JettyRestClient(
3638
client: HttpClient,
@@ -53,9 +55,9 @@ final class JettyRestClient(
5355
* The handler supports both regular responses and streaming responses, allowing for
5456
* incremental processing of large payloads through Observable streams.
5557
*
56-
* @param baseUrl Base URL for the REST service
58+
* @param baseUrl Base URL for the REST service
5759
* @param customMaxResponseLength Optional maximum response length override for non-streamed responses
58-
* @param customTimeout Optional timeout override
60+
* @param customTimeout Optional timeout override
5961
* @return A handler that can process REST requests with streaming capabilities
6062
*/
6163
def asHandleRequestWithStreaming(
@@ -107,8 +109,7 @@ final class JettyRestClient(
107109
}
108110
bodyOpt.mapOr(
109111
{
110-
// TODO streaming error handling client-side
111-
callback(Failure(new Exception(s"Unsupported content type $contentTypeOpt")))
112+
callback(Failure(unsupportedContentTypeError(contentTypeOpt)))
112113
},
113114
body => {
114115
this.collectToBuffer = false
@@ -127,17 +128,19 @@ final class JettyRestClient(
127128
override def onContent(response: Response, chunk: Content.Chunk, demander: Runnable): Unit =
128129
if (collectToBuffer)
129130
super.onContent(response, chunk, demander)
130-
else
131-
if (chunk == Content.Chunk.EOF) {
132-
rawContentSubject.onComplete()
133-
} else {
134-
val buf = chunk.getByteBuffer
135-
val arr = new Array[Byte](buf.remaining)
136-
buf.get(arr)
137-
publishSubject.subscription // wait for subscription
138-
.flatMapNow(_ => rawContentSubject.onNext(arr))
139-
.mapNow(_ => demander.run())
131+
else if (chunk == Content.Chunk.EOF) {
132+
rawContentSubject.onComplete()
133+
} else {
134+
val buf = chunk.getByteBuffer
135+
val arr = new Array[Byte](buf.remaining)
136+
buf.get(arr)
137+
publishSubject.subscription // wait for subscription
138+
.flatMapNow(_ => rawContentSubject.onNext(arr))
139+
.mapNow {
140+
case Ack.Continue => demander.run()
141+
case Ack.Stop => ()
140142
}
143+
}
141144

142145
override def onComplete(result: Result): Unit =
143146
if (result.isSucceeded) {
@@ -169,9 +172,9 @@ final class JettyRestClient(
169172
* Creates a [[RawRest.HandleRequest]] which handles standard REST requests by buffering the entire response.
170173
* This does <b>not</b> support streaming responses.
171174
*
172-
* @param baseUrl The base URL for the REST service.
175+
* @param baseUrl The base URL for the REST service.
173176
* @param customMaxResponseLength Optional override for the maximum response length.
174-
* @param customTimeout Optional override for the request timeout.
177+
* @param customTimeout Optional override for the request timeout.
175178
* @return A `RawRest.HandleRequest` that buffers responses.
176179
*/
177180
def asHandleRequest(
@@ -262,6 +265,10 @@ final class JettyRestClient(
262265
object JettyRestClient {
263266
final val DefaultMaxResponseLength = 2 * 1024 * 1024
264267
final val DefaultTimeout = 10.seconds
268+
final val Streaming: HttpErrorException = plain(400, "HTTP stream failure")
269+
270+
def unsupportedContentTypeError(contentType: Opt[String]): HttpErrorException =
271+
plain(400, s"Unsupported streaming Content-Type = ${contentType.getOrElse("null")}", new UnsupportedOperationException())
265272

266273
@explicitGenerics
267274
def apply[RestApi: RawRest.AsRealRpc : RestMetadata](

rest/src/main/scala/io/udash/rest/companions.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,18 @@ abstract class RestServerApiCompanion[Implicits, Real](protected val implicits:
5454
implicit final lazy val restMetadata: RestMetadata[Real] = inst(implicits, this).metadata
5555
implicit final lazy val restAsRaw: RawRest.AsRawRpc[Real] = inst(implicits, this).asRaw
5656

57+
58+
/**
59+
* Maintained for backward compatibility with non-streaming clients.
60+
* Converts the real API implementation into a request handler without streaming support.
61+
*/
5762
final def asHandleRequest(real: Real): RawRest.HandleRequest =
5863
RawRest.asHandleRequest(real)
64+
/**
65+
* Converts the real API implementation into a request handler with streaming capabilities.
66+
*/
67+
final def asHandleRequestWithStreaming(real: Real): RawRest.HandleRequestWithStreaming =
68+
RawRest.asHandleRequestWithStreaming(real)
5969
}
6070

6171
/** @see [[io.udash.rest.RestApiCompanion RestApiCompanion]] */
@@ -68,6 +78,8 @@ abstract class RestServerOpenApiCompanion[Implicits, Real](protected val implici
6878

6979
final def asHandleRequest(real: Real): RawRest.HandleRequest =
7080
RawRest.asHandleRequest(real)
81+
final def asHandleRequestWithStreaming(real: Real): RawRest.HandleRequestWithStreaming =
82+
RawRest.asHandleRequestWithStreaming(real)
7183
}
7284

7385
/**
@@ -103,6 +115,8 @@ abstract class RestOpenApiCompanion[Implicits, Real](protected val implicits: Im
103115
RawRest.fromHandleRequest(handleRequest)
104116
final def asHandleRequest(real: Real): RawRest.HandleRequest =
105117
RawRest.asHandleRequest(real)
118+
final def asHandleRequestWithStreaming(real: Real): RawRest.HandleRequestWithStreaming =
119+
RawRest.asHandleRequestWithStreaming(real)
106120
}
107121

108122
trait PolyRestApiFullInstances[T[_[_]]] {
@@ -136,6 +150,8 @@ abstract class RestServerApiImplCompanion[Implicits, Real](protected val implici
136150

137151
final def asHandleRequest(real: Real): RawRest.HandleRequest =
138152
RawRest.asHandleRequest(real)
153+
final def asHandleRequestWithStreaming(real: Real): RawRest.HandleRequestWithStreaming =
154+
RawRest.asHandleRequestWithStreaming(real)
139155
}
140156

141157
/**
@@ -150,7 +166,6 @@ abstract class RestServerOpenApiImplCompanion[Implicits, Real](protected val imp
150166

151167
final def asHandleRequest(real: Real): RawRest.HandleRequest =
152168
RawRest.asHandleRequest(real)
153-
154169
final def asHandleRequestWithStreaming(real: Real): RawRest.HandleRequestWithStreaming =
155170
RawRest.asHandleRequestWithStreaming(real)
156171
}

rest/src/main/scala/io/udash/rest/implicits.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package io.udash
22
package rest
33

4-
import com.avsystem.commons._
4+
import com.avsystem.commons.*
55
import com.avsystem.commons.meta.Fallback
66
import com.avsystem.commons.misc.ImplicitNotFound
77
import com.avsystem.commons.rpc.{AsRaw, AsRawReal, AsReal, InvalidRpcCall}
88
import com.avsystem.commons.serialization.json.{JsonStringInput, JsonStringOutput}
99
import com.avsystem.commons.serialization.{GenCodec, GenKeyCodec}
1010
import io.udash.rest.openapi.{OpenApiMetadata, RestSchema}
11+
import io.udash.rest.raw.*
1112
import io.udash.rest.raw.RawRest.FromTask
12-
import io.udash.rest.raw._
1313
import monix.eval.Task
1414
import monix.execution.Scheduler
1515

rest/src/test/scala/io/udash/rest/CompilationErrorsTest.scala

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package io.udash
22
package rest
33

44
import io.udash.testing.CompilationErrorAssertions
5+
import monix.eval.Task
6+
import monix.reactive.Observable
57

68
import scala.concurrent.Future
79
import org.scalatest.funsuite.AnyFunSuite
@@ -17,8 +19,6 @@ class CompilationErrorsTest extends AnyFunSuite with CompilationErrorAssertions
1719
def meth(par: Any): Future[Unit]
1820
}
1921

20-
// TODO streaming add streaming tests
21-
2222
test("missing serializer for parameter") {
2323
val error = norm(typeErrorFor("object Api extends DefaultRestApiCompanion[MissingSerializerForParam]"))
2424
assert(error ==
@@ -134,4 +134,52 @@ class CompilationErrorsTest extends AnyFunSuite with CompilationErrorAssertions
134134
| Cannot serialize Unit into StreamedRestResponse, because:
135135
| Cannot serialize Unit into io.udash.rest.raw.StreamedBody, appropriate AsRaw instance not found""".stripMargin)
136136
}
137-
}
137+
138+
139+
trait MissingObservableSerializerForResult {
140+
@GET def streamMeth(): Observable[Any]
141+
}
142+
143+
test("missing serializer for Observable result element") {
144+
val error = norm(typeErrorFor("object Api extends DefaultRestServerApiImplCompanion[MissingObservableSerializerForResult]"))
145+
assert(error ==
146+
"""cannot translate between trait MissingObservableSerializerForResult and trait RawRest:
147+
|problem with method streamMeth:
148+
| * it cannot be translated into an HTTP GET method:
149+
| monix.reactive.Observable[Any] is not a valid result type because:
150+
| Cannot serialize monix.reactive.Observable[Any] into RestResponse, because:
151+
| Cannot serialize monix.reactive.Observable[Any] into HttpBody, because:
152+
| Cannot serialize monix.reactive.Observable[Any] into JsonValue, because:
153+
| No GenCodec found for monix.reactive.Observable[Any]
154+
| * it cannot be translated into an HTTP GET stream method:
155+
| monix.reactive.Observable[Any] is not a valid result type because:
156+
| Cannot serialize monix.reactive.Observable[Any] into StreamedRestResponse, because:
157+
| Cannot serialize Any into StreamedBody, because:
158+
| Cannot serialize Any into JsonValue, because:
159+
| No GenCodec found for Any""".stripMargin)
160+
}
161+
162+
trait MissingTaskObservableSerializerForResult {
163+
@GET def taskStreamMeth(): Task[Observable[Any]]
164+
}
165+
166+
test("missing serializer for Task[Observable] result element") {
167+
val error = norm(typeErrorFor("object Api extends DefaultRestApiCompanion[MissingTaskObservableSerializerForResult]"))
168+
assert(error ==
169+
"""cannot translate between trait MissingTaskObservableSerializerForResult and trait RawRest:
170+
|problem with method taskStreamMeth:
171+
| * it cannot be translated into an HTTP GET method:
172+
| monix.eval.Task[monix.reactive.Observable[Any]] is not a valid result type because:
173+
| Cannot serialize monix.reactive.Observable[Any] into RestResponse, because:
174+
| Cannot serialize monix.reactive.Observable[Any] into HttpBody, because:
175+
| Cannot serialize monix.reactive.Observable[Any] into JsonValue, because:
176+
| No GenCodec found for monix.reactive.Observable[Any]
177+
| * it cannot be translated into an HTTP GET stream method:
178+
| monix.eval.Task[monix.reactive.Observable[Any]] is not a valid result type because:
179+
| Cannot serialize monix.reactive.Observable[Any] into StreamedRestResponse, because:
180+
| Cannot serialize Any into StreamedBody, because:
181+
| Cannot serialize Any into JsonValue, because:
182+
| No GenCodec found for Any""".stripMargin)
183+
}
184+
185+
}

rest/src/test/scala/io/udash/rest/StreamingRestTestApi.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@ package io.udash
22
package rest
33

44
import io.udash.rest.raw.HttpErrorException
5-
import monix.eval.Task
65
import monix.reactive.Observable
76

8-
import java.util.concurrent.atomic.AtomicBoolean
97
import scala.concurrent.duration.*
108

119
trait StreamingRestTestApi {
@@ -42,8 +40,7 @@ object StreamingRestTestApi extends DefaultRestApiCompanion[StreamingRestTestApi
4240
Observable.fromIterable(Range(0, 3)).map { i =>
4341
if (i < 2) RestEntity(RestEntityId(i.toString), "first")
4442
else throw HttpErrorException.Streaming
45-
}
46-
43+
}
4744

4845
override def delayedStream(size: Int, delayMillis: Long): Observable[Int] = {
4946
Observable.fromIterable(Range(0, size))

rest/src/test/scala/io/udash/rest/raw/ServerImplApiTest.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,5 +357,15 @@ class ServerImplApiTest extends AnyFunSuite with ScalaFutures {
357357
expectedCode = 200
358358
)
359359
}
360+
361+
test("streaming GET call to non-streaming endpoint") {
362+
val params = RestParameters(
363+
path = PlainValue.decodePath("/streamingNumbers"),
364+
query = Mapping.create("count" -> PlainValue("5"))
365+
)
366+
val request = RestRequest(HttpMethod.GET, params, HttpBody.Empty)
367+
val response = RestResponse(200, IMapping.empty, HttpBody.json(JsonValue("[1,2,3,4,5]")))
368+
assertRawExchange(request, response)
369+
}
360370
}
361371

0 commit comments

Comments
 (0)