Skip to content

Commit 0bcc120

Browse files
committed
Client-side cancellation
1 parent fcb0c93 commit 0bcc120

File tree

1 file changed

+14
-7
lines changed

1 file changed

+14
-7
lines changed

rest/jetty/src/main/scala/io/udash/rest/jetty/JettyRestClient.scala

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,10 @@ final class JettyRestClient(
7171

7272
override def handleRequestStream(request: RestRequest): Task[StreamedRestResponse] =
7373
prepareRequest(baseUrl, timeout, request).flatMap { httpReq =>
74-
Task.async0 { (scheduler: Scheduler, callback: Callback[Throwable, StreamedRestResponse]) =>
74+
def cancelRequest: Task[Unit] =
75+
Task(httpReq.abort(new CancellationException("Request cancelled")).discard)
76+
77+
Task.cancelable0 { (scheduler: Scheduler, callback: Callback[Throwable, StreamedRestResponse]) =>
7578
val listener = new BufferingResponseListener(maxResponseLength) {
7679
private var collectToBuffer: Boolean = true
7780
private lazy val publishSubject = PublishToOneSubject[Array[Byte]]()
@@ -88,7 +91,7 @@ final class JettyRestClient(
8891
val mediaTypeOpt = contentTypeOpt.map(MimeTypes.getContentTypeWithoutCharset)
8992
val bodyOpt = mediaTypeOpt matchOpt {
9093
case Opt(HttpBody.OctetStreamType) =>
91-
StreamedBody.RawBinary(content = rawContentSubject)
94+
StreamedBody.RawBinary(content = rawContentSubject.doOnSubscriptionCancel(cancelRequest))
9295
case Opt(HttpBody.JsonType) =>
9396
val charset = contentTypeOpt.map(MimeTypes.getCharsetFromContentType).getOrElse(HttpBody.Utf8Charset)
9497
// suboptimal - maybe "online" parsing is possible using Jackson / other lib without waiting for full content ?
@@ -101,7 +104,9 @@ final class JettyRestClient(
101104
Observable
102105
.fromIterator(Task.eval(input.readList().iterator(_.asInstanceOf[JsonStringInput].readRawJson())))
103106
.map(JsonValue(_))
104-
}.onErrorFallbackTo(Observable.raiseError(JettyRestClient.Streaming)),
107+
}
108+
.doOnSubscriptionCancel(cancelRequest)
109+
.onErrorFallbackTo(Observable.raiseError(JettyRestClient.Streaming)),
105110
charset = charset,
106111
)
107112
}
@@ -134,9 +139,9 @@ final class JettyRestClient(
134139
publishSubject.subscription // wait for subscription
135140
.flatMapNow(_ => rawContentSubject.onNext(arr))
136141
.mapNow {
137-
case Ack.Continue => demander.run()
138-
case Ack.Stop => ()
139-
}
142+
case Ack.Continue => demander.run()
143+
case Ack.Stop => ()
144+
}
140145
}
141146

142147
override def onComplete(result: Result): Unit =
@@ -160,7 +165,9 @@ final class JettyRestClient(
160165
}
161166
}
162167
httpReq.send(listener)
163-
}.doOnCancel(Task(httpReq.abort(new CancellationException("Request cancelled"))))
168+
169+
cancelRequest // see cats.effect#CancelToken
170+
}
164171
}
165172
}
166173

0 commit comments

Comments
 (0)