Skip to content

Commit 15c202c

Browse files
authored
Merge pull request #9 from arashi01/laminar_stream_subscriptions
Fix Laminar stream subscription and clean ZIO event handling
2 parents 0771be7 + 202681a commit 15c202c

File tree

5 files changed

+27
-60
lines changed

5 files changed

+27
-60
lines changed

modules/laminar/src/main/scala/tausi/laminar/extensions.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ extension [S[_]: StreamSource, A](stream: S[A])
223223
*/
224224
def toStateSignal(using owner: Owner): Signal[StreamState[A]] =
225225
val variable = Var[StreamState[A]](StreamState.Running)
226+
226227
val subscription = stream.subscribe(
227228
onNext = a => variable.set(StreamState.Value(a)),
228229
onError = err => variable.set(StreamState.Failed(err)),
@@ -231,7 +232,11 @@ extension [S[_]: StreamSource, A](stream: S[A])
231232
case StreamState.Value(lastValue) => variable.set(StreamState.CompletedWith(lastValue))
232233
case _ => variable.set(StreamState.Completed)
233234
)
235+
236+
// Register the subscription with the Owner for proper lifecycle management.
237+
// When the Owner is killed (e.g., component unmounts), the subscription is automatically cancelled.
234238
registerCleanup(owner, subscription)
239+
235240
variable.signal
236241
end toStateSignal
237242

modules/sample/src/main/scala/tausi/sample/components/EventLog.scala

Lines changed: 9 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -54,45 +54,8 @@ object EventLog:
5454
given Runtime[Any] = Runtime.default
5555

5656
def apply(): HtmlElement =
57-
// State var to hold the current stream state
58-
val stateVar: Var[StreamState[EventMessage[SurveySubmittedEvent]]] =
59-
Var(StreamState.Running)
60-
6157
div(
6258
cls := "fixed bottom-4 right-4 w-80 bg-surface-800 border border-border rounded-lg shadow-lg overflow-hidden",
63-
// Set up stream subscription when mounted
64-
onMountUnmountCallback(
65-
mount = ctx => {
66-
given Owner = ctx.owner
67-
68-
org.scalajs.dom.console.log("[EventLog] Component mounted, setting up stream subscription")
69-
70-
// Create ZStream for survey submission events
71-
// This stream emits EventMessage[SurveySubmittedEvent] for each event
72-
val submissionStream = events.stream[SurveySubmittedEvent]
73-
74-
org.scalajs.dom.console.log("[EventLog] Created submissionStream, calling toStateSignal")
75-
76-
// Convert to Laminar Signal with full lifecycle state
77-
// StreamSource[ZStreamIO] instance (from tausi.zio.ZStreamIO.given)
78-
// bridges ZIO streams to Laminar observables
79-
val stateSignal: Signal[StreamState[EventMessage[SurveySubmittedEvent]]] =
80-
submissionStream.toStateSignal
81-
82-
org.scalajs.dom.console.log("[EventLog] toStateSignal returned, subscribing to signal")
83-
84-
// Forward stream state to our Var
85-
stateSignal.foreach { state =>
86-
org.scalajs.dom.console.log(s"[EventLog] Signal state changed: $state")
87-
stateVar.set(state)
88-
}
89-
90-
org.scalajs.dom.console.log("[EventLog] Stream subscription complete")
91-
},
92-
unmount = _ => {
93-
org.scalajs.dom.console.log("[EventLog] Component unmounted")
94-
}
95-
),
9659
// Header
9760
div(
9861
cls := "bg-surface-700 px-4 py-2 border-b border-border flex items-center justify-between",
@@ -104,9 +67,17 @@ object EventLog:
10467
span(cls := "text-xs text-text-muted", "Live")
10568
),
10669
// Stream content - renders based on stream state
70+
// The stream subscription is created when the element is mounted,
71+
// ensuring the Laminar Owner is available for lifecycle management
10772
div(
10873
cls := "p-4 max-h-48 overflow-y-auto",
109-
child <-- stateVar.signal.map(renderStreamState)
74+
onMountCallback { ctx =>
75+
given Owner = ctx.owner
76+
// Subscribe to the stream within the mount context where Owner is available
77+
ctx.thisNode.amend(
78+
child <-- events.stream[SurveySubmittedEvent].toStateSignal.map(renderStreamState)
79+
)
80+
}
11081
),
11182
// Footer with pattern documentation
11283
div(

modules/sample/src/main/scala/tausi/sample/pages/CompletePage.scala

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,31 +71,23 @@ object CompletePage:
7171
state.isSubmitting.set(false)
7272
state.submissionResult.set(Some(response))
7373

74-
org.scalajs.dom.console.log("[CompletePage] Emitting success event")
7574
// Emit success event (demonstrates event emission)
7675
events.emit(SurveySubmittedEvent(
7776
success = true,
7877
filePath = Some(response.filePath),
7978
errorMessage = None
80-
)).runWith(
81-
onSuccess = _ => org.scalajs.dom.console.log("[CompletePage] Success event emitted successfully"),
82-
onError = err => org.scalajs.dom.console.error(s"[CompletePage] Failed to emit success event: ${err.message}")
83-
)
79+
)).runWithUnsafe()
8480
},
8581
onError = err => {
8682
state.isSubmitting.set(false)
8783
state.submissionError.set(Some(err.message))
8884

89-
org.scalajs.dom.console.log("[CompletePage] Emitting failure event")
9085
// Emit failure event
9186
events.emit(SurveySubmittedEvent(
9287
success = false,
9388
filePath = None,
9489
errorMessage = Some(err.message)
95-
)).runWith(
96-
onSuccess = _ => org.scalajs.dom.console.log("[CompletePage] Failure event emitted successfully"),
97-
onError = err => org.scalajs.dom.console.error(s"[CompletePage] Failed to emit failure event: ${err.message}")
98-
)
90+
)).runWithUnsafe()
9991
}
10092
)
10193

modules/zio/src/main/scala/tausi/zio/ZStreamIO.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,11 @@ object ZStreamIO:
9494
}
9595

9696
StreamSubscription { () =>
97-
// Use interruptFork (fire-and-forget) rather than interrupt.
98-
// The interrupt effect cannot be run synchronously in JS because
99-
// unsafe.run would block waiting for the fiber to complete.
97+
// Interrupt the fibre on cancellation.
98+
// This properly terminates the stream and releases resources.
10099
Unsafe.unsafe { (unsafe: Unsafe) =>
101100
given Unsafe = unsafe
101+
// interruptFork is fire-and-forget, allowing synchronous cancellation in JS
102102
runtime.unsafe.run(fibre.interruptFork).getOrThrowFiberFailure()
103103
}
104104
}

modules/zio/src/main/scala/tausi/zio/package.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -304,23 +304,22 @@ package object zio:
304304
ev: tausi.api.Event[A],
305305
trace: Trace
306306
): ZStream[Any, TauriError, EventMessage[A]] =
307-
org.scalajs.dom.console.log(s"[events.stream] Creating stream for event: ${ev.name}")
308307
ZStream.scoped {
309-
org.scalajs.dom.console.log("[events.stream] Scoped effect starting")
310308
for
311309
queue <- ZIO.acquireRelease(Queue.unbounded[Either[TauriError.EventError, EventMessage[A]]])(_.shutdown)
312-
_ = org.scalajs.dom.console.log("[events.stream] Queue created")
313310
eitherHandler: (Either[TauriError.EventError, EventMessage[A]] => Unit) =
314311
result =>
315-
org.scalajs.dom.console.log(s"[events.stream] Event handler invoked with: $result")
316312
_root_.zio.Unsafe.unsafe { implicit unsafe =>
317-
_root_.zio.Runtime.default.unsafe.run(queue.offer(result).unit).getOrThrowFiberFailure()
313+
// Offer to queue synchronously. If queue is shut down, offer will fail silently.
314+
// We ignore the result because queue shutdown is expected during cleanup.
315+
_root_.zio.Runtime.default.unsafe
316+
.run(
317+
queue.offer(result).ignore
318+
)
319+
.getOrThrowFiberFailure(): Unit
318320
}
319321
handle <- ZIO.acquireRelease(
320-
ZIO.succeed(org.scalajs.dom.console.log("[events.stream] Registering event listener")) *>
321-
listen(eitherHandler, options).tap(h =>
322-
ZIO.succeed(org.scalajs.dom.console.log(s"[events.stream] Listener registered with handle: $h"))
323-
)
322+
listen(eitherHandler, options)
324323
)(handle => unlisten(handle).orDie)
325324
yield ZStream.fromQueue(queue).mapZIO(ZIO.fromEither(_).mapError(identity))
326325
end for

0 commit comments

Comments
 (0)