@@ -8,10 +8,13 @@ import io.udash.rest.raw.*
88import io .udash .rest .util .Utils
99import io .udash .utils .URLEncoder
1010import monix .eval .Task
11- import monix .execution .Callback
12- import monix .reactive .Observable
11+ import monix .execution .{Callback , Scheduler }
12+ import monix .reactive .OverflowStrategy .Unbounded
13+ import monix .reactive .{MulticastStrategy , Observable }
14+ import monix .reactive .subjects .{ConcurrentSubject , PublishToOneSubject }
1315import org .eclipse .jetty .client .*
1416import org .eclipse .jetty .http .{HttpCookie , HttpHeader , MimeTypes }
17+ import org .eclipse .jetty .io .Content
1518
1619import java .nio .charset .Charset
1720import scala .concurrent .CancellationException
@@ -48,36 +51,35 @@ final class JettyRestClient(
4851
4952 override def handleRequestStream (request : RestRequest ): Task [StreamedRestResponse ] =
5053 prepareRequest(baseUrl, timeout, request).flatMap { httpReq =>
51- Task .async { (callback : Callback [Throwable , StreamedRestResponse ]) =>
52- val listener = new InputStreamResponseListener {
54+ Task .async0 { (scheduler : Scheduler , callback : Callback [Throwable , StreamedRestResponse ]) =>
55+ val listener = new BufferingResponseListener (maxResponseLength) {
56+ private var collectToBuffer : Boolean = true
57+ private lazy val publishSubject = PublishToOneSubject [Array [Byte ]]()
58+ private lazy val rawContentSubject = ConcurrentSubject .from(publishSubject, Unbounded )(scheduler)
59+
5360 override def onHeaders (response : Response ): Unit = {
5461 super .onHeaders(response)
5562 // TODO streaming document content length behaviour
5663 val contentLength = response.getHeaders.getLongField(HttpHeader .CONTENT_LENGTH )
5764 if (contentLength == - 1 ) {
5865 val contentTypeOpt = response.getHeaders.get(HttpHeader .CONTENT_TYPE ).opt
5966 val mediaTypeOpt = contentTypeOpt.map(MimeTypes .getContentTypeWithoutCharset)
60- val charsetOpt = contentTypeOpt.map(MimeTypes .getCharsetFromContentType)
61- // TODO streaming error handling client-side ???
6267 val bodyOpt = mediaTypeOpt matchOpt {
6368 case Opt (HttpBody .OctetStreamType ) =>
64- // TODO streaming configure chunk size ???
65- StreamedBody .RawBinary (Observable .fromInputStream(Task .eval(getInputStream)))
69+ StreamedBody .RawBinary (content = rawContentSubject)
6670 case Opt (HttpBody .JsonType ) =>
67- val charset = charsetOpt .getOrElse(HttpBody .Utf8Charset )
71+ val charset = contentTypeOpt.map( MimeTypes .getCharsetFromContentType) .getOrElse(HttpBody .Utf8Charset )
6872 // suboptimal - maybe "online" parsing is possible using Jackson / other lib without waiting for full content ?
69- val elements : Observable [ JsonValue ] =
70- Observable
71- .fromTask(Utils .mergeArrays(Observable .fromInputStream( Task .eval(getInputStream)) ))
73+ StreamedBody . JsonList (
74+ elements = Observable
75+ .fromTask(Utils .mergeArrays(rawContentSubject ))
7276 .map(raw => new String (raw, charset))
7377 .flatMap { jsonStr =>
7478 val input = new JsonStringInput (new JsonReader (jsonStr))
7579 Observable
7680 .fromIterator(Task .eval(input.readList().iterator(_.asInstanceOf [JsonStringInput ].readRawJson())))
7781 .map(JsonValue (_))
78- }
79- StreamedBody .JsonList (
80- elements = elements,
82+ },
8183 charset = charset,
8284 )
8385 }
@@ -87,6 +89,7 @@ final class JettyRestClient(
8789 callback(Failure (new Exception (s " Unsupported content type $contentTypeOpt" )))
8890 },
8991 body => {
92+ this .collectToBuffer = false
9093 val restResponse = StreamedRestResponse (
9194 code = response.getStatus,
9295 headers = parseHeaders(response),
@@ -99,42 +102,40 @@ final class JettyRestClient(
99102 }
100103 }
101104
102- override def onComplete (result : Result ): Unit = {
103- super .onComplete(result)
105+ override def onContent (response : Response , chunk : Content .Chunk , demander : Runnable ): Unit =
106+ if (collectToBuffer)
107+ super .onContent(response, chunk, demander)
108+ else
109+ if (chunk == Content .Chunk .EOF ) {
110+ rawContentSubject.onComplete()
111+ } else {
112+ val buf = chunk.getByteBuffer
113+ val arr = new Array [Byte ](buf.remaining)
114+ buf.get(arr)
115+ publishSubject.subscription // wait for subscription
116+ .flatMapNow(_ => rawContentSubject.onNext(arr))
117+ .mapNow(_ => demander.run())
118+ }
119+
120+ override def onComplete (result : Result ): Unit =
104121 if (result.isSucceeded) {
105122 val httpResp = result.getResponse
106123 val contentLength = httpResp.getHeaders.getLongField(HttpHeader .CONTENT_LENGTH )
107124 if (contentLength != - 1 ) {
108- val contentTypeOpt = httpResp.getHeaders.get(HttpHeader .CONTENT_TYPE ).opt
109- val charsetOpt = contentTypeOpt.map(MimeTypes .getCharsetFromContentType)
110125 // TODO streaming client-side handle errors ?
111- val rawBody = getInputStream.readAllBytes()
112- val body = (contentTypeOpt, charsetOpt) match {
113- case (Opt (contentType), Opt (charset)) =>
114- StreamedBody .fromHttpBody(
115- HttpBody .textual(
116- content = new String (rawBody, charset),
117- mediaType = MimeTypes .getContentTypeWithoutCharset(contentType),
118- charset = charset,
119- )
120- )
121- case (Opt (contentType), Opt .Empty ) =>
122- StreamedBody .fromHttpBody(HttpBody .binary(rawBody, contentType))
123- case _ =>
124- StreamedBody .Empty
125- }
126126 val restResponse = StreamedRestResponse (
127127 code = httpResp.getStatus,
128128 headers = parseHeaders(httpResp),
129- body = body ,
129+ body = StreamedBody .fromHttpBody(parseHttpBody(httpResp, this )) ,
130130 batchSize = 1 ,
131131 )
132132 callback(Success (restResponse))
133+ } else {
134+ rawContentSubject.onComplete()
133135 }
134136 } else {
135137 callback(Failure (result.getFailure))
136138 }
137- }
138139 }
139140 httpReq.send(listener)
140141 }.doOnCancel(Task (httpReq.abort(new CancellationException (" Request cancelled" ))))
@@ -193,17 +194,11 @@ final class JettyRestClient(
193194 override def onComplete (result : Result ): Unit =
194195 if (result.isSucceeded) {
195196 val httpResp = result.getResponse
196- val contentTypeOpt = httpResp.getHeaders.get(HttpHeader .CONTENT_TYPE ).opt
197- val charsetOpt = contentTypeOpt.map(MimeTypes .getCharsetFromContentType)
198- val body = (contentTypeOpt, charsetOpt) match {
199- case (Opt (contentType), Opt (charset)) =>
200- HttpBody .textual(getContentAsString, MimeTypes .getContentTypeWithoutCharset(contentType), charset)
201- case (Opt (contentType), Opt .Empty ) =>
202- HttpBody .binary(getContent, contentType)
203- case _ =>
204- HttpBody .Empty
205- }
206- val response = RestResponse (httpResp.getStatus, parseHeaders(httpResp), body)
197+ val response = RestResponse (
198+ code = httpResp.getStatus,
199+ headers = parseHeaders(httpResp),
200+ body = parseHttpBody(httpResp, this ),
201+ )
207202 callback(Success (response))
208203 } else {
209204 callback(Failure (result.getFailure))
@@ -212,6 +207,23 @@ final class JettyRestClient(
212207 }
213208 .doOnCancel(Task (httpReq.abort(new CancellationException (" Request cancelled" ))))
214209
210+ private def parseHttpBody (httpResp : Response , listener : BufferingResponseListener ): HttpBody = {
211+ val contentTypeOpt = httpResp.getHeaders.get(HttpHeader .CONTENT_TYPE ).opt
212+ val charsetOpt = contentTypeOpt.map(MimeTypes .getCharsetFromContentType)
213+ (contentTypeOpt, charsetOpt) match {
214+ case (Opt (contentType), Opt (charset)) =>
215+ HttpBody .textual(
216+ content = listener.getContentAsString,
217+ mediaType = MimeTypes .getContentTypeWithoutCharset(contentType),
218+ charset = charset,
219+ )
220+ case (Opt (contentType), Opt .Empty ) =>
221+ HttpBody .binary(listener.getContent, contentType)
222+ case _ =>
223+ HttpBody .Empty
224+ }
225+ }
226+
215227 private def parseHeaders (httpResp : Response ): IMapping [PlainValue ] =
216228 IMapping (httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue (h.getValue))).toList)
217229}
0 commit comments