Skip to content

Commit 96d9143

Browse files
committed
Enhance streaming support in REST API: add error handling and update documentation
1 parent c20e1bc commit 96d9143

File tree

3 files changed

+24
-8
lines changed

3 files changed

+24
-8
lines changed

guide/guide/.js/src/main/assets/pages/rest.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,7 @@ Ultimately, if you don't want to use `Future`s, you may replace it with some oth
705705
e.g. Monix Task or some IO monad.
706706
See [supporting result containers other than `Future`](#supporting-result-containers-other-than-future).
707707

708-
See [streaming serialization workflow](#streaming-serialization-workflow) for details on `monix.reactive.Observable`
708+
See [streaming serialization workflow](#streaming-serialization-workflow) for details on `monix.reactive.Observable`
709709
support in streaming REST API methods.
710710

711711
### Customizing serialization
@@ -949,7 +949,7 @@ computation which yields a `RestResponse` when run.
949949

950950
### Implementing a server
951951

952-
An existing implementation of REST API trait can be easily turned into a function using
952+
An existing implementation of REST API trait can be easily turned into a function using
953953
`RawRest.asHandleRequest` or `RawRest.asHandleRequestWithStreaming` (for server-side streaming support).
954954

955955
Therefore, the only thing you need to do to expose your REST API trait as an actual web service it to turn
@@ -1096,7 +1096,7 @@ However, this conversion loses the streaming benefits, so it's best used only wh
10961096

10971097
### Error Handling
10981098

1099-
Streaming endpoints handle errors similarly to regular endpoints. When an error occurs during streaming:
1099+
Streaming endpoints handle errors similarly to regular endpoints.
11001100

11011101
```scala
11021102
// Server side
@@ -1116,6 +1116,7 @@ client.streamItems("")
11161116
```
11171117

11181118
This allows graceful handling of errors that might occur during streaming operations.
1119+
When an error occurs during streaming from the server side, the HTTP connection is closed immediately. This means any in-progress streaming is cut off at the point of error.
11191120

11201121
### Advanced Streaming Patterns
11211122

rest/.jvm/src/main/scala/io/udash/rest/RestServlet.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ class RestServlet(
117117

118118
private def writeNonEmptyStreamedBody(
119119
response: HttpServletResponse,
120-
stream: StreamedRestResponse,
121120
body: StreamedBody.NonEmpty,
122121
): Task[Unit] = Task.defer {
123122
// The Content-Length header is intentionally omitted for streams.
@@ -160,7 +159,11 @@ class RestServlet(
160159
.map(_ => response.getOutputStream.write("]".getBytes(jsonList.charset)))
161160
}
162161
}.onErrorHandle { e =>
163-
logger.error(e.getMessage)
162+
// When an error occurs during streaming, we immediately close the connection rather than
163+
// attempting to send an error response. This is intentional because:
164+
// The client has likely already received and started processing partial data
165+
// for structured formats (like JSON arrays), the stream is now in an invalid state
166+
logger.error("Failure during streaming REST response", e)
164167
response.getOutputStream.close()
165168
}
166169

@@ -174,7 +177,7 @@ class RestServlet(
174177
case stream: StreamedRestResponse =>
175178
stream.body match {
176179
case StreamedBody.Empty => Task.unit
177-
case neBody: StreamedBody.NonEmpty => writeNonEmptyStreamedBody(response, stream, neBody)
180+
case neBody: StreamedBody.NonEmpty => writeNonEmptyStreamedBody(response, neBody)
178181
}
179182
}
180183

rest/src/main/scala/io/udash/rest/RestDataCompanion.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import com.avsystem.commons.meta.MacroInstances
55
import com.avsystem.commons.misc.{AbstractValueEnumCompanion, ValueEnum, ValueOf}
66
import com.avsystem.commons.rpc.{AsRaw, AsReal}
77
import com.avsystem.commons.serialization.{GenCodec, TransparentWrapperCompanion}
8-
import io.udash.rest.openapi._
8+
import io.udash.rest.openapi.*
99
import io.udash.rest.openapi.RestStructure.NameAndAdjusters
10-
import io.udash.rest.raw.{HttpBody, JsonValue, PlainValue, RestResponse}
10+
import io.udash.rest.raw.{HttpBody, JsonValue, PlainValue, RestResponse, StreamedBody, StreamedRestResponse}
1111

1212
trait CodecWithStructure[T] {
1313
def codec: GenCodec[T]
@@ -94,6 +94,18 @@ abstract class RestDataWrapperCompanion[Wrapped, T](implicit
9494
implicit def responseAsReal(implicit wrappedAsRaw: AsReal[RestResponse, Wrapped]): AsReal[RestResponse, T] =
9595
AsReal.fromTransparentWrapping
9696

97+
implicit def streamedBodyAsRaw(implicit wrappedAsRaw: AsRaw[StreamedBody, Wrapped]): AsRaw[StreamedBody, T] =
98+
AsRaw.fromTransparentWrapping
99+
100+
implicit def streamedBodyAsReal(implicit wrappedAsRaw: AsReal[StreamedBody, Wrapped]): AsReal[StreamedBody, T] =
101+
AsReal.fromTransparentWrapping
102+
103+
implicit def streamedResponseAsRaw(implicit wrappedAsRaw: AsRaw[StreamedRestResponse, Wrapped]): AsRaw[StreamedRestResponse, T] =
104+
AsRaw.fromTransparentWrapping
105+
106+
implicit def streamedResponseAsReal(implicit wrappedAsRaw: AsReal[StreamedRestResponse, Wrapped]): AsReal[StreamedRestResponse, T] =
107+
AsReal.fromTransparentWrapping
108+
97109
implicit def restSchema(implicit wrappedSchema: RestSchema[Wrapped]): RestSchema[T] =
98110
nameAndAdjusters.restSchema(wrappedSchema)
99111

0 commit comments

Comments
 (0)