Skip to content

Commit 3198244

Browse files
committed
use Channel for stream rx
1 parent dd102ce commit 3198244

File tree

2 files changed

+33
-11
lines changed

2 files changed

+33
-11
lines changed

.changeset/cold-goats-repair.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect-rx/rx": patch
3+
---
4+
5+
use Channel for stream rx

packages/rx/src/Rx.ts

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
*/
44
/* eslint-disable @typescript-eslint/no-empty-object-type */
55
import { NoSuchElementException } from "effect/Cause"
6+
import type * as Cause from "effect/Cause"
7+
import * as Channel from "effect/Channel"
68
import * as Chunk from "effect/Chunk"
79
import * as Duration from "effect/Duration"
810
import * as Effect from "effect/Effect"
911
import * as Either from "effect/Either"
1012
import * as Exit from "effect/Exit"
11-
import { dual, pipe } from "effect/Function"
13+
import { constVoid, dual, pipe } from "effect/Function"
1214
import { globalValue } from "effect/GlobalValue"
1315
import * as Hash from "effect/Hash"
1416
import * as Inspectable from "effect/Inspectable"
@@ -654,15 +656,24 @@ function makeStream<A, E>(
654656
): Result.Result<A, E | NoSuchElementException> {
655657
const previous = ctx.self<Result.Result<A, E | NoSuchElementException>>()
656658

657-
const cancel = runCallbackSync(runtime)(
658-
Stream.runForEach(
659-
stream,
660-
(a) => Effect.sync(() => ctx.setSelfSync(Result.waiting(Result.success(a))))
661-
),
662-
(exit) => {
663-
if (exit._tag === "Failure") {
664-
ctx.setSelfSync(Result.failureWithPrevious(exit.cause, previous))
665-
} else {
659+
const writer: Channel.Channel<never, Chunk.Chunk<A>, never, E> = Channel.readWithCause({
660+
onInput(input: Chunk.Chunk<A>) {
661+
return Channel.suspend(() => {
662+
const last = Chunk.last(input)
663+
if (last._tag === "Some") {
664+
ctx.setSelfSync(Result.success(last.value, true))
665+
}
666+
return writer
667+
})
668+
},
669+
onFailure(cause: Cause.Cause<E>) {
670+
return Channel.suspend(() => {
671+
ctx.setSelfSync(Result.failureWithPrevious(cause, previous))
672+
return Channel.void
673+
})
674+
},
675+
onDone(_done: unknown) {
676+
return Channel.suspend(() => {
666677
pipe(
667678
ctx.self<Result.Result<A, E | NoSuchElementException>>(),
668679
Option.flatMap(Result.value),
@@ -671,8 +682,14 @@ function makeStream<A, E>(
671682
onSome: (a) => ctx.setSelfSync(Result.success(a))
672683
})
673684
)
674-
}
685+
return Channel.void
686+
})
675687
}
688+
})
689+
690+
const cancel = runCallbackSync(runtime)(
691+
Channel.runDrain(Channel.pipeTo(Stream.toChannel(stream), writer)),
692+
constVoid
676693
)
677694
if (cancel !== undefined) {
678695
ctx.addFinalizer(cancel)

0 commit comments

Comments
 (0)