Skip to content

Commit 3ee14b4

Browse files
author
Krzysztof Maliszewski
committed
Enhance documentation for streaming support
1 parent d27c904 commit 3ee14b4

File tree

5 files changed

+75
-19
lines changed

5 files changed

+75
-19
lines changed

rest/.jvm/src/main/scala/io/udash/rest/RestServlet.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,20 @@ class RestServlet(
111111
stream: StreamedRestResponse,
112112
body: StreamedBody.NonEmpty,
113113
): Task[Unit] = Task.defer {
114+
// The Content-Length header is intentionally omitted for streams.
115+
// This signals to the client that the response body size is not predetermined and will be streamed.
116+
// Clients implementing the streaming part of the REST interface contract MUST be prepared
117+
// to handle responses without Content-Length by reading data incrementally until the stream completes.
114118
body match {
115119
case single: StreamedBody.Single =>
116120
Task.eval(writeNonEmptyBody(response, single.body))
117121
case binary: StreamedBody.RawBinary =>
118-
// TODO streaming document no content length behaviour in relation to the client
119122
response.setContentType(binary.contentType)
120123
binary.content.bufferTumbling(stream.batchSize).consumeWith(Consumer.foreach { batch =>
121124
batch.foreach(e => response.getOutputStream.write(e))
122125
response.getOutputStream.flush()
123126
})
124127
case jsonList: StreamedBody.JsonList =>
125-
// TODO streaming document no content length behaviour in relation to the client
126128
response.setContentType(jsonList.contentType)
127129
jsonList.elements
128130
.bufferTumbling(stream.batchSize)

rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,18 @@ import java.nio.charset.Charset
2020
import scala.concurrent.CancellationException
2121
import scala.concurrent.duration.*
2222

23-
/** TODO streaming doc */
24-
final class JettyRestClient(
23+
/**
24+
* A REST client implementation based on the Eclipse Jetty HTTP client library.
25+
* Supports both standard request/response interactions and handling of streamed responses.
26+
*
27+
* Streaming responses allow processing large amounts of data without buffering the entire
28+
* response body in memory. This client activates streaming mode *only* when the server's
29+
* response headers *do not* include a `Content-Length`.
30+
*
31+
* @param client The configured Jetty `HttpClient` instance.
32+
* @param defaultMaxResponseLength Default maximum size (in bytes) for buffering non-streamed responses.
33+
* @param defaultTimeout Default timeout for requests.
34+
*/final class JettyRestClient(
2535
client: HttpClient,
2636
defaultMaxResponseLength: Int = JettyRestClient.DefaultMaxResponseLength,
2737
defaultTimeout: Duration = JettyRestClient.DefaultTimeout,
@@ -37,7 +47,16 @@ final class JettyRestClient(
3747
asHandleRequestWithStreaming(baseUri, customMaxResponseLength, customTimeout)
3848
)
3949

40-
/** TODO streaming doc */
50+
/**
51+
* Creates a request handler with streaming support that can be used to make REST calls.
52+
* The handler supports both regular responses and streaming responses, allowing for
53+
* incremental processing of large payloads through Observable streams.
54+
*
55+
* @param baseUrl Base URL for the REST service
56+
* @param customMaxResponseLength Optional maximum response length override for non-streamed responses
57+
* @param customTimeout Optional timeout override
58+
* @return A handler that can process REST requests with streaming capabilities
59+
*/
4160
def asHandleRequestWithStreaming(
4261
baseUrl: String,
4362
customMaxResponseLength: OptArg[Int] = OptArg.Empty,
@@ -59,7 +78,9 @@ final class JettyRestClient(
5978

6079
override def onHeaders(response: Response): Unit = {
6180
super.onHeaders(response)
62-
// TODO streaming document content length behaviour
81+
// When Content-Length is not provided (-1), process the response as a stream
82+
// since we can't determine the full size in advance. This enables handling
83+
// chunked transfer encoding and streaming responses.
6384
val contentLength = response.getHeaders.getLongField(HttpHeader.CONTENT_LENGTH)
6485
if (contentLength == -1) {
6586
val contentTypeOpt = response.getHeaders.get(HttpHeader.CONTENT_TYPE).opt
@@ -122,7 +143,8 @@ final class JettyRestClient(
122143
val httpResp = result.getResponse
123144
val contentLength = httpResp.getHeaders.getLongField(HttpHeader.CONTENT_LENGTH)
124145
if (contentLength != -1) {
125-
// TODO streaming client-side handle errors ?
146+
// For responses with known content length, we handle them as regular (non-streamed) responses
147+
// Any errors will be propagated through the callback's Failure channel
126148
val restResponse = StreamedRestResponse(
127149
code = httpResp.getStatus,
128150
headers = parseHeaders(httpResp),
@@ -142,7 +164,15 @@ final class JettyRestClient(
142164
}
143165
}
144166

145-
/** TODO streaming doc */
167+
/**
168+
* Creates a `RawRest.HandleRequest` which handles standard REST requests by buffering the entire response.
169+
* This does *not* support streaming responses.
170+
*
171+
* @param baseUrl The base URL for the REST service.
172+
* @param customMaxResponseLength Optional override for the maximum response length.
173+
* @param customTimeout Optional override for the request timeout.
174+
* @return A `RawRest.HandleRequest` that buffers responses.
175+
*/
146176
def asHandleRequest(
147177
baseUrl: String,
148178
customMaxResponseLength: OptArg[Int] = OptArg.Empty,

rest/src/main/scala/io/udash/rest/raw/RawRest.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,11 @@ trait RawRest {
141141
def asHandleRequestWithStreaming(metadata: RestMetadata[_]): HandleRequestWithStreaming =
142142
RawRest.resolveAndHandle(metadata)(handleResolvedWithStreaming)
143143

144-
// TODO doc for compatibility
144+
/**
145+
* Handles a resolved REST call and returns a standard RestResponse.
146+
* This method is maintained for backward compatibility with non-streaming clients.
147+
* It delegates to handleResolvedWithStreaming and converts any streaming response to a standard response.
148+
*/
145149
def handleResolved(request: RestRequest, resolved: ResolvedCall): Task[RestResponse] =
146150
StreamedRestResponse.fallbackToRestResponse(handleResolvedWithStreaming(request, resolved))
147151

rest/src/main/scala/io/udash/rest/raw/RestResponse.scala

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,25 @@
11
package io.udash
22
package rest.raw
33

4-
import com.avsystem.commons._
4+
import com.avsystem.commons.*
55
import com.avsystem.commons.misc.ImplicitNotFound
66
import com.avsystem.commons.rpc.{AsRaw, AsReal}
77
import io.udash.rest.raw.RawRest.FromTask
8-
import io.udash.rest.raw.StreamedBody.castOrFail
98
import io.udash.rest.util.Utils
109
import monix.eval.{Task, TaskLike}
1110
import monix.reactive.Observable
1211

1312
import scala.annotation.implicitNotFound
1413

15-
/** TODO streaming doc */
14+
/** Base trait for REST response types, either standard or streaming. Contains common properties like status code and headers. */
1615
sealed trait AbstractRestResponse {
1716
def code: Int
1817
def headers: IMapping[PlainValue]
1918

2019
final def isSuccess: Boolean = code >= 200 && code < 300
2120
}
2221

23-
/** TODO streaming doc */
22+
/** Standard REST response containing a status code, headers, and a body. The body is loaded fully in memory as an HttpBody. */
2423
final case class RestResponse(
2524
code: Int,
2625
headers: IMapping[PlainValue],
@@ -112,7 +111,10 @@ trait RestResponseLowPrio { this: RestResponse.type =>
112111
): ImplicitNotFound[AsRaw[RestResponse, T]] = ImplicitNotFound()
113112
}
114113

115-
/** TODO streaming doc */
114+
/**
115+
* Streaming REST response containing a status code, headers, and a streamed body.
116+
* Unlike standard RestResponse, the body content can be delivered incrementally through a reactive stream.
117+
*/
116118
final case class StreamedRestResponse(
117119
code: Int,
118120
headers: IMapping[PlainValue],
@@ -129,7 +131,10 @@ final case class StreamedRestResponse(
129131

130132
object StreamedRestResponse extends StreamedRestResponseLowPrio {
131133

132-
/** TODO streaming doc */
134+
/**
135+
* Converts a StreamedRestResponse to a standard RestResponse by materializing streamed content.
136+
* This is useful for compatibility with APIs that don't support streaming.
137+
*/
133138
def fallbackToRestResponse(response: StreamedRestResponse): Task[RestResponse] = {
134139
val httpBody: Task[HttpBody] = response.body match {
135140
case StreamedBody.Empty =>
@@ -152,7 +157,10 @@ object StreamedRestResponse extends StreamedRestResponseLowPrio {
152157
httpBody.map(RestResponse(response.code, response.headers, _))
153158
}
154159

155-
/** TODO doc */
160+
/**
161+
* Converts any AbstractRestResponse to a standard RestResponse by materializing streamed content if necessary.
162+
* This is useful for compatibility with APIs that don't support streaming.
163+
*/
156164
def fallbackToRestResponse(response: Task[AbstractRestResponse]): Task[RestResponse] =
157165
response.flatMap {
158166
case restResponse: RestResponse => Task.now(restResponse)

rest/src/main/scala/io/udash/rest/raw/StreamedBody.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,22 @@ object StreamedBody extends StreamedBodyLowPrio {
3131
def contentType: String
3232
}
3333

34-
/** TODO streaming doc */
34+
/**
35+
* Represents a binary streamed response body.
36+
* The content is delivered as a stream of byte arrays which can be processed incrementally.
37+
* Useful for large binary files or content that is generated dynamically.
38+
*/
3539
final case class RawBinary(content: Observable[Array[Byte]]) extends NonEmpty {
3640
val contentType: String = HttpBody.OctetStreamType
3741

3842
override def toString: String = super.toString
3943
}
4044

41-
/** TODO streaming doc */
45+
/**
46+
* Represents a streamed list of JSON values.
47+
* Each element in the stream is a complete JSON value, allowing for incremental processing
48+
* of potentially large collections without loading everything into memory at once.
49+
*/
4250
final case class JsonList(
4351
elements: Observable[JsonValue],
4452
charset: String = HttpBody.Utf8Charset,
@@ -48,7 +56,11 @@ object StreamedBody extends StreamedBodyLowPrio {
4856
override def toString: String = super.toString
4957
}
5058

51-
/** TODO streaming doc */
59+
/**
60+
* Represents a single non-empty HTTP body that will be delivered as a streaming response.
61+
* Used when the content is already fully loaded but needs to be returned through a streaming API
62+
* for consistency with other streaming operations.
63+
*/
5264
final case class Single(body: HttpBody.NonEmpty) extends NonEmpty {
5365
override def contentType: String = body.contentType
5466
}

0 commit comments

Comments
 (0)