Skip to content

Commit 8cb98d5

Browse files
authored
ensure Stream.toReadableStream ignores empty chunks (#5047)
1 parent db2dd3c commit 8cb98d5

File tree

2 files changed

+21
-18
lines changed

2 files changed

+21
-18
lines changed

.changeset/red-taxis-turn.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"effect": patch
3+
---
4+
5+
ensure Stream.toReadableStream ignores empty chunks

packages/effect/src/internal/stream.ts

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5401,25 +5401,21 @@ export const repeatWith = dual<
54015401
Ref.get(driver.iterationMeta)
54025402
)
54035403

5404-
const scheduleOutput = pipe(driver.last, Effect.orDie, Effect.map(options.onSchedule))
54055404
const process = pipe(self, provideLastIterationInfo, map(options.onElement), toChannel)
54065405
const loop: Channel.Channel<Chunk.Chunk<C>, unknown, E, unknown, void, unknown, R | R2> = channel.unwrap(
5407-
Effect.match(driver.next(void 0), {
5408-
onFailure: () => core.void,
5409-
onSuccess: () =>
5410-
pipe(
5411-
process,
5412-
channel.zipRight(
5413-
pipe(
5414-
scheduleOutput,
5415-
Effect.map((c) => pipe(core.write(Chunk.of(c)), core.flatMap(() => loop))),
5416-
channel.unwrap
5417-
)
5406+
Effect.match(
5407+
driver.next(void 0),
5408+
{
5409+
onFailure: () => core.void,
5410+
onSuccess: (output) =>
5411+
core.flatMap(
5412+
process,
5413+
() => channel.zipRight(core.write(Chunk.of(options.onSchedule(output))), loop)
54185414
)
5419-
)
5420-
})
5415+
}
5416+
)
54215417
)
5422-
return new StreamImpl(pipe(process, channel.zipRight(loop)))
5418+
return new StreamImpl(channel.zipRight(process, loop))
54235419
}),
54245420
unwrap
54255421
)
@@ -7218,15 +7214,17 @@ export const toReadableStreamRuntime = dual<
72187214

72197215
return new ReadableStream<A>({
72207216
start(controller) {
7221-
fiber = runFork(runForEachChunk(self, (chunk) =>
7222-
latch.whenOpen(Effect.sync(() => {
7217+
fiber = runFork(runForEachChunk(self, (chunk) => {
7218+
if (chunk.length === 0) return Effect.void
7219+
return latch.whenOpen(Effect.sync(() => {
72237220
latch.unsafeClose()
72247221
for (const item of chunk) {
72257222
controller.enqueue(item)
72267223
}
72277224
currentResolve!()
72287225
currentResolve = undefined
7229-
}))))
7226+
}))
7227+
}))
72307228
fiber.addObserver((exit) => {
72317229
try {
72327230
if (exit._tag === "Failure") {

0 commit comments

Comments
 (0)