Skip to content

Commit ca5b901

Browse files
authored
Merge branch 'main' into fix/io-read_input_stream-overallocation
2 parents 4e10816 + 62d25fa commit ca5b901

File tree

6 files changed

+41
-30
lines changed

6 files changed

+41
-30
lines changed

.scalafmt.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version = "3.7.14"
1+
version = "3.7.15"
22

33
style = default
44

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
265265
.settings(
266266
name := "fs2-core",
267267
libraryDependencies ++= Seq(
268-
"org.scodec" %%% "scodec-bits" % "1.1.37",
268+
"org.scodec" %%% "scodec-bits" % "1.1.38",
269269
"org.typelevel" %%% "cats-core" % "2.10.0",
270270
"org.typelevel" %%% "cats-effect" % "3.5.2",
271271
"org.typelevel" %%% "cats-effect-laws" % "3.5.2" % Test,

io/js/src/main/scala/fs2/io/NodeStream.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,5 @@ trait Duplex extends Readable with Writable {
7272
protected[io] override def destroy(): this.type = js.native
7373
}
7474

75+
@deprecated("No longer raised", "3.9.3")
7576
final class StreamDestroyedException private[io] () extends IOException

io/js/src/main/scala/fs2/io/ioplatform.scala

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -109,30 +109,30 @@ private[fs2] trait ioplatform {
109109
/** `Pipe` that converts a stream of bytes to a stream that will emit a single `Readable`,
110110
* that ends whenever the resulting stream terminates.
111111
*/
112-
def toReadable[F[_]](implicit F: Async[F]): Pipe[F, Byte, Readable] =
113-
in =>
114-
Stream
115-
.resource(mkDuplex(in))
116-
.flatMap { case (duplex, out) =>
117-
Stream
118-
.emit(duplex)
119-
.merge(out.drain)
120-
.concurrently(
121-
Stream.eval(
122-
F.async_[Unit](cb =>
123-
duplex.end { e =>
124-
cb(e.filterNot(_ == null).toLeft(()).leftMap(js.JavaScriptException))
125-
}
126-
)
127-
)
128-
)
129-
}
130-
.adaptError { case IOException(ex) => ex }
112+
def toReadable[F[_]: Async]: Pipe[F, Byte, Readable] =
113+
in => Stream.resource(toReadableResource(in))
131114

132115
/** Like [[toReadable]] but returns a `Resource` rather than a single element stream.
133116
*/
134-
def toReadableResource[F[_]: Async](s: Stream[F, Byte]): Resource[F, Readable] =
135-
s.through(toReadable).compile.resource.lastOrError
117+
def toReadableResource[F[_]](s: Stream[F, Byte])(implicit F: Async[F]): Resource[F, Readable] =
118+
mkDuplex(s)
119+
.flatMap { case (duplex, out) =>
120+
out
121+
.concurrently(
122+
Stream.eval(
123+
F.async_[Unit](cb =>
124+
duplex.end { e =>
125+
cb(e.filterNot(_ == null).toLeft(()).leftMap(js.JavaScriptException))
126+
}
127+
)
128+
)
129+
)
130+
.compile
131+
.drain
132+
.background
133+
.as(duplex)
134+
}
135+
.adaptError { case IOException(ex) => ex }
136136

137137
/** Writes all bytes to the specified `Writable`.
138138
*/
@@ -206,7 +206,7 @@ private[fs2] trait ioplatform {
206206
errorDispatcher <- Dispatcher.sequential[F]
207207
readQueue <- Queue.bounded[F, Option[Chunk[Byte]]](1).toResource
208208
writeChannel <- Channel.synchronous[F, Chunk[Byte]].toResource
209-
error <- F.deferred[Throwable].toResource
209+
interrupt <- F.deferred[Either[Throwable, Unit]].toResource
210210
duplex <- Resource.make {
211211
F.delay {
212212
new facade.stream.Duplex(
@@ -236,10 +236,9 @@ private[fs2] trait ioplatform {
236236

237237
var destroy = { (_, err, cb) =>
238238
errorDispatcher.unsafeRunAndForget {
239-
error
239+
interrupt
240240
.complete(
241-
Option(err)
242-
.fold[Exception](new StreamDestroyedException)(js.JavaScriptException(_))
241+
Option(err).map(js.JavaScriptException(_)).toLeft(())
243242
) *> F.delay(cb(null))
244243
}
245244
}
@@ -254,10 +253,9 @@ private[fs2] trait ioplatform {
254253
}
255254
drainIn = in.enqueueNoneTerminatedChunks(readQueue).drain
256255
out = writeChannel.stream.unchunks
257-
.concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit])))
258256
} yield (
259257
duplex,
260-
drainIn.merge(out).adaptError { case IOException(ex) => ex }
258+
drainIn.merge(out).interruptWhen(interrupt).adaptError { case IOException(ex) => ex }
261259
)
262260

263261
/** Stream of bytes read asynchronously from standard input. */

io/js/src/test/scala/fs2/io/IoPlatformSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,16 @@ class IoPlatformSuite extends Fs2Suite {
114114
.timeoutTo(100.millis, IO.unit)
115115
}
116116

117+
test("Destroying Node.js stream without error does not raise an exception") {
118+
Stream
119+
.never[IO]
120+
.through {
121+
toDuplexAndRead[IO] { duplex =>
122+
IO(duplex.destroy())
123+
}
124+
}
125+
.compile
126+
.drain
127+
}
128+
117129
}

project/build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version=1.9.6
1+
sbt.version=1.9.7

0 commit comments

Comments
 (0)