Skip to content

Commit fec24d4

Browse files
committed
aggressively cut off previous fiber in useResultCallback
1 parent bcbad81 commit fec24d4

File tree

1 file changed

+47
-54
lines changed

1 file changed

+47
-54
lines changed

src/hooks/useResultCallback.ts

Lines changed: 47 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,22 @@
11
import * as Effect from "@effect/io/Effect"
22
import * as Fiber from "@effect/io/Fiber"
3-
import * as Queue from "@effect/io/Queue"
3+
import * as Ref from "@effect/io/Ref"
44
import * as Runtime from "@effect/io/Runtime"
55
import * as Stream from "@effect/stream/Stream"
66
import type { ResultBag } from "effect-react/hooks/useResultBag"
77
import { updateNext, useResultBag } from "effect-react/hooks/useResultBag"
88
import type { RuntimeContext } from "effect-react/internal/runtimeContext"
99
import * as Result from "effect-react/Result"
10-
import { useCallback, useContext, useEffect, useRef, useState } from "react"
10+
import { useCallback, useContext, useEffect, useState } from "react"
1111

12-
interface UseResultCallbackOptions {
13-
/**
14-
* Determines on-change behavior
15-
* If this is true, effects and streams from callbacks will be run through
16-
* on a best-effort basis instead of interrupting.
17-
* Before using this option, check if you do not actually want to use `Effect.uninterruptible` instead.
18-
*/
19-
uninterruptible?: true
12+
type FiberState<E> = { readonly _tag: "Idle" } | {
13+
readonly _tag: "Running"
14+
readonly fiber: Fiber.RuntimeFiber<E, void>
15+
readonly interruptingRef: Ref.Ref<boolean>
2016
}
2117

2218
export type UseResultCallback<R> = <Args extends Array<any>, R0 extends R, E, A>(
23-
callback: (...args: Args) => Effect.Effect<R0, E, A>,
24-
options?: UseResultCallbackOptions
19+
callback: (...args: Args) => Effect.Effect<R0, E, A>
2520
) => readonly [ResultBag<E, A>, (...args: Args) => void]
2621

2722
export const makeUseResultCallback: <R>(
@@ -30,57 +25,55 @@ export const makeUseResultCallback: <R>(
3025
runtimeContext: RuntimeContext<R>
3126
) =>
3227
<Args extends Array<any>, R0 extends R, E, A>(
33-
f: (...args: Args) => Stream.Stream<R0, E, A>,
34-
options?: UseResultCallbackOptions
28+
f: (...args: Args) => Stream.Stream<R0, E, A>
3529
) => {
36-
const runtime = useContext(runtimeContext)
37-
const fiberRef = useRef<Fiber.RuntimeFiber<E, void>>()
38-
const queueRef = useRef<Queue.Queue<[(...args: Args) => Stream.Stream<R0, E, A>, Args]>>()
39-
if (!queueRef.current) {
40-
queueRef.current = Effect.runSync(Queue.unbounded())
41-
}
4230
const [result, setResult] = useState<Result.Result<E, A>>(Result.initial())
4331
const [trackRef, resultBag] = useResultBag(result)
44-
45-
if (!fiberRef.current) {
46-
fiberRef.current = Stream.fromQueue(queueRef.current).pipe(
47-
Stream.tap(() =>
48-
Effect.sync(() => {
49-
setResult((prev) => updateNext(Result.waiting(prev), trackRef))
50-
})
51-
),
52-
Stream.flatMap(([f, args]) => f(...args), { switch: !options?.uninterruptible }),
53-
Stream.tap((value) =>
54-
Effect.sync(() => {
55-
setResult(updateNext(Result.success(value), trackRef))
56-
})
57-
),
58-
Stream.tapErrorCause((cause) =>
59-
Effect.sync(() => {
60-
setResult(updateNext(Result.failCause(cause), trackRef))
61-
})
62-
),
63-
Stream.runDrain,
64-
Runtime.runFork(runtime)
65-
)
66-
}
67-
6832
trackRef.current.currentStatus = result._tag
6933

70-
const run = useCallback((...args: Args) => {
71-
trackRef.current.invocationCount++
72-
queueRef.current!.unsafeOffer([f, args])
73-
}, [f])
74-
34+
const [fiberState, setFiberState] = useState<FiberState<E>>({ _tag: "Idle" })
7535
useEffect(() =>
7636
() => {
77-
if (queueRef.current) {
78-
Effect.runSync(Queue.shutdown(queueRef.current))
79-
}
80-
if (fiberRef.current) {
81-
Effect.runSync(Fiber.interruptFork(fiberRef.current))
37+
if (fiberState._tag === "Running") {
38+
Effect.runFork(Fiber.interruptFork(fiberState.fiber))
8239
}
8340
}, [])
8441

42+
const runtime = useContext(runtimeContext)
43+
const run = useCallback((...args: Args) => {
44+
if (fiberState._tag === "Running") {
45+
Effect.runSync(Ref.set(fiberState.interruptingRef, true))
46+
Effect.runFork(Fiber.interruptFork(fiberState.fiber))
47+
}
48+
49+
trackRef.current.invocationCount++
50+
51+
const interruptingRef = Ref.unsafeMake(false)
52+
const maybeSetResult = (result: Result.Result<E, A> | ((_: Result.Result<E, A>) => Result.Result<E, A>)) =>
53+
Effect.flatMap(
54+
Ref.get(interruptingRef),
55+
(interrupting) =>
56+
interrupting ? Effect.unit : Effect.sync(() => {
57+
setResult(result)
58+
})
59+
)
60+
61+
const fiber = Effect.sync(() => {
62+
setResult((prev) => updateNext(Result.waiting(prev), trackRef))
63+
}).pipe(
64+
Stream.flatMap(() => f(...args)),
65+
Stream.tap((value) => maybeSetResult(updateNext(Result.success(value), trackRef))),
66+
Stream.tapErrorCause((cause) => maybeSetResult(updateNext(Result.failCause(cause), trackRef))),
67+
Stream.runDrain,
68+
Runtime.runFork(runtime)
69+
)
70+
71+
setFiberState({
72+
_tag: "Running",
73+
fiber,
74+
interruptingRef
75+
})
76+
}, [f, fiberState])
77+
8578
return [resultBag, run] as const
8679
}

0 commit comments

Comments
 (0)