Skip to content

Commit da4b1cd

Browse files
Abort fetch requests when scope is complete (#3866)
1 parent a70abab commit da4b1cd

File tree

1 file changed

+68
-77
lines changed

1 file changed

+68
-77
lines changed

zio-http/js/src/main/scala/zio/http/internal/FetchDriver.scala

Lines changed: 68 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -20,37 +20,31 @@ final case class FetchDriver() extends ZClient.Driver[Any, Scope, Throwable] {
2020
requestBody: Body,
2121
sslConfig: Option[ClientSSLConfig],
2222
proxy: Option[Proxy],
23-
)(implicit trace: Trace): ZIO[Scope, Throwable, Response] = {
23+
)(implicit trace: Trace): ZIO[Scope, Throwable, Response] =
2424
for {
25-
jsBody <- FetchDriver.fromZBody(requestBody)
26-
response <-
27-
ZIO.fromFuture { implicit ec =>
28-
val jsMethod = FetchDriver.fromZMethod(requestMethod)
29-
val jsHeaders = js.Dictionary(requestHeaders.map(h => h.headerName -> h.renderedValue).toSeq: _*)
30-
for {
31-
response <- dom
32-
.fetch(
33-
url.encode,
34-
new dom.RequestInit {
35-
method = jsMethod
36-
headers = jsHeaders
37-
body = jsBody
38-
},
39-
)
40-
.toFuture
41-
} yield {
42-
val respHeaders = Headers.fromIterable(response.headers.map(h => Header.Custom(h(0), h(1))))
43-
val ct = respHeaders.get(Header.ContentType)
44-
Response(
45-
status = Status.fromInt(response.status),
46-
headers = respHeaders,
47-
body = FetchBodyInternal.fromResponse(response, ct.map(Body.ContentType.fromHeader)),
48-
)
49-
}
50-
25+
jsBody <- FetchDriver.fromZBody(requestBody)
26+
jsMethod = FetchDriver.fromZMethod(requestMethod)
27+
jsHeaders = js.Dictionary(requestHeaders.map(h => h.headerName -> h.renderedValue).toSeq: _*)
28+
abortSignal <- FetchDriver.makeAbortSignal
29+
response <-
30+
ZIO.fromPromiseJS {
31+
dom.fetch(
32+
url.encode,
33+
new dom.RequestInit {
34+
method = jsMethod
35+
headers = jsHeaders
36+
body = jsBody
37+
signal = abortSignal
38+
},
39+
)
5140
}
52-
} yield response
53-
}
41+
respHeaders = Headers.fromIterable(response.headers.map(h => Header.Custom(h(0), h(1))))
42+
ct = respHeaders.get(Header.ContentType)
43+
} yield Response(
44+
status = Status.fromInt(response.status),
45+
headers = respHeaders,
46+
body = FetchBodyInternal.fromResponse(response, ct.map(Body.ContentType.fromHeader)),
47+
)
5448

5549
override def disableStreaming(implicit ev1: Scope =:= Scope): ZClient.Driver[Any, Any, Throwable] =
5650
FetchDriverBatched()
@@ -59,7 +53,7 @@ final case class FetchDriver() extends ZClient.Driver[Any, Scope, Throwable] {
5953
trace: Trace,
6054
ev: Scope =:= Scope,
6155
): ZIO[Env1 & Scope, Throwable, Response] =
62-
throw new UnsupportedOperationException("WebSockets are not supported in the js client yet.")
56+
ZIO.die(new UnsupportedOperationException("WebSockets are not supported in the js client yet."))
6357

6458
}
6559

@@ -89,6 +83,13 @@ object FetchDriver {
8983
body.asArray.map { ar => Uint8Array.of(ArraySeq.unsafeWrapArray(ar.map(_.toShort)): _*) }
9084
}
9185

86+
// Without this, if you have a streaming request, and disconnect from the stream, the connection will leak and stay open indefinitely.
87+
private[http] def makeAbortSignal: URIO[Scope, dom.AbortSignal] =
88+
for {
89+
controller <- ZIO.succeed { new dom.AbortController() }
90+
_ <- ZIO.addFinalizer { ZIO.succeed { controller.abort() } }
91+
} yield controller.signal
92+
9293
}
9394

9495
private[http] final case class FetchDriverBatched() extends ZClient.Driver[Any, Any, Throwable] {
@@ -101,54 +102,44 @@ private[http] final case class FetchDriverBatched() extends ZClient.Driver[Any,
101102
requestBody: Body,
102103
sslConfig: Option[ClientSSLConfig],
103104
proxy: Option[Proxy],
104-
)(implicit trace: Trace): ZIO[Any, Throwable, Response] = {
105-
for {
106-
jsBody <- FetchDriver.fromZBody(requestBody)
107-
response <-
108-
ZIO.fromFuture { implicit ec =>
109-
val jsMethod = FetchDriver.fromZMethod(requestMethod)
110-
val jsHeaders = js.Dictionary(requestHeaders.map(h => h.headerName -> h.renderedValue).toSeq: _*)
111-
for {
112-
response <- dom
113-
.fetch(
114-
url.encode,
115-
new dom.RequestInit {
116-
method = jsMethod
117-
headers = jsHeaders
118-
body = jsBody
119-
},
120-
)
121-
.toFuture
122-
// fully materialize body; convert ArrayBuffer to Array[Byte] manually.
123-
// Fallback to text() on clone if needed.
124-
bytes <- response
125-
.arrayBuffer()
126-
.toFuture
127-
.map { buf =>
128-
val view = new scala.scalajs.js.typedarray.Uint8Array(buf)
129-
val out = new Array[Byte](view.length)
130-
var i = 0
131-
while (i < view.length) { out(i) = view(i).toByte; i += 1 }
132-
out
133-
}
134-
.recoverWith { case _ =>
135-
response.clone().text().toFuture.map(_.getBytes(Charsets.Http))
136-
}
137-
} yield {
138-
val respHeaders = Headers.fromIterable(response.headers.map(h => Header.Custom(h(0), h(1))))
139-
val ct = respHeaders.get(Header.ContentType)
140-
val clHeader = respHeaders.get(Header.ContentLength)
141-
val cl = clHeader.orElse(Some(Header.ContentLength(bytes.length.toLong)))
142-
Response(
143-
status = Status.fromInt(response.status),
144-
headers = respHeaders,
145-
body = FetchBodyBatched(bytes, ct.map(Body.ContentType.fromHeader), cl),
146-
)
147-
}
148-
105+
)(implicit trace: Trace): ZIO[Any, Throwable, Response] =
106+
ZIO.scoped {
107+
for {
108+
jsBody <- FetchDriver.fromZBody(requestBody)
109+
jsMethod = FetchDriver.fromZMethod(requestMethod)
110+
jsHeaders = js.Dictionary(requestHeaders.map(h => h.headerName -> h.renderedValue).toSeq: _*)
111+
abortSignal <- FetchDriver.makeAbortSignal
112+
response <- ZIO.fromPromiseJS {
113+
dom.fetch(
114+
url.encode,
115+
new dom.RequestInit {
116+
method = jsMethod
117+
headers = jsHeaders
118+
body = jsBody
119+
signal = abortSignal
120+
},
121+
)
149122
}
150-
} yield response
151-
}
123+
bytes <- ZIO.fromPromiseJS { response.arrayBuffer() }.map { buf =>
124+
val view = new scala.scalajs.js.typedarray.Uint8Array(buf)
125+
val out = new Array[Byte](view.length)
126+
var i = 0
127+
while (i < view.length) { out(i) = view(i).toByte; i += 1 }
128+
out
129+
}.catchAllCause { _ =>
130+
ZIO.logDebug("Error fetching body bytes, using text fetch instead") *>
131+
ZIO.fromPromiseJS { response.clone().text() }.map(_.getBytes(Charsets.Http))
132+
}
133+
respHeaders = Headers.fromIterable(response.headers.map(h => Header.Custom(h(0), h(1))))
134+
ct = respHeaders.get(Header.ContentType)
135+
clHeader = respHeaders.get(Header.ContentLength)
136+
cl = clHeader.orElse(Some(Header.ContentLength(bytes.length.toLong)))
137+
} yield Response(
138+
status = Status.fromInt(response.status),
139+
headers = respHeaders,
140+
body = FetchBodyBatched(bytes, ct.map(Body.ContentType.fromHeader), cl),
141+
)
142+
}
152143

153144
override def socket[Env1 <: Any](version: Version, url: URL, headers: Headers, app: WebSocketApp[Env1])(implicit
154145
trace: Trace,

0 commit comments

Comments
 (0)