Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions modules/laminar/src/main/scala/tausi/laminar/extensions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ extension [S[_]: StreamSource, A](stream: S[A])
*/
def toStateSignal(using owner: Owner): Signal[StreamState[A]] =
val variable = Var[StreamState[A]](StreamState.Running)

val subscription = stream.subscribe(
onNext = a => variable.set(StreamState.Value(a)),
onError = err => variable.set(StreamState.Failed(err)),
Expand All @@ -231,7 +232,11 @@ extension [S[_]: StreamSource, A](stream: S[A])
case StreamState.Value(lastValue) => variable.set(StreamState.CompletedWith(lastValue))
case _ => variable.set(StreamState.Completed)
)

// Register the subscription with the Owner for proper lifecycle management.
// When the Owner is killed (e.g., component unmounts), the subscription is automatically cancelled.
registerCleanup(owner, subscription)

variable.signal
end toStateSignal

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,45 +54,8 @@ object EventLog:
given Runtime[Any] = Runtime.default

def apply(): HtmlElement =
// State var to hold the current stream state
val stateVar: Var[StreamState[EventMessage[SurveySubmittedEvent]]] =
Var(StreamState.Running)

div(
cls := "fixed bottom-4 right-4 w-80 bg-surface-800 border border-border rounded-lg shadow-lg overflow-hidden",
// Set up stream subscription when mounted
onMountUnmountCallback(
mount = ctx => {
given Owner = ctx.owner

org.scalajs.dom.console.log("[EventLog] Component mounted, setting up stream subscription")

// Create ZStream for survey submission events
// This stream emits EventMessage[SurveySubmittedEvent] for each event
val submissionStream = events.stream[SurveySubmittedEvent]

org.scalajs.dom.console.log("[EventLog] Created submissionStream, calling toStateSignal")

// Convert to Laminar Signal with full lifecycle state
// StreamSource[ZStreamIO] instance (from tausi.zio.ZStreamIO.given)
// bridges ZIO streams to Laminar observables
val stateSignal: Signal[StreamState[EventMessage[SurveySubmittedEvent]]] =
submissionStream.toStateSignal

org.scalajs.dom.console.log("[EventLog] toStateSignal returned, subscribing to signal")

// Forward stream state to our Var
stateSignal.foreach { state =>
org.scalajs.dom.console.log(s"[EventLog] Signal state changed: $state")
stateVar.set(state)
}

org.scalajs.dom.console.log("[EventLog] Stream subscription complete")
},
unmount = _ => {
org.scalajs.dom.console.log("[EventLog] Component unmounted")
}
),
// Header
div(
cls := "bg-surface-700 px-4 py-2 border-b border-border flex items-center justify-between",
Expand All @@ -104,9 +67,17 @@ object EventLog:
span(cls := "text-xs text-text-muted", "Live")
),
// Stream content - renders based on stream state
// The stream subscription is created when the element is mounted,
// ensuring the Laminar Owner is available for lifecycle management
div(
cls := "p-4 max-h-48 overflow-y-auto",
child <-- stateVar.signal.map(renderStreamState)
onMountCallback { ctx =>
given Owner = ctx.owner
// Subscribe to the stream within the mount context where Owner is available
ctx.thisNode.amend(
child <-- events.stream[SurveySubmittedEvent].toStateSignal.map(renderStreamState)
)
}
),
// Footer with pattern documentation
div(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,31 +71,23 @@ object CompletePage:
state.isSubmitting.set(false)
state.submissionResult.set(Some(response))

org.scalajs.dom.console.log("[CompletePage] Emitting success event")
// Emit success event (demonstrates event emission)
events.emit(SurveySubmittedEvent(
success = true,
filePath = Some(response.filePath),
errorMessage = None
)).runWith(
onSuccess = _ => org.scalajs.dom.console.log("[CompletePage] Success event emitted successfully"),
onError = err => org.scalajs.dom.console.error(s"[CompletePage] Failed to emit success event: ${err.message}")
)
)).runWithUnsafe()
},
onError = err => {
state.isSubmitting.set(false)
state.submissionError.set(Some(err.message))

org.scalajs.dom.console.log("[CompletePage] Emitting failure event")
// Emit failure event
events.emit(SurveySubmittedEvent(
success = false,
filePath = None,
errorMessage = Some(err.message)
)).runWith(
onSuccess = _ => org.scalajs.dom.console.log("[CompletePage] Failure event emitted successfully"),
onError = err => org.scalajs.dom.console.error(s"[CompletePage] Failed to emit failure event: ${err.message}")
)
)).runWithUnsafe()
}
)

Expand Down
6 changes: 3 additions & 3 deletions modules/zio/src/main/scala/tausi/zio/ZStreamIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ object ZStreamIO:
}

StreamSubscription { () =>
// Use interruptFork (fire-and-forget) rather than interrupt.
// The interrupt effect cannot be run synchronously in JS because
// unsafe.run would block waiting for the fiber to complete.
// Interrupt the fibre on cancellation.
// This properly terminates the stream and releases resources.
Unsafe.unsafe { (unsafe: Unsafe) =>
given Unsafe = unsafe
// interruptFork is fire-and-forget, allowing synchronous cancellation in JS
runtime.unsafe.run(fibre.interruptFork).getOrThrowFiberFailure()
}
}
Expand Down
17 changes: 8 additions & 9 deletions modules/zio/src/main/scala/tausi/zio/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -304,23 +304,22 @@ package object zio:
ev: tausi.api.Event[A],
trace: Trace
): ZStream[Any, TauriError, EventMessage[A]] =
org.scalajs.dom.console.log(s"[events.stream] Creating stream for event: ${ev.name}")
ZStream.scoped {
org.scalajs.dom.console.log("[events.stream] Scoped effect starting")
for
queue <- ZIO.acquireRelease(Queue.unbounded[Either[TauriError.EventError, EventMessage[A]]])(_.shutdown)
_ = org.scalajs.dom.console.log("[events.stream] Queue created")
eitherHandler: (Either[TauriError.EventError, EventMessage[A]] => Unit) =
result =>
org.scalajs.dom.console.log(s"[events.stream] Event handler invoked with: $result")
_root_.zio.Unsafe.unsafe { implicit unsafe =>
_root_.zio.Runtime.default.unsafe.run(queue.offer(result).unit).getOrThrowFiberFailure()
// Offer to queue synchronously. If queue is shut down, offer will fail silently.
// We ignore the result because queue shutdown is expected during cleanup.
_root_.zio.Runtime.default.unsafe
.run(
queue.offer(result).ignore
)
.getOrThrowFiberFailure(): Unit
}
handle <- ZIO.acquireRelease(
ZIO.succeed(org.scalajs.dom.console.log("[events.stream] Registering event listener")) *>
listen(eitherHandler, options).tap(h =>
ZIO.succeed(org.scalajs.dom.console.log(s"[events.stream] Listener registered with handle: $h"))
)
listen(eitherHandler, options)
)(handle => unlisten(handle).orDie)
yield ZStream.fromQueue(queue).mapZIO(ZIO.fromEither(_).mapError(identity))
end for
Expand Down