Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 41 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2848,51 +2848,63 @@ val d: Int < Sync =
a.map(_.pending)
```

### Barrier: Multi-party Rendezvous
### Gate: Coordinated Concurrent Passage

The `Barrier` effect provides a synchronization primitive that allows a fixed number of parties to wait for each other to reach a common point of execution. It's particularly useful in scenarios where multiple fibers need to synchronize their progress.
`Gate` coordinates concurrent parties through a shared passage point — parties arrive and wait until all expected parties are present, then everyone passes through together. The gate then resets for the next group, like a revolving door.

Two variants are available: `Gate` for a fixed set of participants (similar to Java's `CyclicBarrier` but with pass tracking and non-blocking arrival), and `Gate.Dynamic` for dynamic membership with `join`/`leave` and hierarchical subgroups (similar to Java's `Phaser`).

Unlike `Latch`, which is asymmetric (some tasks release while others wait), `Gate` is symmetric: all parties participate equally and pass through together.

```scala
import kyo.*

// Initialize a barrier for 3 parties
val a: Barrier < Sync =
Barrier.init(3)
// Initialize a gate for 3 parties
val a: Gate < Sync =
Gate.initUnscoped(3)

// Wait for the barrier to be released
val b: Unit < Async =
a.map(_.await)
// Pass through the gate (blocks until all parties arrive)
val b: Unit < (Async & Abort[Closed]) =
a.map(_.pass)

// Get the number of parties still waiting
val c: Int < Sync =
a.map(_.pending)
// Signal arrival without waiting for others
val c: Unit < Sync =
a.map(_.arrive)

// Get the number of parties still expected
val d: Int < Sync =
a.map(_.pendingCount)

// Get how many times the gate has been fully passed through
val e: Int < Sync =
a.map(_.passCount)

// Example usage with multiple fibers
val d: Unit < Async =
val f: Unit < (Async & Abort[Closed]) =
for
barrier <- Barrier.init(3)
_ <- Async.zip(
barrier.await,
barrier.await,
barrier.await
)
gate <- Gate.initUnscoped(3)
_ <- Async.zip(
gate.pass,
gate.pass,
gate.pass
)
yield ()

// Fibers can join the barrier at different points of the computation
val e: Unit < Async =
// Fibers can join the gate at different points of the computation
val g: Unit < (Async & Abort[Closed]) =
for
barrier <- Barrier.init(3)
fiber1 <- Fiber.initUnscoped(Async.sleep(1.second))
fiber2 <- Fiber.initUnscoped(Async.sleep(2.seconds))
_ <- Async.zip(
fiber1.get.map(_ => barrier.await),
fiber2.get.map(_ => barrier.await),
Fiber.initUnscoped(barrier.await).map(_.get)
)
gate <- Gate.initUnscoped(3)
fiber1 <- Fiber.initUnscoped(Async.sleep(1.second))
fiber2 <- Fiber.initUnscoped(Async.sleep(2.seconds))
_ <- Async.zip(
fiber1.get.map(_ => gate.pass),
fiber2.get.map(_ => gate.pass),
Fiber.initUnscoped(gate.pass).map(_.get)
)
yield ()
```

The `Barrier` is initialized with a specific number of parties. Each party calls `await` when it reaches the barrier point. The barrier releases all waiting parties when the last party arrives. After all parties have been released, the barrier cannot be reset or reused.
Use `init` (with `Scope`) for automatic cleanup or `initUnscoped` with manual `close`. A custom stop condition or fixed pass count can be provided at creation to auto-close the gate.

### Atomic: Concurrent State

Expand Down
95 changes: 0 additions & 95 deletions kyo-core/shared/src/main/scala/kyo/Barrier.scala

This file was deleted.

Loading
Loading