Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ libraryDependencies += "io.github.arashi01" %%% "tausi-cats" % "0.0.1-SNAPSHOT"

// Optional: ZIO Integration
libraryDependencies += "io.github.arashi01" %%% "tausi-zio" % "0.0.1-SNAPSHOT"

// Optional: Laminar Integration (stream-to-Laminar bridge)
libraryDependencies += "io.github.arashi01" %%% "tausi-laminar" % "0.0.1-SNAPSHOT"
```

## Usage
Expand Down Expand Up @@ -171,6 +174,60 @@ button(
)
```

### Laminar Integration (Stream-to-Laminar Bridge)

Tausi provides a type-safe bridge between effect streams (ZIO ZStream, fs2 Stream) and Laminar observables:

```scala
// Add dependency
libraryDependencies += "io.github.arashi01" %%% "tausi-laminar" % "0.0.1-SNAPSHOT"
```

```scala
import tausi.laminar.*
import tausi.zio.ZStreamIO
import tausi.zio.ZStreamIO.given
import zio.stream.ZStream

// Create a ZIO stream
val counter: ZStreamIO[Int] = ZStream.iterate(0)(_ + 1).take(10)

// Tier 1: Unsafe (errors logged, dropped) - for prototyping
val eventStream: EventStream[Int] = counter.toStreamUnsafe

// Tier 2: Either-based (errors as values)
val signal: Signal[Either[TauriError, Int]] =
counter.toSignal(Left(TauriError.StreamError("Loading...")))

// Tier 3: Full state (recommended for production)
val stateSignal: Signal[StreamState[Int]] = counter.toStateSignal
```

The `StreamState` ADT provides full lifecycle visibility:

```scala
enum StreamState[+A]:
case Running // Stream is loading
case Value(value: A) // Latest value received
case Failed(error: TauriError) // Stream failed
case Completed // Stream completed (no final value)
case CompletedWith(value: A) // Stream completed with final value
```

Usage in Laminar:

```scala
div(
child <-- myStream.toStateSignal.map {
case StreamState.Running => div(cls := "spinner", "Loading...")
case StreamState.Value(data) => renderData(data)
case StreamState.Failed(err) => div(cls := "error", err.message)
case StreamState.Completed => div("Done")
case StreamState.CompletedWith(d) => renderData(d)
}
)
```

## Defining Custom Commands

For custom Tauri plugins, define commands using the `Command.define` factory:
Expand Down
18 changes: 16 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,27 @@ val `tausi-zio` =
.settings(libraryDependencies += libraries.`zio-streams`.value)
.settings(libraryDependencies += libraries.`munit-zio`.value)

val `tausi-laminar` =
project
.in(file("modules/laminar"))
.enablePlugins(ScalaJSPlugin)
.settings(compilerSettings)
.settings(unitTestSettings)
.settings(fileHeaderSettings)
.settings(publishSettings)
.dependsOn(`tausi-api`.js)
.settings(libraryDependencies += libraries.laminar.value)

val `tausi-sample` =
project
.in(file("modules/sample"))
.enablePlugins(ScalaJSPlugin)
.dependsOn(`tausi-zio`)
.dependsOn(`tausi-zio`, `tausi-laminar`)
.settings(unitTestSettings)
.settings(libraryDependencies += libraries.laminar.value)
.settings(libraryDependencies += libraries.waypoint.value)
.settings(libraryDependencies += libraries.`scala-java-time`.value)
.settings(libraryDependencies += libraries.`munit-zio`.value)
.settings(scalaJSUseMainModuleInitializer := true)
.settings(scalaJSLinkerConfig ~= { c =>
import org.scalajs.linker.interface.*
Expand All @@ -118,7 +131,8 @@ val `tausi-js` =
.aggregate(
`tausi-api`.js,
`tausi-cats`,
`tausi-zio`
`tausi-zio`,
`tausi-laminar`
)

lazy val `tausi-root` =
Expand Down
28 changes: 21 additions & 7 deletions modules/api/js/src/main/scala/tausi/api/TauriError.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ object TauriError:
cause: Option[Throwable] = None
) extends TauriError(message, cause)

/** Error occurred during stream operations.
*
* @param message Description of what went wrong
* @param cause Optional underlying exception
*/
final case class StreamError(
message: String,
cause: Option[Throwable] = None
) extends TauriError(message, cause)

/** Error occurred during event operations.
*
* @param eventName The event name
Expand Down Expand Up @@ -180,6 +190,11 @@ object TauriError:
def apply(channelId: CallbackId, message: String): ChannelError =
new ChannelError(channelId, message, None)

object StreamError:
/** Create StreamError without cause */
def apply(message: String): StreamError =
new StreamError(message, None)

object EventError:
/** Create EventError without cause */
def apply(eventName: String, message: String): EventError =
Expand All @@ -199,27 +214,29 @@ object TauriError:

extension (error: TauriError)
/** Extract human-readable error message from any TauriError variant. */
def message: String = error match
inline def message: String = error match
case e: InvokeError => e.message
case e: PluginError => e.message
case e: PermissionError => e.message
case e: ResourceError => e.message
case e: ConversionError => e.message
case e: CallbackError => e.message
case e: ChannelError => e.message
case e: StreamError => e.message
case e: EventError => e.message
case e: GenericError => e.message
case e: TauriNotAvailableError => e.message

/** Extract optional underlying cause from any TauriError variant. */
def cause: Option[Throwable] = error match
inline def cause: Option[Throwable] = error match
case e: InvokeError => e.cause
case e: PluginError => e.cause
case e: PermissionError => e.cause
case e: ResourceError => e.cause
case e: ConversionError => e.cause
case e: CallbackError => e.cause
case e: ChannelError => e.cause
case e: StreamError => e.cause
case e: EventError => e.cause
case e: GenericError => e.cause
case _: TauriNotAvailableError => None
Expand All @@ -231,12 +248,9 @@ object TauriError:

/** Wrap any Throwable into a TauriError.
*
* If already a TauriError, returns as-is. Otherwise, wraps in GenericError.
*
* @param t The throwable to wrap
* @return TauriError variant
* Returns the input unchanged if already a TauriError, otherwise wraps in [[GenericError]].
*/
def fromThrowable(t: Throwable): TauriError = t match
inline def fromThrowable(t: Throwable): TauriError = t match
case e: TauriError => e
case e => GenericError(s"Unexpected error: ${e.getMessage}", Some(e))

Expand Down
93 changes: 93 additions & 0 deletions modules/api/js/src/main/scala/tausi/api/stream/StreamSource.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2025 Tausi contributors.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package tausi.api.stream

import tausi.api.TauriError

/** Type class for subscribing to effect streams via callbacks.
*
* This abstraction enables UI frameworks (such as Laminar) to consume streams from any effect
* system (ZIO, Cats Effect, or custom) without depending on those effect systems directly. Effect
* modules provide instances; UI modules consume them through the type class interface.
*
* The callback-based design allows bridging between push-based effect streams and push-based UI
* observables without buffering or complex synchronisation.
*
* ==Type Parameters==
* - `S[_]`: The stream type constructor (e.g., `ZStream[Any, TauriError, *]` or `Stream[IO, *]`)
*
* ==Error Handling==
* All errors are normalised to [[TauriError]] variants, ensuring consistent error handling
* across effect systems. This follows the errors-as-values principle - errors are never
* thrown or silently dropped.
*
* ==Thread Safety==
* Implementations must ensure that:
* - Callbacks may be invoked from any thread
* - The returned [[StreamSubscription]] is safe to cancel from any thread
* - Cancellation stops further callback invocations (though in-flight calls may complete)
*
* ==Lifecycle==
* The subscription lifecycle is:
* 1. [[subscribe]] is called with callbacks
* 2. `onNext` is invoked for each stream element
* 3. Either `onError` or `onComplete` is invoked exactly once when the stream terminates
* 4. The returned [[StreamSubscription]] can be used to cancel early
*
* @see
* [[StreamSubscription]] for the cancellation handle
*/
trait StreamSource[S[_]]:

extension [A](stream: S[A])
/** Subscribe to the stream with callbacks for elements, errors, and completion.
*
* The subscription begins immediately upon calling this method. Elements are delivered via
* `onNext`, and the stream terminates with either `onError` or `onComplete` (never both).
*
* All errors are normalised to [[TauriError]] variants, ensuring consistent error handling
* across effect systems. Upstream errors are wrapped in [[TauriError.StreamError]].
*
* @param onNext
* Callback invoked for each element emitted by the stream
* @param onError
* Callback invoked if the stream fails with a [[TauriError]] (terminal)
* @param onComplete
* Callback invoked when the stream completes successfully (terminal)
* @return
* A [[StreamSubscription]] that can be used to cancel the subscription
*/
def subscribe(onNext: A => Unit, onError: TauriError => Unit, onComplete: () => Unit): StreamSubscription
end extension

end StreamSource

/** Companion for [[StreamSource]]. Provides summoner method. */
object StreamSource:

/** Summoner for type class instances.
*
* @tparam S
* The stream type constructor
* @return
* The [[StreamSource]] instance for `S`
*/
inline def apply[S[_]](using source: StreamSource[S]): StreamSource[S] = source
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2025 Tausi contributors.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package tausi.api.stream

/** Handle for cancelling a stream subscription.
*
* Represents a leaky resource that must be cleaned up when the subscription is no longer needed.
* Calling [[cancel]] releases all resources associated with the subscription and stops receiving
* new elements.
*
* [[cancel]] is idempotent and safe to call concurrently; only the first
* invocation has any effect.
*
* @see
* [[StreamSource]] for creating subscriptions from effect streams
*/
trait StreamSubscription:

/** Cancel the subscription and release all associated resources.
*
* This method is idempotent; calling it multiple times has no additional effect after the first
* call. Once cancelled, no further elements will be delivered to the subscriber.
*
* Implementations must not throw exceptions.
*/
def cancel(): Unit

/** Companion for [[StreamSubscription]]. Provides factory method. */
object StreamSubscription:

/** Concrete implementation of [[StreamSubscription]]. */
final class Impl @scala.annotation.publicInBinary private[StreamSubscription] (
cancelFn: () => Unit
) extends StreamSubscription:
private val cancelled = new java.util.concurrent.atomic.AtomicBoolean(false)

override def cancel(): Unit =
if cancelled.compareAndSet(false, true) then cancelFn()

/** Create a subscription from a cancel function.
*
* The returned subscription guarantees `cancelFn` is invoked at most once,
* even when [[cancel]] is called concurrently from multiple threads.
*
* @param cancelFn
* Function to invoke on cancellation. Must be idempotent and must not throw.
*/
inline def apply(cancelFn: () => Unit): StreamSubscription =
new Impl(cancelFn)

given CanEqual[StreamSubscription, StreamSubscription] = CanEqual.derived
end StreamSubscription
Loading