Skip to content

Commit 06adaae

Browse files
committed
Add documentation
1 parent ce3711e commit 06adaae

File tree

2 files changed

+191
-20
lines changed

2 files changed

+191
-20
lines changed

guide/guide/.js/src/main/assets/pages/rest.md

Lines changed: 190 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ derived from `GenCodec` instance.
703703

704704
Ultimately, if you don't want to use `Future`s, you may replace it with some other asynchronous wrapper type,
705705
e.g. Monix Task or some IO monad.
706-
See [supporting result containers other than `Future`](#supporting-result-containers-other-than-future).
706+
See [supporting result containers other than `Future`](#supporting-result-containers-other-than-future), [streaming serialization workflow](#streaming-serialization-workflow).
707707

708708
### Customizing serialization
709709

@@ -1042,35 +1042,55 @@ When a client makes a request to a streaming endpoint:
10421042

10431043
This approach allows processing of potentially unlimited amounts of data with minimal memory footprint on both the client and server.
10441044

1045-
### Streaming Types
1045+
#### Streaming response types
10461046

1047-
Udash REST supports two main streaming content types:
1047+
Streaming endpoints return data through a `StreamedRestResponse` instead of a regular `RestResponse`. This special
1048+
response type contains a `StreamedBody` which delivers content incrementally, rather than all at once.
10481049

1049-
1. **JSON Lists** - A stream of JSON objects sent as a JSON array `[{...}, {...}, ...]`
1050-
2. **Raw Binary** - A stream of binary data chunks, for files or other binary content
1050+
The framework supports two primary types of streaming responses:
10511051

1052-
### Implementation Details
1052+
1. **JSON Lists** - For streaming regular objects (any type `T` with a valid `RestSchema`), the content is delivered
1053+
as a stream of JSON values with `application/json` content type. Each element in the `Observable` is serialized
1054+
to JSON individually, allowing the client to process items as they arrive.
10531055

1054-
- Under the hood, streaming is implemented using Monix `Observable`
1055-
- Binary streams are transmitted as raw byte arrays
1056-
- JSON streams are automatically serialized/deserialized between JSON and your data types
1057-
- The server can control batch size to optimize network usage versus memory consumption
1056+
2. **Binary Streams** - For streaming binary data (`Observable[Array[Byte]]`), the content is delivered as a raw binary
1057+
stream with `application/octet-stream` content type. This is particularly useful for large file downloads or
1058+
real-time binary data processing.
10581059

1059-
### Handling Large Response Collections
1060+
#### Streaming serialization workflow
10601061

1061-
When dealing with large collections, streaming is preferable to loading everything in memory:
1062+
When a method returns an `Observable[T]`, the serialization flow is:
10621063

1063-
```scala
1064-
// Without streaming - entire list is loaded in memory
1065-
def getAllItems(): Future[List[Item]]
1064+
1. Each element of type `T` is serialized using the appropriate `AsRaw[JsonValue, T]` instance
1065+
2. These elements are delivered incrementally as part of a `StreamedBody.JsonList`
1066+
3. The client can process the items as they arrive, without waiting for the entire stream to complete
1067+
1068+
For binary data (`Observable[Array[Byte]]`), each byte array chunk is directly sent through a `StreamedBody.RawBinary`
1069+
without additional transformation.
1070+
1071+
#### Customizing streaming serialization
10661072

1067-
// With streaming - items are processed incrementally
1068-
def streamAllItems(): Observable[Item]
1073+
Just as with regular responses, you can customize how streaming responses are serialized. For instance, you might want
1074+
to provide a custom instance of `AsRaw[StreamedBody, Observable[T]]` for a specific type:
1075+
1076+
```scala
1077+
// Custom serialization for streaming a specialized data type
1078+
implicit def customStreamingFormat[T: CustomFormat]: AsRaw[StreamedBody, Observable[T]] =
1079+
obs => StreamedBody.JsonList(obs.map(customToJsonValue))
10691080
```
10701081

1071-
The streaming version allows processing data incrementally, which is crucial for very large datasets that might exceed available memory.
1082+
#### Compatibility with non-streaming clients
10721083

1073-
## Error Handling with Streaming
1084+
For backward compatibility with clients that don't support streaming, the framework provides automatic conversion from
1085+
streaming responses to standard responses using `StreamedRestResponse.fallbackToRestResponse`. This materialization
1086+
process collects all elements from the stream and combines them into a single response:
1087+
1088+
- For JSON streams, elements are collected into a JSON array
1089+
- For binary streams, byte arrays are concatenated
1090+
1091+
However, this conversion loses the streaming benefits, so it's best used only when necessary.
1092+
1093+
### Error Handling
10741094

10751095
Streaming endpoints handle errors similarly to regular endpoints. When an error occurs during streaming:
10761096

@@ -1093,6 +1113,76 @@ client.streamItems("")
10931113

10941114
This allows graceful handling of errors that might occur during streaming operations.
10951115

1116+
### Advanced Streaming Patterns
1117+
1118+
You can also use more advanced patterns for streaming responses:
1119+
1120+
#### Task of Observable
1121+
1122+
You can return a `Task` that resolves to an `Observable` when you need to perform some asynchronous work before starting the stream:
1123+
1124+
```scala
1125+
import monix.eval.Task
1126+
import monix.reactive.Observable
1127+
1128+
trait AdvancedStreamingApi {
1129+
/** Returns a Task that resolves to an Observable stream */
1130+
def streamWithInitialProcessing(id: String): Task[Observable[DataPoint]]
1131+
}
1132+
object AdvancedStreamingApi extends DefaultRestApiCompanion[AdvancedStreamingApi]
1133+
```
1134+
1135+
Implementation example:
1136+
1137+
```scala
1138+
class AdvancedStreamingApiImpl extends AdvancedStreamingApi {
1139+
def streamWithInitialProcessing(id: String): Task[Observable[DataPoint]] =
1140+
// First perform some async initialization work
1141+
Task.delay {
1142+
println(s"Starting stream for $id")
1143+
// Then return the actual stream
1144+
Observable.interval(1.second)
1145+
.map(i => DataPoint(id, i, System.currentTimeMillis()))
1146+
}
1147+
}
1148+
```
1149+
1150+
#### Custom Streaming Types
1151+
1152+
You can create custom types with streaming capabilities by defining appropriate serialization in their companion objects:
1153+
1154+
```scala
1155+
import monix.reactive.Observable
1156+
1157+
// Custom wrapper around a stream of values
1158+
case class DataStream[T](source: Observable[T], metadata: Map[String, String])
1159+
1160+
object DataStream {
1161+
// Define how to serialize DataStream to StreamedBody
1162+
implicit def dataStreamAsRawReal[T](implicit jsonAsRaw: AsRaw[JsonValue, T]): AsRawReal[StreamedBody, DataStream[T]] =
1163+
AsRawReal.create(
1164+
// Serialization: DataStream -> StreamedBody
1165+
stream => StreamedBody.JsonList(stream.source.map(jsonAsRaw.asRaw)),
1166+
// Deserialization: StreamedBody -> DataStream
1167+
body => {
1168+
val elements = StreamedBody.castOrFail[StreamedBody.JsonList](body).elements
1169+
DataStream(elements.map(jsonAsReal.asReal), Map.empty)
1170+
}
1171+
)
1172+
}
1173+
1174+
trait CustomStreamingApi {
1175+
/** Returns a custom streaming type */
1176+
def getDataStream(query: String): DataStream[SearchResult]
1177+
1178+
/** Returns a Task that produces a custom streaming type */
1179+
def prepareAndStreamData(id: String): Task[DataStream[DataPoint]]
1180+
}
1181+
object CustomStreamingApi extends DefaultRestApiCompanion[CustomStreamingApi]
1182+
```
1183+
1184+
This approach allows you to include additional metadata or context with your streams while maintaining the streaming behavior.
1185+
10961186
## Generating OpenAPI 3.0 specifications
10971187

10981188
[OpenAPI](https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md) is an open standard for describing
@@ -1388,7 +1478,88 @@ Because multiple REST HTTP methods may have the same path, adjusters are collect
13881478
are applied on the associated Path Item Object. When path item adjuster is applied on a [prefix method](#prefix-methods),
13891479
it will apply to all Path Item Objects associated with result of this prefix method.
13901480

1481+
### OpenAPI for Streaming Endpoints
1482+
1483+
When using Udash REST's streaming capabilities, OpenAPI specifications are automatically generated to reflect the streaming nature of the endpoints. This section describes how streaming responses are represented in OpenAPI documents.
1484+
1485+
#### Streaming Response Schema
1486+
1487+
Methods that return `Observable[T]` are automatically recognized as streaming endpoints. In the generated OpenAPI document, these endpoints are represented as arrays of the type `T`:
1488+
1489+
```scala
1490+
trait StreamingApi {
1491+
// This will be documented as an array of Item in OpenAPI
1492+
def streamItems(filter: String): Observable[Item]
1493+
}
1494+
```
1495+
1496+
For a streaming endpoint returning `Observable[Item]`, the generated schema will represent this as an array of `Item` objects. The framework automatically wraps the element type in an array schema to indicate that multiple items may be delivered over time.
1497+
1498+
#### Supported Streaming Formats
1499+
1500+
The OpenAPI document correctly describes the available media types for streaming responses:
1501+
1502+
1. **JSON Lists** - When streaming regular objects, they're represented as a JSON array in the schema.
1503+
This is reflected in the OpenAPI document with content type `application/json`.
1504+
1505+
2. **Binary Streams** - When streaming `Array[Byte]` (binary data), the content type in the OpenAPI document
1506+
will be `application/octet-stream`.
1507+
1508+
Example schema representation for a streaming endpoint:
1509+
1510+
```json
1511+
{
1512+
"paths": {
1513+
"/streamItems": {
1514+
"get": {
1515+
"responses": {
1516+
"200": {
1517+
"description": "Success",
1518+
"content": {
1519+
"application/json": {
1520+
"schema": {
1521+
"type": "array",
1522+
"items": {
1523+
"$ref": "#/components/schemas/Item"
1524+
}
1525+
}
1526+
}
1527+
}
1528+
}
1529+
}
1530+
}
1531+
}
1532+
}
1533+
}
1534+
```
1535+
1536+
#### Customizing OpenAPI for Streaming Endpoints
1537+
1538+
You can use the same annotation-based customization mechanisms discussed earlier to modify the OpenAPI documentation for streaming endpoints:
1539+
1540+
```scala
1541+
trait StreamingApi {
1542+
@description("Streams items matching the filter criteria")
1543+
@adjustOperation(op => op.copy(
1544+
responses = op.responses.copy(
1545+
byStatusCode = op.responses.byStatusCode + (
1546+
200 -> RefOr(Response(
1547+
description = "A stream of matching items that may be processed incrementally",
1548+
content = op.responses.byStatusCode(200).value.content
1549+
))
1550+
)
1551+
)
1552+
))
1553+
def streamItems(filter: String): Observable[Item]
1554+
1555+
@description("Streams binary file data")
1556+
def downloadFile(id: String): Observable[Array[Byte]]
1557+
}
1558+
```
1559+
13911560
### Limitations
13921561

13931562
- Current representation of OpenAPI document does not support
13941563
[specification extensions](https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#specificationExtensions).
1564+
1565+
- While the OpenAPI specification doesn't have native support for true streaming semantics, Udash REST represents streaming endpoints as array responses. This is the most accurate representation within the constraints of the OpenAPI specification format. Consumers of your API should understand that these array responses may be delivered incrementally rather than all at once, especially for potentially large datasets.

rest/src/main/scala/io/udash/rest/raw/RestResponse.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ sealed trait AbstractRestResponse {
1919
final def isSuccess: Boolean = code >= 200 && code < 300
2020
}
2121

22-
/** Standard REST response containing a status code, headers, and a body. The body is loaded fully in memory as an HttpBody. */
22+
/** Standard REST response containing a status code, headers, and a body. The body is loaded fully in memory as an [[HttpBody]]. */
2323
final case class RestResponse(
2424
code: Int,
2525
headers: IMapping[PlainValue],

0 commit comments

Comments
 (0)