diff --git a/.scalafmt.conf b/.scalafmt.conf index 7089d68c..735690dc 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -11,6 +11,7 @@ rewrite { expand = false sort = ascii groups = [ + ["language\\..*"], ["gears\\..*"], ["java.?\\..*", "scala\\..*"], ] diff --git a/build.sbt b/build.sbt index 48e550f6..7cf37b3b 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,10 @@ import org.scalajs.linker.interface.ESVersion import sbtcrossproject.CrossPlugin.autoImport.{CrossType, crossProject} import scalanative.build._ -ThisBuild / scalaVersion := "3.3.5" +val scala3Version = "3.8.0-RC1-bin-20250819-1f13619-NIGHTLY" +val scala = scala3Version +ThisBuild / scalaVersion := scala +ThisBuild / resolvers += ("Artifactory" at "https://repo.scala-lang.org/artifactory/maven-nightlies/") publish / skip := true @@ -28,7 +31,11 @@ lazy val root = name := "Gears", versionScheme := Some("early-semver"), libraryDependencies += "org.scalameta" %%% "munit" % "1.1.1" % Test, - testFrameworks += new TestFramework("munit.Framework") + testFrameworks += new TestFramework("munit.Framework"), + scalacOptions ++= Seq( + // "-Ycc-debug", + // "-Xprint:cc" + ) ) ) .jvmSettings( @@ -45,7 +52,6 @@ lazy val root = ) .jsSettings( Seq( - scalaVersion := "3.7.0", // Emit ES modules with the Wasm backend scalaJSLinkerConfig := { scalaJSLinkerConfig.value diff --git a/js/src/main/scala/async/WasmJSPISuspend.scala b/js/src/main/scala/async/WasmJSPISuspend.scala index d9dae09d..c50ca7f7 100644 --- a/js/src/main/scala/async/WasmJSPISuspend.scala +++ b/js/src/main/scala/async/WasmJSPISuspend.scala @@ -1,5 +1,7 @@ package gears.async.js +import language.experimental.captureChecking + import gears.async.* import scala.compiletime.uninitialized @@ -28,7 +30,7 @@ trait WasmJSPISuspend(using AsyncToken) extends SuspendSupport: * Due to the promise possibly changing over time, within [[boundary]], we have to dynamically resolve the reference * _after_ running the `body`. */ - protected class WasmLabel[T](): + protected class WasmLabel[T]() extends caps.SharedCapability: var (promise, resolve) = mkPromise[T] def reset() = @@ -37,12 +39,12 @@ trait WasmJSPISuspend(using AsyncToken) extends SuspendSupport: resolve = q /** Creates a new [[js.Promise]] and returns both the Promise and its `resolve` function. */ - inline def mkPromise[T]: (js.Promise[T], T => Any) = - var resolve: (T => Any) | Null = null + inline def mkPromise[T]: (js.Promise[T], T -> Any) = + var resolve: (T -> Any) | Null = null val promise = js.Promise[T]((res, rej) => resolve = res) (promise, resolve) - protected class WasmSuspension[-T, +R](label: Label[R], resolve: T => Any) extends gears.async.Suspension[T, R]: + protected class WasmSuspension[-T, +R](label: WasmLabel[R], resolve: T => Any) extends gears.async.Suspension[T, R]: def resume(arg: T): R = label.reset() resolve(arg) @@ -50,11 +52,11 @@ trait WasmJSPISuspend(using AsyncToken) extends SuspendSupport: // Implementation of the [[SuspendSupport]] interface. - opaque type Label[T] = WasmLabel[T] + type Label[T, Cap^] = WasmLabel[T] - opaque type Suspension[-T, +R] <: gears.async.Suspension[T, R] = WasmSuspension[T, R] + type Suspension[-T, +R] = WasmSuspension[T, R] - override def boundary[T](body: Label[T] ?=> T): T = + override def boundary[T, Cap^](body: Label[T, Cap]^ ?->{Cap} T): T = val label = WasmLabel[T]() JSPI.async: val r = body(using label) @@ -66,10 +68,13 @@ trait WasmJSPISuspend(using AsyncToken) extends SuspendSupport: * @note * Should return immediately if resume is called from within body */ - override def suspend[T, R](body: Suspension[T, R] => R)(using label: Label[R]): T = + override def suspend[T, R, Cap^](body: Suspension[T, R]^{Cap} ->{Cap} R)(using label: Label[R, Cap]^): T = val (suspPromise, suspResolve) = mkPromise[T] val suspend = WasmSuspension[T, R](label, suspResolve) - label.resolve(body(suspend)) + label.resolve(body( + // SAFETY: will only be stored and returned by the Suspension resumption mechanism + caps.unsafe.unsafeAssumePure(suspend) + )) JSPI.await(suspPromise) end WasmJSPISuspend @@ -102,7 +107,7 @@ final class WasmAsyncSupport(using AsyncToken) extends AsyncSupport with WasmJSP */ private[async] class JsAsync(val group: CompletionGroup)(using support: WasmAsyncSupport, sched: JsAsyncScheduler.type) extends Async(using support, sched): - override def await[T](src: Async.Source[T]) = + override def await[T](src: Async.Source[T]^) = src .poll() .getOrElse: diff --git a/jvm/src/main/scala/PosixLikeIO/PIO.scala b/jvm/src/main/scala/PosixLikeIO/PIO.scala index c61eca85..8f3b8770 100644 --- a/jvm/src/main/scala/PosixLikeIO/PIO.scala +++ b/jvm/src/main/scala/PosixLikeIO/PIO.scala @@ -1,5 +1,8 @@ package PosixLikeIO +import language.experimental.captureChecking +import caps.CapSet + import gears.async.Scheduler import gears.async.default.given import gears.async.{Async, Future} @@ -17,7 +20,7 @@ import scala.util.{Failure, Success, Try} import Future.Promise object File: - extension (resolver: Future.Resolver[Int]) + extension[Cap^] (resolver: Future.Resolver[Int, Cap]) private[File] def toCompletionHandler = new CompletionHandler[Integer, ByteBuffer] { override def completed(result: Integer, attachment: ByteBuffer): Unit = resolver.resolve(result) override def failed(e: Throwable, attachment: ByteBuffer): Unit = resolver.reject(e) @@ -44,7 +47,7 @@ class File(val path: String) { def read(buffer: ByteBuffer): Future[Int] = assert(channel.isDefined) - Future.withResolver[Int]: resolver => + Future.withResolver[Int, CapSet]: resolver => channel.get.read( buffer, 0, @@ -57,7 +60,7 @@ class File(val path: String) { assert(size >= 0) val buffer = ByteBuffer.allocate(size) - Future.withResolver[String]: resolver => + Future.withResolver[String, CapSet]: resolver => channel.get.read( buffer, 0, @@ -72,7 +75,7 @@ class File(val path: String) { def write(buffer: ByteBuffer): Future[Int] = assert(channel.isDefined) - Future.withResolver[Int]: resolver => + Future.withResolver[Int, CapSet]: resolver => channel.get.write( buffer, 0, @@ -114,7 +117,7 @@ class SocketUDP() { def send(data: ByteBuffer, address: String, port: Int): Future[Unit] = assert(socket.isDefined) - Future.withResolver: resolver => + Future.withResolver[Unit, CapSet]: resolver => resolver.spawn: val packet: DatagramPacket = new DatagramPacket(data.array(), data.limit(), InetAddress.getByName(address), port) @@ -123,7 +126,7 @@ class SocketUDP() { def receive(): Future[DatagramPacket] = assert(socket.isDefined) - Future.withResolver: resolver => + Future.withResolver[DatagramPacket, CapSet]: resolver => resolver.spawn: val buffer = Array.fill[Byte](10 * 1024)(0) val packet: DatagramPacket = DatagramPacket(buffer, 10 * 1024) @@ -138,7 +141,7 @@ class SocketUDP() { } object SocketUDP: - extension [T](resolver: Future.Resolver[T]) + extension [T, Cap^](resolver: Future.Resolver[T, Cap]) private[SocketUDP] inline def spawn(body: => T)(using s: Scheduler) = s.execute(() => resolver.complete(Try(body).recover { case _: InterruptedException => diff --git a/jvm/src/main/scala/async/DefaultSupport.scala b/jvm/src/main/scala/async/DefaultSupport.scala index 7f12d2d4..53dbe01e 100644 --- a/jvm/src/main/scala/async/DefaultSupport.scala +++ b/jvm/src/main/scala/async/DefaultSupport.scala @@ -4,4 +4,4 @@ import gears.async._ given AsyncOperations = JvmAsyncOperations given VThreadSupport.type = VThreadSupport -given VThreadSupport.Scheduler = VThreadScheduler +given VThreadScheduler.type = VThreadScheduler diff --git a/jvm/src/main/scala/async/JvmAsyncOperations.scala b/jvm/src/main/scala/async/JvmAsyncOperations.scala index 5db38822..01a4f90c 100644 --- a/jvm/src/main/scala/async/JvmAsyncOperations.scala +++ b/jvm/src/main/scala/async/JvmAsyncOperations.scala @@ -1,5 +1,7 @@ package gears.async +import language.experimental.captureChecking + object JvmAsyncOperations extends AsyncOperations: override def sleep(millis: Long)(using Async): Unit = jvmInterruptible(Thread.sleep(millis)) diff --git a/jvm/src/main/scala/async/VThreadSupport.scala b/jvm/src/main/scala/async/VThreadSupport.scala index 8b2dc371..2160a794 100644 --- a/jvm/src/main/scala/async/VThreadSupport.scala +++ b/jvm/src/main/scala/async/VThreadSupport.scala @@ -1,9 +1,13 @@ package gears.async +import language.experimental.captureChecking + import java.lang.invoke.{MethodHandles, VarHandle} import java.util.concurrent.locks.ReentrantLock import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration +import scala.annotation.constructorOnly +import scala.collection.mutable object VThreadScheduler extends Scheduler: private val VTFactory = Thread @@ -14,10 +18,18 @@ object VThreadScheduler extends Scheduler: override def execute(body: Runnable): Unit = val th = VTFactory.newThread(body) th.start() + () + + private[gears] inline def unsafeExecute(body: Runnable^): Unit = execute(caps.unsafe.unsafeAssumePure(body)) + + override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = + import caps.unsafe.unsafeAssumePure - override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = ScheduledRunnable(delay, body) + val sr = ScheduledRunnable(delay, body) + // SAFETY: should not be able to access body, only for cancellation + sr.unsafeAssumePure: Cancellable - private class ScheduledRunnable(val delay: FiniteDuration, val body: Runnable) extends Cancellable { + private final class ScheduledRunnable(delay: FiniteDuration, body: Runnable) extends Cancellable: @volatile var interruptGuard = true // to avoid interrupting the body val th = VTFactory.newThread: () => @@ -28,7 +40,7 @@ object VThreadScheduler extends Scheduler: final override def cancel(): Unit = if ScheduledRunnable.interruptGuardVar.getAndSet(this, false) then th.interrupt() - } + end ScheduledRunnable private object ScheduledRunnable: val interruptGuardVar = @@ -38,10 +50,9 @@ object VThreadScheduler extends Scheduler: .findVarHandle(classOf[ScheduledRunnable], "interruptGuard", classOf[Boolean]) object VThreadSupport extends AsyncSupport: - type Scheduler = VThreadScheduler.type - private final class VThreadLabel[R](): + final class VThreadLabel[R]() extends caps.SharedCapability: private var result: Option[R] = None private val lock = ReentrantLock() private val cond = lock.newCondition() @@ -65,11 +76,11 @@ object VThreadSupport extends AsyncSupport: result.get finally lock.unlock() - override opaque type Label[R] = VThreadLabel[R] + override type Label[R, Cap >: caps.CapSet <: caps.CapSet^] = VThreadLabel[R] // outside boundary: waiting on label // inside boundary: waiting on suspension - private final class VThreadSuspension[-T, +R](using private[VThreadSupport] val l: Label[R] @uncheckedVariance) + private final class VThreadSuspension[-T, +R](using private[VThreadSupport] val l: VThreadLabel[R] @uncheckedVariance) extends gears.async.Suspension[T, R]: private var nextInput: Option[T] = None private val lock = ReentrantLock() @@ -98,9 +109,9 @@ object VThreadSupport extends AsyncSupport: override opaque type Suspension[-T, +R] <: gears.async.Suspension[T, R] = VThreadSuspension[T, R] - override def boundary[R](body: (Label[R]) ?=> R): R = + override def boundary[R, Cap^](body: Label[R, Cap]^ ?->{Cap} R): R = val label = VThreadLabel[R]() - VThreadScheduler.execute: () => + VThreadScheduler.unsafeExecute: () => val result = body(using label) label.setResult(result) @@ -110,13 +121,16 @@ object VThreadSupport extends AsyncSupport: suspension.l.clearResult() suspension.setInput(arg) - override def scheduleBoundary(body: (Label[Unit]) ?=> Unit)(using Scheduler): Unit = + override def scheduleBoundary(body: Label[Unit, {}] ?-> Unit)(using Scheduler): Unit = VThreadScheduler.execute: () => val label = VThreadLabel[Unit]() body(using label) - override def suspend[T, R](body: Suspension[T, R] => R)(using l: Label[R]): T = - val sus = new VThreadSuspension[T, R]() + override def suspend[T, R, Cap^](body: Suspension[T, R]^{Cap} ->{Cap} R)(using l: Label[R, Cap]^): T = + val sus = new VThreadSuspension[T, R](using caps.unsafe.unsafeAssumePure(l)) val res = body(sus) - l.setResult(res) + l.setResult( + // SAFETY: will only be stored and returned by the Suspension resumption mechanism + caps.unsafe.unsafeAssumePure(res) + ) sus.waitInput() diff --git a/jvm/src/main/scala/measurements/measureTimes.scala b/jvm/src/main/scala/measurements/measureTimes.scala index 1b3c4e6f..5be5c96a 100644 --- a/jvm/src/main/scala/measurements/measureTimes.scala +++ b/jvm/src/main/scala/measurements/measureTimes.scala @@ -1,5 +1,7 @@ package measurements +import language.experimental.captureChecking + import gears.async.default.given import gears.async.{Async, BufferedChannel, ChannelMultiplexer, Future, SyncChannel} diff --git a/jvm/src/test/scala/CancellationBehavior.scala b/jvm/src/test/scala/CancellationBehavior.scala index 7e214f0a..1d14707c 100644 --- a/jvm/src/test/scala/CancellationBehavior.scala +++ b/jvm/src/test/scala/CancellationBehavior.scala @@ -1,3 +1,5 @@ +import language.experimental.captureChecking + import gears.async.AsyncOperations.* import gears.async.default.given import gears.async.{Async, AsyncSupport, Future, uninterruptible} diff --git a/native/src/main/scala/async/DefaultSupport.scala b/native/src/main/scala/async/DefaultSupport.scala index ca4808ef..51811d7e 100644 --- a/native/src/main/scala/async/DefaultSupport.scala +++ b/native/src/main/scala/async/DefaultSupport.scala @@ -5,6 +5,6 @@ import gears.async.native.ForkJoinSupport object DefaultSupport extends ForkJoinSupport -given AsyncSupport = DefaultSupport +given DefaultSupport.type = DefaultSupport given DefaultSupport.Scheduler = DefaultSupport given AsyncOperations = DefaultSupport diff --git a/native/src/main/scala/async/ForkJoinSupport.scala b/native/src/main/scala/async/ForkJoinSupport.scala index 4aa16504..75f580a3 100644 --- a/native/src/main/scala/async/ForkJoinSupport.scala +++ b/native/src/main/scala/async/ForkJoinSupport.scala @@ -1,5 +1,7 @@ package gears.async.native +import language.experimental.captureChecking + import gears.async.Future.Promise import gears.async._ @@ -15,14 +17,18 @@ class NativeContinuation[-T, +R] private[native] (val cont: T => R) extends Susp def resume(arg: T): R = cont(arg) trait NativeSuspend extends SuspendSupport: - type Label[R] = nativeContinuations.BoundaryLabel[R] + import caps.unsafe.unsafeAssumePure + type Label[R, Cap^] = nativeContinuations.BoundaryLabel[R] type Suspension[T, R] = NativeContinuation[T, R] - override def boundary[R](body: (Label[R]) ?=> R): R = - nativeContinuations.boundary(body) + override def boundary[R, Cap^](body: (Label[R, Cap]^) ?->{Cap^} R): R = + val f = (l: Label[R, Cap]^) => body(using l) + val pf = f.unsafeAssumePure + run(v ?=> pf(v)) - override def suspend[T, R](body: Suspension[T, R] => R)(using Label[R]): T = - nativeContinuations.suspend[T, R](f => body(NativeContinuation(f))) + override def suspend[T, R, Cap^](body: Suspension[T, R]^{Cap^} ->{Cap^} R)(using Label[R, Cap]^): T = + val pbody = body.unsafeAssumePure + nativeContinuations.suspend[T, R](f => pbody(NativeContinuation(f))) end NativeSuspend /** Spawns a single thread that does all the sleeping. */ @@ -82,7 +88,7 @@ class SuspendExecutorWithSleep(exec: ExecutionContext) with AsyncSupport with AsyncOperations with NativeSuspend { - type Scheduler = this.type + type Scheduler = ExecutorWithSleepThread } class ForkJoinSupport extends SuspendExecutorWithSleep(new ForkJoinPool()) diff --git a/native/src/main/scala/async/ForkJoinWithoutCaptures.scala b/native/src/main/scala/async/ForkJoinWithoutCaptures.scala new file mode 100644 index 00000000..1a2b4630 --- /dev/null +++ b/native/src/main/scala/async/ForkJoinWithoutCaptures.scala @@ -0,0 +1,6 @@ +package gears.async.native + +import scalanative.runtime.{Continuations => nativeContinuations} + +def run[R](body: nativeContinuations.BoundaryLabel[R] ?=> R): R = + nativeContinuations.boundary(body) // SAFETY: tracked by this package's mechanism diff --git a/shared/src/main/scala/async/Async.scala b/shared/src/main/scala/async/Async.scala index 97775d58..a6c187ca 100644 --- a/shared/src/main/scala/async/Async.scala +++ b/shared/src/main/scala/async/Async.scala @@ -1,5 +1,7 @@ package gears.async +import language.experimental.captureChecking + import gears.async.Listener.NumberedLock import gears.async.Listener.withLock @@ -30,14 +32,14 @@ import scala.util.boundary * @see * [[Async$.group Async.group]] and [[Future$.apply Future.apply]] for [[Async]]-subscoping operations. */ -trait Async private[async] (using val support: AsyncSupport, val scheduler: support.Scheduler): +trait Async private[async] (using val support: AsyncSupport, val scheduler: support.Scheduler) extends caps.SharedCapability: /** Waits for completion of source `src` and returns the result. Suspends the computation. * * @see * [[Async.Source.awaitResult]] and [[Async$.await]] for extension methods calling [[Async!.await]] from the source * itself. */ - def await[T](src: Async.Source[T]): T + def await[T](src: Async.Source[T]^): T /** Returns the cancellation group for this [[Async]] context. */ def group: CompletionGroup @@ -59,7 +61,7 @@ object Async extends AsyncImpl: private val condVar = lock.newCondition() /** Wait for completion of async source `src` and return the result */ - override def await[T](src: Async.Source[T]): T = + override def await[T](src: Async.Source[T]^): T = src .poll() .getOrElse: @@ -98,24 +100,24 @@ object Async extends AsyncImpl: /** Execute asynchronous computation `body` using the given [[FromSync]] implementation. */ - inline def fromSync[T](using fs: FromSync)(body: Async.Spawn ?=> T): fs.Output[T] = + def fromSync[T](using fs: FromSync)(body: Async.Spawn ?=> T): fs.Output[T] = fs(body) /** Execute asynchronous computation `body` from the context. Requires a [[FromSync.Blocking]] implementation. */ - inline def blocking[T](using fromSync: FromSync.Blocking)( + def blocking[T](using fromSync: FromSync.Blocking)( body: Async.Spawn ?=> T ): T = fromSync(body) /** Returns the currently executing Async context. Equivalent to `summon[Async]`. */ - inline def current(using async: Async): Async = async + inline def current(using async: Async): async.type = async /** [[Async.Spawn]] is a special subtype of [[Async]], also capable of spawning runnable [[Future]]s. * * Most functions should not take [[Spawn]] as a parameter, unless the function explicitly wants to spawn "dangling" * runnable [[Future]]s. Instead, functions should take [[Async]] and spawn scoped futures within [[Async.group]]. */ - opaque type Spawn <: Async = Async + final opaque type Spawn <: Async = Async /** Runs `body` inside a spawnable context where it is allowed to spawn concurrently runnable [[Future]]s. When the * body returns, all spawned futures are cancelled and waited for. @@ -145,11 +147,12 @@ object Async extends AsyncImpl: * * Note that the [[Spawn]] from the resource must not be used for awaiting after allocation. */ - val spawning = new Resource[Spawn]: - override def use[V](body: Spawn => V)(using Async): V = group(spawn ?=> body(spawn)) - override def allocated(using allocAsync: Async): (Spawn, (Async) ?=> Unit) = - val group = CompletionGroup() // not linked to allocAsync's group because it would not unlink itself - (allocAsync.withGroup(group), closeAsync ?=> cancelAndWaitGroup(group)(using closeAsync)) + // not sure if we can capture-check this for now + // val spawning = new Resource[Spawn]: + // override def use[V](body: Spawn => V)(using Async): V = group(spawn ?=> body(spawn)) + // override def allocated(using allocAsync: Async): (Spawn, (Async) ?=> Unit) = + // val group = CompletionGroup() // not linked to allocAsync's group because it would not unlink itself + // (allocAsync.withGroup(group), closeAsync ?=> cancelAndWaitGroup(group)(using closeAsync)) /** An asynchronous data source. Sources can be persistent or ephemeral. A persistent source will always pass same * data to calls of [[Source!.poll]] and [[Source!.onComplete]]. An ephemeral source can pass new data in every call. @@ -160,6 +163,8 @@ object Async extends AsyncImpl: * An example of an ephemeral source is [[gears.async.Channel]]. */ trait Source[+T]: + /** The unique symbol representing the current source. */ + val symbol: SourceSymbol[T] = SourceSymbol.next /** Checks whether data is available at present and pass it to `k` if so. Calls to `poll` are always synchronous and * non-blocking. * @@ -178,14 +183,14 @@ object Async extends AsyncImpl: * Whether poll was able to pass data to `k`. Note that this is regardless of `k` being available to receive the * data. In most cases, one should pass `k` into [[Source!.onComplete]] if `poll` returns `false`. */ - def poll(k: Listener[T]): Boolean + def poll(k: Listener[T]^): Boolean /** Once data is available, pass it to the listener `k`. `onComplete` is always non-blocking. * * Note that `k`'s methods will be executed on the same thread as the [[Source]], usually in sequence. It is hence * important that the listener itself does not perform expensive operations. */ - def onComplete(k: Listener[T]): Unit + def onComplete(k: Listener[T]^): Unit /** Signal that listener `k` is dead (i.e. will always fail to acquire locks from now on), and should be removed * from `onComplete` queues. @@ -193,7 +198,7 @@ object Async extends AsyncImpl: * This permits original, (i.e. non-derived) sources like futures or channels to drop the listener from their * waiting sets. */ - def dropListener(k: Listener[T]): Unit + def dropListener(k: Listener[T]^): Unit /** Similar to [[Async.Source!.poll(k:Listener[T])* poll]], but instead of passing in a listener, directly return * the value `T` if it is available. @@ -210,18 +215,27 @@ object Async extends AsyncImpl: final def awaitResult(using ac: Async) = ac.await(this) end Source - extension [T](src: Source[scala.util.Try[T]]) + // an opaque identity for symbols + opaque type SourceSymbol[+T] = Long + private [Async] object SourceSymbol: + private val index = AtomicLong() + inline def next: SourceSymbol[Any] = + index.incrementAndGet() + // ... it can be quickly obtained from any Source + given[T]: scala.Conversion[Source[T], SourceSymbol[T]] = _.symbol + + extension [T](src: Source[scala.util.Try[T]]^) /** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns. * @see * [[Source!.awaitResult awaitResult]] for non-unwrapping await. */ - inline def await(using Async) = src.awaitResult.get - extension [E, T](src: Source[Either[E, T]]) + def await(using Async): T = src.awaitResult.get + extension [E, T](src: Source[Either[E, T]]^) /** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns. * @see * [[Source!.awaitResult awaitResult]] for non-unwrapping await. */ - inline def await(using Async) = src.awaitResult.right.get + inline def await(using inline async: Async) = src.awaitResult.right.get /** An original source has a standard definition of [[Source.onComplete onComplete]] in terms of [[Source.poll poll]] * and [[OriginalSource.addListener addListener]]. @@ -231,9 +245,9 @@ object Async extends AsyncImpl: */ abstract class OriginalSource[+T] extends Source[T]: /** Add `k` to the listener set of this source. */ - protected def addListener(k: Listener[T]): Unit + protected def addListener(k: Listener[T]^): Unit - def onComplete(k: Listener[T]): Unit = synchronized: + def onComplete(k: Listener[T]^): Unit = synchronized: if !poll(k) then addListener(k) end OriginalSource @@ -250,7 +264,7 @@ object Async extends AsyncImpl: val q = java.util.concurrent.ConcurrentLinkedQueue[T]() q.addAll(values.asJavaCollection) new Source[T]: - override def poll(k: Listener[T]): Boolean = + override def poll(k: Listener[T]^): Boolean = if q.isEmpty() then false else if !k.acquireLock() then true else @@ -262,11 +276,11 @@ object Async extends AsyncImpl: k.complete(item, this) true - override def onComplete(k: Listener[T]): Unit = poll(k) - override def dropListener(k: Listener[T]): Unit = () + override def onComplete(k: Listener[T]^): Unit = poll(k) + override def dropListener(k: Listener[T]^): Unit = () end values - extension [T](src: Source[T]) + extension [Origin](src: Source[Origin]^) /** Create a new source that requires the original source to run the given transformation function on every value * received. * @@ -277,21 +291,21 @@ object Async extends AsyncImpl: * the transformation function to be run on every value. `f` is run *before* the item is passed to the * [[Listener]]. */ - def transformValuesWith[U](f: T => U) = + def transformValuesWith[U](f: Origin => U): Source[U]^{f, src} = new Source[U]: - selfSrc => - def transform(k: Listener[U]) = - new Listener.ForwardingListener[T](selfSrc, k): - val lock = k.lock - def complete(data: T, source: Async.Source[T]) = - k.complete(f(data), selfSrc) - - def poll(k: Listener[U]): Boolean = - src.poll(transform(k)) - def onComplete(k: Listener[U]): Unit = - src.onComplete(transform(k)) - def dropListener(k: Listener[U]): Unit = - src.dropListener(transform(k)) + self: Source[U]^{f, src} => + def transform(k: Listener[U]^): Listener.ForwardingListener[Origin]^{k, f} = + new Listener.ForwardingListener[Origin](self, k): + val lock = k.lock + def complete(data: Origin, source: SourceSymbol[Origin]) = + k.complete(f(data), self.symbol) + + def poll(k: Listener[U]^): Boolean = + src.poll(transform(k)) + def onComplete(k: Listener[U]^): Unit = + src.onComplete(transform(k)) + def dropListener(k: Listener[U]^): Unit = + src.dropListener(transform(k)) /** Creates a source that "races" a list of sources. * @@ -304,33 +318,40 @@ object Async extends AsyncImpl: * @see * [[Async$.select Async.select]] for a convenient syntax to race sources and awaiting them with [[Async]]. */ - def race[T](sources: Source[T]*): Source[T] = raceImpl[T, T]((v, _) => v)(sources*) + def race[T, C^](sources: Seq[Source[T]^{C}]): Source[T]^{C} = raceImpl((v: T, _: SourceSymbol[T]) => v)(sources) + def race[T](s1: Source[T]^): Source[T]^{s1} = race(Seq(s1)) + def race[T](s1: Source[T]^, s2: Source[T]^): Source[T]^{s1, s2} = race(Seq(s1, s2)) + def race[T](s1: Source[T]^, s2: Source[T]^, s3: Source[T]^): Source[T]^{s1, s2, s3} = race(Seq(s1, s2, s3)) /** Like [[race]], but the returned value includes a reference to the upstream source that the item came from. * @see * [[Async$.select Async.select]] for a convenient syntax to race sources and awaiting them with [[Async]]. */ - def raceWithOrigin[T](sources: Source[T]*): Source[(T, Source[T])] = - raceImpl[(T, Source[T]), T]((v, src) => (v, src))(sources*) + def raceWithOrigin[T, C^](sources: (Source[T]^{C})*): Source[(T, SourceSymbol[T])]^{C} = + raceImpl((v: T, src: SourceSymbol[T]) => (v, src))(sources) /** Pass first result from any of `sources` to the continuation */ - private def raceImpl[T, U](map: (U, Source[U]) => T)(sources: Source[U]*): Source[T] = - new Source[T] { selfSrc => - def poll(k: Listener[T]): Boolean = + private def raceImpl[T1, U, C^](map: (U, SourceSymbol[U]) -> T1)(sources: Seq[Source[U]^{C}]): Source[T1]^{C} = + new Source[T1] { selfSrc: Source[T1]^{C} => + def poll(k: Listener[T1]^): Boolean = val it = sources.iterator var found = false - val listener = new Listener.ForwardingListener[U](this, k): + val listener: Listener[U]^{k} = new Listener.ForwardingListener[U](selfSrc, k): val lock = k.lock - def complete(data: U, source: Async.Source[U]) = - k.complete(map(data, source), selfSrc) + def complete(data: U, source: SourceSymbol[U]) = + k.complete(map(data, source), selfSrc.symbol) end listener while it.hasNext && !found do found = it.next.poll(listener) + found - def onComplete(k: Listener[T]): Unit = - val listener = new Listener.ForwardingListener[U](this, k) { self => + def dropAll(l: Listener[U]^) = sources.foreach(_.dropListener(l)) + + def onComplete(k: Listener[T1]^): Unit = + val listener: Listener[U]^{k, C} = new Listener.ForwardingListener[U](this, k) { + val self = this inline def lockIsOurs = k.lock == null val lock = if k.lock != null then @@ -345,7 +366,7 @@ object Async extends AsyncImpl: found = true old } - then sources.foreach(_.dropListener(self)) // same as dropListener(k), but avoids an allocation + then dropAll(self) // same as dropListener(k), but avoids an allocation false else if found then k.lock.release() @@ -369,21 +390,20 @@ object Async extends AsyncImpl: var found = false - def complete(item: U, src: Async.Source[U]) = + def complete(item: U, src: SourceSymbol[U]) = found = true if lockIsOurs then lock.release() - sources.foreach(s => if s != src then s.dropListener(self)) - k.complete(map(item, src), selfSrc) + sources.foreach(s => if s.symbol != src then s.dropListener(self)) + k.complete(map(item, src), selfSrc.symbol) } // end listener sources.foreach(_.onComplete(listener)) - def dropListener(k: Listener[T]): Unit = - val listener = Listener.ForwardingListener.empty[U](this, k) + def dropListener(k: Listener[T1]^): Unit = + val listener = Listener.ForwardingListener.empty(this, k) sources.foreach(_.dropListener(listener)) - } - end raceImpl + /** Cases for handling async sources in a [[select]]. [[SelectCase]] can be constructed by extension methods `handle` * of [[Source]]. @@ -393,21 +413,27 @@ object Async extends AsyncImpl: * @see * [[Async$.select Async.select]] where [[SelectCase]] is used. */ - opaque type SelectCase[T] = (Source[?], Nothing => T) - // ^ unsafe types, but we only construct SelectCase from `handle` which is safe + trait SelectCase[+T]: + type Src + val src: Source[Src]^{this} + val f: Src => T + inline final def apply(input: Src) = f(input) - extension [T](src: Source[T]) + extension [T](_src: Source[T]^) /** Attach a handler to `src`, creating a [[SelectCase]]. * @see * [[Async$.select Async.select]] where [[SelectCase]] is used. */ - inline def handle[U](f: T => U): SelectCase[U] = (src, f) + def handle[U](_f: T => U): SelectCase[U]^{_src, _f} = new SelectCase: + type Src = T + val src = _src + val f = _f /** Alias for [[handle]] * @see * [[Async$.select Async.select]] where [[SelectCase]] is used. */ - inline def ~~>[U](f: T => U): SelectCase[U] = src.handle(f) + inline def ~~>[U](_f: T => U): SelectCase[U]^{_src, _f} = _src.handle(_f) /** Race a list of sources with the corresponding handler functions, once an item has come back. Like [[race]], * [[select]] guarantees exactly one of the sources are polled. Unlike [[transformValuesWith]], the handler in @@ -429,10 +455,10 @@ object Async extends AsyncImpl: * ) * }}} */ - def select[T](cases: SelectCase[T]*)(using Async) = - val (input, which) = raceWithOrigin(cases.map(_._1)*).awaitResult - val (_, handler) = cases.find(_._1 == which).get - handler.asInstanceOf[input.type => T](input) + def select[T, C^](cases: (SelectCase[T]^{C})*)(using Async) = + val (input, which) = raceWithOrigin(cases.map(_.src)*).awaitResult + val sc = cases.find(_.src.symbol == which).get + sc(input.asInstanceOf[sc.Src]) /** Race two sources, wrapping them respectively in [[Left]] and [[Right]] cases. * @return @@ -441,6 +467,9 @@ object Async extends AsyncImpl: * @see * [[race]] and [[select]] for racing more than two sources. */ - def either[T1, T2](src1: Source[T1], src2: Source[T2]): Source[Either[T1, T2]] = - race(src1.transformValuesWith(Left(_)), src2.transformValuesWith(Right(_))) + def either[T1, T2](src1: Source[T1]^, src2: Source[T2]^): Source[Either[T1, T2]]^{src1, src2} = + val left = src1.transformValuesWith(Left(_)) + val right = src2.transformValuesWith(Right(_)) + race(left, right) end Async + diff --git a/shared/src/main/scala/async/AsyncOperations.scala b/shared/src/main/scala/async/AsyncOperations.scala index aea71fd8..a87e9698 100644 --- a/shared/src/main/scala/async/AsyncOperations.scala +++ b/shared/src/main/scala/async/AsyncOperations.scala @@ -1,5 +1,7 @@ package gears.async +import language.experimental.captureChecking + import gears.async.AsyncOperations.sleep import java.util.concurrent.TimeoutException @@ -16,7 +18,7 @@ trait AsyncOperations: /** Suspends the current [[Async]] context for at least `millis` milliseconds. */ def sleep(millis: Long)(using async: Async): Unit = Future - .withResolver[Unit]: resolver => + .withResolver[Unit, caps.CapSet^{}]: resolver => val cancellable = async.scheduler.schedule(millis.millis, () => resolver.resolve(())) resolver.onCancel: () => cancellable.cancel() @@ -27,7 +29,7 @@ trait AsyncOperations: /** Yields the current [[Async]] context, possibly allowing other computations to run. */ def `yield`()(using async: Async) = Future - .withResolver[Unit]: resolver => + .withResolver[Unit, caps.CapSet^{}]: resolver => async.scheduler.execute(() => resolver.resolve(())) .link() .await @@ -55,7 +57,7 @@ object AsyncOperations: * [[java.util.concurrent.TimeoutException]] is thrown. */ def withTimeout[T](timeout: FiniteDuration)(op: Async ?=> T)(using AsyncOperations, Async): T = - Async.group: + Async.group: spawn ?=> Async.select( Future(op).handle(_.get), Future(sleep(timeout)).handle: _ => diff --git a/shared/src/main/scala/async/AsyncSupport.scala b/shared/src/main/scala/async/AsyncSupport.scala index 0505ebf4..a1d255f3 100644 --- a/shared/src/main/scala/async/AsyncSupport.scala +++ b/shared/src/main/scala/async/AsyncSupport.scala @@ -1,5 +1,7 @@ package gears.async +import language.experimental.captureChecking + import scala.concurrent.duration._ /** The delimited continuation suspension interface. Represents a suspended computation asking for a value of type `T` @@ -9,18 +11,18 @@ trait Suspension[-T, +R]: def resume(arg: T): R /** Support for suspension capabilities through a delimited continuation interface. */ -trait SuspendSupport: +trait SuspendSupport extends caps.Pure: /** A marker for the "limit" of "delimited continuation". */ - type Label[R] + type Label[R, Cap^] <: caps.SharedCapability /** The provided suspension type. */ type Suspension[-T, +R] <: gears.async.Suspension[T, R] /** Set the suspension marker as the body's caller, and execute `body`. */ - def boundary[R](body: Label[R] ?=> R): R + def boundary[R, Cap^](body: Label[R, Cap]^ ?->{Cap} R): R /** Should return immediately if resume is called from within body */ - def suspend[T, R](body: Suspension[T, R] => R)(using Label[R]): T + def suspend[T, R, Cap^](body: Suspension[T, R]^{Cap} ->{Cap} R)(using Label[R, Cap]^): T /** Extends [[SuspendSupport]] with "asynchronous" boundary/resume functions, in the presence of a [[Scheduler]] */ trait AsyncSupport extends SuspendSupport: @@ -31,7 +33,7 @@ trait AsyncSupport extends SuspendSupport: s.execute(() => suspension.resume(arg)) /** Schedule a computation with the suspension boundary already created. */ - private[async] def scheduleBoundary(body: Label[Unit] ?=> Unit)(using s: Scheduler): Unit = + private[async] def scheduleBoundary(body: Label[Unit, {}]^ ?-> Unit)(using s: Scheduler): Unit = s.execute(() => boundary(body)) /** A scheduler implementation, with the ability to execute a computation immediately or after a delay. */ diff --git a/shared/src/main/scala/async/Cancellable.scala b/shared/src/main/scala/async/Cancellable.scala index 9ad3c92f..14bef9e4 100644 --- a/shared/src/main/scala/async/Cancellable.scala +++ b/shared/src/main/scala/async/Cancellable.scala @@ -1,8 +1,9 @@ package gears.async +import language.experimental.captureChecking + /** A trait for cancellable entities that can be grouped. */ trait Cancellable: - private var group: CompletionGroup = scala.compiletime.uninitialized /** Issue a cancel request */ @@ -11,9 +12,9 @@ trait Cancellable: /** Add this cancellable to the given group after removing it from the previous group in which it was. */ def link(group: CompletionGroup): this.type = synchronized: - if this.group != null then this.group.drop(this) + if this.group != null then this.group.drop(this.unsafeAssumePure) this.group = group - this.group.add(this) + this.group.add(this.unsafeAssumePure) this /** Link this cancellable to the cancellable group of the current async context. @@ -25,6 +26,10 @@ trait Cancellable: def unlink(): this.type = link(CompletionGroup.Unlinked) + /** Assume that the [[Cancellable]] is pure, in the case that cancellation does *not* refer to captured resources. + */ + inline def unsafeAssumePure: Cancellable = caps.unsafe.unsafeAssumePure(this) + end Cancellable object Cancellable: diff --git a/shared/src/main/scala/async/CompletionGroup.scala b/shared/src/main/scala/async/CompletionGroup.scala index e98af034..e9b3fead 100644 --- a/shared/src/main/scala/async/CompletionGroup.scala +++ b/shared/src/main/scala/async/CompletionGroup.scala @@ -1,4 +1,6 @@ package gears.async +import language.experimental.captureChecking + import scala.collection.mutable import scala.util.Success diff --git a/shared/src/main/scala/async/Listener.scala b/shared/src/main/scala/async/Listener.scala index ca4e7c3c..afb88332 100644 --- a/shared/src/main/scala/async/Listener.scala +++ b/shared/src/main/scala/async/Listener.scala @@ -1,6 +1,9 @@ package gears.async +import language.experimental.captureChecking + import gears.async.Async.Source +import gears.async.Async.SourceSymbol import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock @@ -24,17 +27,17 @@ trait Listener[-T]: * * The listener must automatically release its own lock upon completion. */ - def complete(data: T, source: Async.Source[T]): Unit + def complete(data: T, source: Async.SourceSymbol[T]): Unit /** Represents the exposed API for synchronization on listeners at receiving time. If the listener does not have any * form of synchronization, [[lock]] should be `null`. */ - val lock: Listener.ListenerLock | Null + val lock: (Listener.ListenerLock^) | Null /** Attempts to acquire locks and then calling [[complete]] with the given item and source. If locking fails, * [[releaseLock]] is automatically called. */ - def completeNow(data: T, source: Async.Source[T]): Boolean = + def completeNow(data: T, source: Async.SourceSymbol[T]): Boolean = if acquireLock() then this.complete(data, source) true @@ -50,25 +53,25 @@ trait Listener[-T]: object Listener: /** A simple [[Listener]] that always accepts the item and sends it to the consumer. */ - inline def acceptingListener[T](inline consumer: (T, Source[T]) => Unit) = + /* inline bug */ def acceptingListener[T](consumer: (T, SourceSymbol[T]) => Unit): Listener[T]^{consumer} = new Listener[T]: val lock = null - def complete(data: T, source: Source[T]) = consumer(data, source) + def complete(data: T, source: SourceSymbol[T]) = consumer(data, source) /** Returns a simple [[Listener]] that always accepts the item and sends it to the consumer. */ - inline def apply[T](consumer: (T, Source[T]) => Unit): Listener[T] = acceptingListener(consumer) + def apply[T](consumer: (T, SourceSymbol[T]) => Unit): Listener[T]^{consumer} = acceptingListener(consumer) /** A special class of listener that forwards the inner listener through the given source. For purposes of * [[Async.Source.dropListener]] these listeners are compared for equality by the hash of the source and the inner * listener. */ - abstract case class ForwardingListener[T](src: Async.Source[?], inner: Listener[?]) extends Listener[T] + abstract case class ForwardingListener[-T](src: Async.Source[?]^, inner: Listener[?]^) extends Listener[T] object ForwardingListener: - /** Create an empty [[ForwardingListener]] for equality comparison. */ - def empty[T](src: Async.Source[?], inner: Listener[?]) = new ForwardingListener[T](src, inner): + /** Creates an empty [[ForwardingListener]] for equality comparison. */ + def empty(src: Async.Source[?]^, inner: Listener[?]^): ForwardingListener[Any]^{src, inner} = new ForwardingListener[Any](src, inner): val lock = null - override def complete(data: T, source: Async.Source[T]) = ??? + override def complete(data: Any, source: SourceSymbol[Any]) = ??? /** A lock required by a listener to be acquired before accepting values. Should there be multiple listeners that * needs to be locked at the same time, they should be locked by larger-number-first. diff --git a/shared/src/main/scala/async/Resource.scala b/shared/src/main/scala/async/Resource.scala index bc15f75f..8d13ab27 100644 --- a/shared/src/main/scala/async/Resource.scala +++ b/shared/src/main/scala/async/Resource.scala @@ -1,10 +1,17 @@ package gears.async +import language.experimental.captureChecking + /** A Resource wraps allocation to some asynchronously allocatable and releasable resource and grants access to it. It * allows both structured access (similar to [[scala.util.Using]]) and unstructured allocation. */ trait Resource[+T]: - self => + self: Resource[T]^ => + + /** Pear is a (T, Async ?=> Unit) pair without generics. */ + trait Pear: + val item: T^{this} + def cleanup(using Async): Unit /** Run a structured action on the resource. It is allocated and released automatically. * @@ -13,10 +20,10 @@ trait Resource[+T]: * @return * the result of [[body]] */ - def use[V](body: T => V)(using Async): V = + def use[V](body: Pear^ => V)(using Async): V = val res = allocated - try body(res._1) - finally res._2 + try body(res) + finally res.cleanup /** Allocate the resource and leak it. **Use with caution**. The programmer is responsible for closing the resource * with the returned handle. @@ -24,7 +31,7 @@ trait Resource[+T]: * @return * the allocated access to the resource data as well as a handle to close it */ - def allocated(using Async): (T, Async ?=> Unit) + def allocated(using Async): Pear^ /** Create a derived resource that inherits the close operation. * @@ -33,17 +40,23 @@ trait Resource[+T]: * @return * the transformed resource used to access the mapped resource data */ - def map[U](fn: T => Async ?=> U): Resource[U] = new Resource[U]: - override def use[V](body: U => V)(using Async): V = self.use(t => body(fn(t))) - override def allocated(using Async): (U, (Async) ?=> Unit) = + def map[U](fn: Async ?=> (t: T^) => U): Resource[U]^{fn, self} = new Resource[U]: + override def use[V](body: Pear^ => V)(using Async): V = self.use: t => + body: + new Pear: + val item = fn(t.item) + def cleanup(using Async): Unit = t.cleanup + override def allocated(using Async) = val res = self.allocated try - (fn(res._1), res._2) + new Pear: + val item = fn(res.item) + def cleanup(using Async) = res.cleanup catch e => - res._2 + res.cleanup throw e - override def map[Q](fn2: U => (Async) ?=> Q): Resource[Q] = self.map(t => fn2(fn(t))) + // override def map[Q](fn2: U => Async ?=> Q): Resource[Q]^{fn, this} = self.map(t => fn2(fn(t))) /** Create a derived resource that creates a inner resource from the resource data. The inner resource will be * acquired simultaneously, thus it can both transform the resource data and add a new cleanup action. @@ -53,22 +66,26 @@ trait Resource[+T]: * @return * the transformed resource that provides the two-levels-in-one access */ - def flatMap[U](fn: T => Async ?=> Resource[U]): Resource[U] = new Resource[U]: - override def use[V](body: U => V)(using Async): V = self.use(t => fn(t).use(body)) - override def allocated(using Async): (U, (Async) ?=> Unit) = + def flatMap[U](fn: Async ?=> (t: T^) => Resource[U]^): Resource[U]^{fn, this} = new Resource[U]: + override def use[V](body: Pear^ => V)(using Async): V = self.use: t => + val u = fn(t.item) + val inner = u.allocated + body: + new Pear: + val item = inner.item + def cleanup(using Async): Unit = inner.cleanup + override def allocated(using Async) = val res = self.allocated try - val mapped = fn(res._1).allocated - ( - mapped._1, - { closeAsync ?=> - try mapped._2(using closeAsync) // close inner first - finally res._2(using closeAsync) // then close second, even if first failed - } - ) + val mapped = fn(res.item).allocated + new Pear: + val item = mapped.item + def cleanup(using Async) = + try mapped.cleanup // close inner first + finally res.cleanup // then close second, even if first failed catch e => - res._2 + res.cleanup throw e end Resource @@ -85,9 +102,11 @@ object Resource: */ inline def apply[T](inline alloc: Async ?=> T, inline close: T => Async ?=> Unit): Resource[T] = new Resource[T]: - def allocated(using Async): (T, (Async) ?=> Unit) = + def allocated(using Async) = val res = alloc - (res, close(res)) + new Pear: + val item = res + def cleanup(using Async) = close(res) /** Create a concurrent computation resource from an allocator function. It can use the given capability to spawn * [[Future]]s and return a handle to communicate with them. Allocation is only complete after that allocator @@ -104,7 +123,7 @@ object Resource: * @return * a new resource wrapping access to the spawnBody's results */ - inline def spawning[T](inline spawnBody: Async.Spawn ?=> T) = Async.spawning.map(spawn => spawnBody(using spawn)) + // inline def spawning[T](inline spawnBody: Async.Spawn ?=> T) = Async.spawning.map(spawn => spawnBody(using spawn)) /** Create a resource that does not need asynchronous allocation nor cleanup. * @@ -126,29 +145,31 @@ object Resource: * @return * a new resource wrapping access to the combined element */ - def both[T, U, V](res1: Resource[T], res2: Resource[U])(join: (T, U) => V): Resource[V] = new Resource[V]: - override def allocated(using Async): (V, (Async) ?=> Unit) = - val alloc1 = res1.allocated - val alloc2 = + def both[T, U, V](res1: Resource[T]^, res2: Resource[U]^)(join: (t: T^, u: U^) => V): Resource[V]^{res1, res2, join} = new Resource[V]: + override def allocated(using async: Async) = + val p1 = res1.allocated + val p2 = try res2.allocated catch e => - alloc1._2 + p1.cleanup throw e - try - val joined = join(alloc1._1, alloc2._1) - ( - joined, - { closeAsync ?=> - try alloc1._2(using closeAsync) - finally alloc2._2(using closeAsync) - } - ) + val joined = join(p1.item, p2.item) + new Pear: + val item = joined + def cleanup(using Async) = + try + p1.cleanup + finally + p2.cleanup + catch e => - try alloc1._2 - finally alloc2._2 + try + p1.cleanup + finally + p2.cleanup throw e end both @@ -159,8 +180,8 @@ object Resource: * @return * the resource of the list of elements provided by the single resources */ - def all[T](ress: List[Resource[T]]): Resource[List[T]] = ress match - case Nil => just(Nil) - case head :: Nil => head.map(List(_)) - case head :: next => both(head, all(next))(_ :: _) + // def all[T, C^](ress: List[Resource[T]^{C}]): Resource[List[T^{C}]]^{C} = ress match + // case Nil => just(Nil) + // case head :: Nil => head.map(t => List(t)) + // case head :: next => both(head, all(next))(_ :: _) end Resource diff --git a/shared/src/main/scala/async/ScalaConverters.scala b/shared/src/main/scala/async/ScalaConverters.scala index e3d9a7dd..fadbf5dd 100644 --- a/shared/src/main/scala/async/ScalaConverters.scala +++ b/shared/src/main/scala/async/ScalaConverters.scala @@ -1,27 +1,29 @@ package gears.async +import language.experimental.captureChecking + import scala.concurrent.ExecutionContext import scala.concurrent.{Future as StdFuture, Promise as StdPromise} import scala.util.Try /** Converters from Gears types to Scala API types and back. */ object ScalaConverters: - extension [T](fut: StdFuture[T]) + extension [T](fut: StdFuture[T]^) /** Converts a [[scala.concurrent.Future Scala Future]] into a gears [[Future]]. Requires an * [[scala.concurrent.ExecutionContext ExecutionContext]], as the job of completing the returned [[Future]] will be * done through this context. Since [[scala.concurrent.Future Scala Future]] cannot be cancelled, the returned * [[Future]] will *not* clean up the pending job when cancelled. */ - def asGears(using ExecutionContext): Future[T] = - Future.withResolver[T]: resolver => + def asGears(using ExecutionContext): Future[T]^{fut} = + Future.withResolver[T, caps.CapSet]: resolver => fut.andThen(result => resolver.complete(result)) - extension [T](fut: Future[T]) + extension [T](fut: Future[T]^) /** Converts a gears [[Future]] into a Scala [[scala.concurrent.Future Scala Future]]. Note that if `fut` is * cancelled, the returned [[scala.concurrent.Future Scala Future]] will also be completed with * `Failure(CancellationException)`. */ - def asScala: StdFuture[T] = + def asScala: StdFuture[T]^{fut} = val p = StdPromise[T]() fut.onComplete(Listener((res, _) => p.complete(res))) p.future diff --git a/shared/src/main/scala/async/Timer.scala b/shared/src/main/scala/async/Timer.scala index fbce50c3..1f00b0e8 100644 --- a/shared/src/main/scala/async/Timer.scala +++ b/shared/src/main/scala/async/Timer.scala @@ -1,5 +1,7 @@ package gears.async +import language.experimental.captureChecking + import gears.async.Listener import java.util.concurrent.CancellationException @@ -9,10 +11,12 @@ import scala.collection.mutable import scala.concurrent.TimeoutException import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} +import scala.annotation.unchecked.uncheckedCaptures import AsyncOperations.sleep import Future.Promise + /** Timer exposes a steady [[Async.Source]] of ticks that happens every `tickDuration` milliseconds. Note that the timer * does not start ticking until `start` is called (which is a blocking operation, until the timer is cancelled). * @@ -27,23 +31,32 @@ class Timer(tickDuration: Duration) extends Cancellable { private var isCancelled = false private object Source extends Async.OriginalSource[this.TimerEvent] { - val listeners = mutable.Set[Listener[TimerEvent]]() - def tick() = synchronized { + private val listeners : mutable.Set[(Listener[TimerEvent]^) @uncheckedCaptures] = + mutable.Set[(Listener[TimerEvent]^) @uncheckedCaptures]() + + def tick(): Unit = synchronized { listeners.filterInPlace(l => - l.completeNow(TimerEvent.Tick, this) + l.completeNow(TimerEvent.Tick, src) false ) } - override def poll(k: Listener[TimerEvent]): Boolean = + override def poll(k: Listener[TimerEvent]^): Boolean = if isCancelled then k.completeNow(TimerEvent.Cancelled, this) else false // subscribing to a timer always takes you to the next tick - override def dropListener(k: Listener[TimerEvent]): Unit = listeners -= k - override protected def addListener(k: Listener[TimerEvent]): Unit = + override def dropListener(k: Listener[TimerEvent]^): Unit = listeners -= k + override protected def addListener(k: Listener[TimerEvent]^): Unit = if isCancelled then k.completeNow(TimerEvent.Cancelled, this) else Timer.this.synchronized: if isCancelled then k.completeNow(TimerEvent.Cancelled, this) else listeners += k + + def cancel(): Unit = + synchronized { isCancelled = true } + src.synchronized { + Source.listeners.foreach(_.completeNow(TimerEvent.Cancelled, src)) + Source.listeners.clear() + } } /** Ticks of the timer are delivered through this source. Note that ticks are ephemeral. */ @@ -62,10 +75,6 @@ class Timer(tickDuration: Duration) extends Cancellable { Source.tick() loop() - override def cancel(): Unit = - synchronized { isCancelled = true } - src.synchronized { - Source.listeners.foreach(_.completeNow(TimerEvent.Cancelled, src)) - Source.listeners.clear() - } + override def cancel(): Unit = Source.cancel() } + diff --git a/shared/src/main/scala/async/channels.scala b/shared/src/main/scala/async/channels.scala index 15c9e98b..adb2d693 100644 --- a/shared/src/main/scala/async/channels.scala +++ b/shared/src/main/scala/async/channels.scala @@ -1,8 +1,12 @@ package gears.async + +import language.experimental.captureChecking + import gears.async.Async.Source import gears.async.Listener.acceptingListener import gears.async.listeners.lockBoth +import scala.annotation.unchecked.uncheckedCaptures import scala.collection.mutable import scala.util.control.Breaks.{break, breakable} import scala.util.{Failure, Success, Try} @@ -138,11 +142,11 @@ object SyncChannel: def apply[T](): SyncChannel[T] = Impl() private class Impl[T] extends Channel.Impl[T] with SyncChannel[T]: - override def pollRead(r: Reader): Boolean = synchronized: + override def pollRead(r: Reader^): Boolean = synchronized: // match reader with buffer of senders checkClosed(readSource, r) || cells.matchReader(r) - override def pollSend(src: CanSend, s: Sender): Boolean = synchronized: + override def pollSend(src: CanSend, s: Sender^): Boolean = synchronized: // match reader with buffer of senders checkClosed(src, s) || cells.matchSender(src, s) end Impl @@ -157,12 +161,12 @@ object BufferedChannel: val buf = new mutable.Queue[T](size) // Match a reader -> check space in buf -> fail - override def pollSend(src: CanSend, s: Sender): Boolean = synchronized: + override def pollSend(src: CanSend, s: Sender^): Boolean = synchronized: checkClosed(src, s) || cells.matchSender(src, s) || senderToBuf(src, s) // Check space in buf -> fail // If we can pop from buf -> try to feed a sender - override def pollRead(r: Reader): Boolean = synchronized: + override def pollRead(r: Reader^): Boolean = synchronized: if checkClosed(readSource, r) then true else if !buf.isEmpty then if r.completeNow(Right(buf.head), readSource) then @@ -175,7 +179,7 @@ object BufferedChannel: else false // Try to add a sender to the buffer - def senderToBuf(src: CanSend, s: Sender): Boolean = + def senderToBuf(src: CanSend, s: Sender^): Boolean = if buf.size < size then if s.completeNow(Right(()), src) then buf += src.item true @@ -195,7 +199,7 @@ object UnboundedChannel: pollSend(CanSend(x), acceptingListener((r, _) => result = r)) if result.isLeft then throw ChannelClosedException() - override def pollRead(r: Reader): Boolean = synchronized: + override def pollRead(r: Reader^): Boolean = synchronized: if checkClosed(readSource, r) then true else if !buf.isEmpty then if r.completeNow(Right(buf.head), readSource) then @@ -204,7 +208,7 @@ object UnboundedChannel: true else false - override def pollSend(src: CanSend, s: Sender): Boolean = synchronized: + override def pollSend(src: CanSend, s: Sender^): Boolean = synchronized: if checkClosed(src, s) || cells.matchSender(src, s) then true else if s.completeNow(Right(()), src) then buf += src.item @@ -228,21 +232,21 @@ object Channel: var isClosed = false val cells = CellBuf() // Poll a reader, returning false if it should be put into queue - def pollRead(r: Reader): Boolean + def pollRead(r: Reader^): Boolean // Poll a reader, returning false if it should be put into queue - def pollSend(src: CanSend, s: Sender): Boolean + def pollSend(src: CanSend, s: Sender^): Boolean - protected final def checkClosed[T](src: Async.Source[Res[T]], l: Listener[Res[T]]): Boolean = + protected final def checkClosed[T](src: Async.Source[Res[T]], l: Listener[Res[T]]^): Boolean = if isClosed then l.completeNow(Left(Closed), src) true else false override val readSource: Source[ReadResult] = new Source { - override def poll(k: Reader): Boolean = pollRead(k) - override def onComplete(k: Reader): Unit = Impl.this.synchronized: + override def poll(k: Reader^): Boolean = pollRead(k) + override def onComplete(k: Reader^): Unit = Impl.this.synchronized: if !pollRead(k) then cells.addReader(k) - override def dropListener(k: Reader): Unit = Impl.this.synchronized: + override def dropListener(k: Reader^): Unit = Impl.this.synchronized: if !isClosed then cells.dropReader(k) } override final def sendSource(x: T): Source[SendResult] = CanSend(x) @@ -253,7 +257,7 @@ object Channel: cells.cancel() /** Complete a pair of locked sender and reader. */ - protected final def complete(src: CanSend, reader: Listener[ReadResult], sender: Listener[SendResult]) = + protected final def complete(src: CanSend, reader: Listener[ReadResult]^, sender: Listener[SendResult]^) = reader.complete(Right(src.item), readSource) sender.complete(Right(()), src) @@ -261,10 +265,10 @@ object Channel: // dependent on a (possibly odd) equality of T. Users do not expect that // cancelling a send of a given item might in fact cancel that of an equal one. protected final class CanSend(val item: T) extends Source[SendResult] { - override def poll(k: Listener[SendResult]): Boolean = pollSend(this, k) - override def onComplete(k: Listener[SendResult]): Unit = Impl.this.synchronized: + override def poll(k: Listener[SendResult]^): Boolean = pollSend(this, k) + override def onComplete(k: Listener[SendResult]^): Unit = Impl.this.synchronized: if !pollSend(this, k) then cells.addSender(this, k) - override def dropListener(k: Listener[SendResult]): Unit = Impl.this.synchronized: + override def dropListener(k: Listener[SendResult]^): Unit = Impl.this.synchronized: if !isClosed then cells.dropSender(this, k) } @@ -272,6 +276,8 @@ object Channel: * there are *only* all readers or all senders. It must be externally synchronized. */ private[async] class CellBuf(): + import caps.unsafe.unsafeAssumePure // very unsafe WIP + type Cell = Reader | (CanSend, Sender) // reader == 0 || sender == 0 always private var reader = 0 @@ -292,27 +298,27 @@ object Channel: def dequeue() = pending.dequeue() if reader > 0 then reader -= 1 else sender -= 1 - def addReader(r: Reader): this.type = + def addReader(r: Reader^): this.type = require(sender == 0) reader += 1 - pending.enqueue(r) + pending.enqueue(r.unsafeAssumePure) this - def addSender(src: CanSend, s: Sender): this.type = + def addSender(src: CanSend, s: Sender^): this.type = require(reader == 0) sender += 1 - pending.enqueue((src, s)) + pending.enqueue((src, s.unsafeAssumePure)) this - def dropReader(r: Reader): this.type = + def dropReader(r: Reader^): this.type = if reader > 0 then if pending.removeFirst(_ == r).isDefined then reader -= 1 this - def dropSender(src: CanSend, s: Sender): this.type = + def dropSender(src: CanSend, s: Sender^): this.type = if sender > 0 then if pending.removeFirst(_ == (src, s)).isDefined then sender -= 1 this /** Match a possible reader to a queue of senders: try to go through the queue with lock pairing, stopping when * finding a good pair. */ - def matchReader(r: Reader): Boolean = + def matchReader(r: Reader^): Boolean = while hasSender do val (src, s) = nextSender tryComplete(src, s)(r) match @@ -324,7 +330,7 @@ object Channel: /** Match a possible sender to a queue of readers: try to go through the queue with lock pairing, stopping when * finding a good pair. */ - def matchSender(src: CanSend, s: Sender): Boolean = + def matchSender(src: CanSend, s: Sender^): Boolean = while hasReader do val r = nextReader tryComplete(src, s)(r) match @@ -333,7 +339,7 @@ object Channel: case _ => dequeue() // drop gone reader from queue false - private inline def tryComplete(src: CanSend, s: Sender)(r: Reader): s.type | r.type | Unit = + private inline def tryComplete(src: CanSend, s: Sender^)(r: Reader^): s.type | r.type | Unit = lockBoth(r, s) match case true => Impl.this.complete(src, r, s) @@ -394,30 +400,29 @@ object ChannelMultiplexer: while (!shouldTerminate) { val publishersCopy = synchronized(publishers.toSeq) - Async.select( - (infoChannel.readSource ~~> { - case Left(_) | Right(Message.Quit) => - val subscribersCopy = synchronized(subscribers.toList) - for (s <- subscribersCopy) s.send(Failure(ChannelClosedException())) - shouldTerminate = true - case Right(Message.Refresh) => () - }) +: - publishersCopy.map { pub => - pub.readSource ~~> { - case Right(v) => - val subscribersCopy = synchronized(subscribers.toList) - var c = 0 - for (s <- subscribersCopy) { - c += 1 - try s.send(Success(v)) - catch - case closedEx: ChannelClosedException => - removeSubscriber(s) - } - case Left(_) => removePublisher(pub) - } - }* - ) + val pubCases = + publishersCopy.map: pub => + pub.readSource.handle: + case Right(v) => + val subscribersCopy = synchronized(subscribers.toList) + var c = 0 + for (s <- subscribersCopy) { + c += 1 + try s.send(Success(v)) + catch + case closedEx: ChannelClosedException => + removeSubscriber(s) + } + case Left(_) => removePublisher(pub) + + val infoCase = infoChannel.readSource.handle: + case Left(_) | Right(Message.Quit) => + val subscribersCopy = synchronized(subscribers.toList) + for (s <- subscribersCopy) s.send(Failure(ChannelClosedException())) + shouldTerminate = true + case Right(Message.Refresh) => () + + Async.select((infoCase +: pubCases)*) } } diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index 01505efb..e44304aa 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -1,5 +1,7 @@ package gears.async +import language.experimental.captureChecking + import java.util.concurrent.CancellationException import java.util.concurrent.atomic.AtomicBoolean import scala.annotation.tailrec @@ -9,6 +11,8 @@ import scala.compiletime.uninitialized import scala.util import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} +import gears.async.Async.SourceSymbol +import scala.annotation.meta.companionMethod /** Futures are [[Async.Source Source]]s that has the following properties: * - They represent a single value: Once resolved, [[Async.await await]]-ing on a [[Future]] should always return the @@ -48,7 +52,7 @@ object Future: * - withResolver: Completion is done by external request set up from a block of code. */ private class CoreFuture[+T] extends Future[T]: - + self: CoreFuture[T]^ => @volatile protected var hasCompleted: Boolean = false protected var cancelRequest = AtomicBoolean(false) private var result: Try[T] = uninitialized // guaranteed to be set if hasCompleted = true @@ -56,17 +60,19 @@ object Future: // Async.Source method implementations - def poll(k: Listener[Try[T]]): Boolean = + import caps.unsafe.unsafeAssumePure + + def poll(k: Listener[Try[T]]^): Boolean = if hasCompleted then - k.completeNow(result, this) + k.completeNow(result, this.symbol) true else false - def addListener(k: Listener[Try[T]]): Unit = synchronized: - waiting += k + def addListener(k: Listener[Try[T]]^): Unit = synchronized: + waiting += k.unsafeAssumePure - def dropListener(k: Listener[Try[T]]): Unit = synchronized: - waiting -= k + def dropListener(k: Listener[Try[T]]^): Unit = synchronized: + waiting -= k.unsafeAssumePure // Cancellable method implementations @@ -77,8 +83,8 @@ object Future: // though hasCompleted is accessible without "synchronized", // we want it not to be run while the future was trying to complete. synchronized: - if !hasCompleted || group == CompletionGroup.Unlinked then super.link(group) - else this + val t: this.type = if !hasCompleted || group == CompletionGroup.Unlinked then super.link(group) else this + t /** Sets the cancellation state and returns `true` if the future has not been completed and cancelled before. */ protected final def setCancelled(): Boolean = @@ -101,14 +107,15 @@ object Future: waiting.clear() unlink() ws - for listener <- toNotify do listener.completeNow(result, this) + for listener <- toNotify do listener.completeNow(result, this.symbol) end CoreFuture /** A future that is completed by evaluating `body` as a separate asynchronous operation in the given `scheduler` */ - private class RunnableFuture[+T](body: Async.Spawn ?=> T)(using ac: Async) extends CoreFuture[T]: - + private class RunnableFuture[+T](body: Async.Spawn ?-> T)(using ac: Async) extends CoreFuture[T]: + private given acSupport: ac.support.type = ac.support + private given acScheduler: ac.support.Scheduler = ac.scheduler /** RunnableFuture maintains its own inner [[CompletionGroup]], that is separated from the provided Async * instance's. When the future is cancelled, we only cancel this CompletionGroup. This effectively means any * `.await` operations within the future is cancelled *only if they link into this group*. The future body run with @@ -119,27 +126,27 @@ object Future: private def checkCancellation(): Unit = if cancelRequest.get() then throw new CancellationException() - private class FutureAsync(val group: CompletionGroup)(using label: ac.support.Label[Unit]) + private class FutureAsync[Cap^](val group: CompletionGroup)(using label: ac.support.Label[Unit, Cap]^) extends Async(using ac.support, ac.scheduler): - private class AwaitListener[T](src: Async.Source[T]) - extends Function[ac.support.Suspension[T | Null, Unit], Unit], - Listener[T], + private class AwaitListener[T](@annotation.constructorOnly src: Async.Source[T]^) + extends Listener[T], Listener.ListenerLock, Listener.NumberedLock, Cancellable: import AwaitListener.* var state: State = stateUnused + val pureSrc= caps.unsafe.unsafeAssumePure(src) // we only use it for onComplete / dropListener // guarded by lock; null = before apply or after resume - private var sus: ac.support.Suspension[T | Null, Unit] | Null = null + private var sus: ac.support.Suspension[T | Null, Unit]^{Cap} | Null = null @volatile private var cancelRequest = false // if cancellation request received, checked after releasing lock // == Function, to be passed to suspend. Call this only once and before any other usage of this class. - def apply(sus: ac.support.Suspension[T | Null, Unit]): Unit = + def apply(sus: ac.support.Suspension[T | Null, Unit]^{Cap}): Unit = this.sus = sus this.link(group) // may resume + remove listener immediately - if !cancelled then src.onComplete(this) + if !cancelled then pureSrc.onComplete(this) // == Cancellable, to be registered with Async's CompletionGroup (see apply) def cancel(): Unit = @@ -160,7 +167,7 @@ object Future: finally numberedLock.unlock() // ignore concurrent cancelRequest // drop the listener without the lock held (might deadlock in Source otherwise) - if cancelledNow then src.dropListener(this) + if cancelledNow then pureSrc.dropListener(this) end cancelLocked // == ListenerLock, to be exposed by Listener interface (see lock) @@ -183,15 +190,15 @@ object Future: if cancelRequest then cancel() // == Listener, to be registered with Source (see apply) - val lock: Listener.ListenerLock | Null = this - def complete(data: T, source: Async.Source[T]): Unit = + val lock = this + def complete(data: T, source: Async.SourceSymbol[T]): Unit = // might have missed the cancelled -> but we ignore it -> still cancelled = false ac.support.resumeAsync(sus.asInstanceOf[ac.support.Suspension[T | Null, Unit]])(data) sus = null state = stateDone numberedLock.unlock() - inline def cancelled = state == stateCancelled + def cancelled = state == stateCancelled end AwaitListener object AwaitListener: type State = stateUnused.type | stateLocked.type | stateDone.type | stateCancelled.type @@ -202,20 +209,20 @@ object Future: /** Await a source first by polling it, and, if that fails, by suspending in a onComplete call. */ - override def await[U](src: Async.Source[U]): U = + override def await[U](src: Async.Source[U]^): U = if group.isCancelled then throw new CancellationException() src .poll() .getOrElse: - val listener = AwaitListener[U](src) - val res = ac.support.suspend(listener) // linking and src.onComplete happen in listener + val listener: AwaitListener[U]^{Cap} = AwaitListener[U](src) + val res = ac.support.suspend(susp => listener(susp)) // linking and src.onComplete happen in listener listener.unlink() if listener.cancelled then throw CancellationException() else res.asInstanceOf[U] // not cancelled -> result from Source -> type U (not U | Null, still could be null) - override def withGroup(group: CompletionGroup) = FutureAsync(group) + override def withGroup(group: CompletionGroup): Async = FutureAsync[Cap](group) override def cancel(): Unit = if setCancelled() then this.innerGroup.cancel() @@ -232,11 +239,17 @@ object Future: end RunnableFuture + /** Create a future that asynchronously executes `body` that wraps its execution in a [[scala.util.Try]]. The returned * future is linked to the given [[Async.Spawn]] scope by default, i.e. it is cancelled when this scope ends. */ - def apply[T](body: Async.Spawn ?=> T)(using async: Async, spawnable: Async.Spawn & async.type): Future[T] = - RunnableFuture(body) + def apply[T](body: Async.Spawn ?=> T)(using async: Async, spawnable: Async.Spawn)( + using async.type =:= spawnable.type + ): Future[T]^{body, spawnable} = + val f = (async: Async.Spawn) => body(using async) + val puref = caps.unsafe.unsafeAssumePure(f) + // SAFETY: body is recorded in the capture set of Future, which should be cancelled when gone out of scope. + RunnableFuture(async ?=> puref(async))(using spawnable) /** A future that is immediately completed with the given result. */ def now[T](result: Try[T]): Future[T] = @@ -253,12 +266,12 @@ object Future: /** A future that immediately rejects with the given exception. Similar to `Future.now(Failure(exception))`. */ inline def rejected(exception: Throwable): Future[Nothing] = now(Failure(exception)) - extension [T](f1: Future[T]) + extension [T](f1: Future[T]^) /** Parallel composition of two futures. If both futures succeed, succeed with their values in a pair. Otherwise, * fail with the failure that was returned first. */ - def zip[U](f2: Future[U]): Future[(T, U)] = - Future.withResolver: r => + def zip[U](f2: Future[U]^): Future[(T, U)]^{f1, f2} = + Future.withResolver[(T, U), caps.CapSet^{f1, f2}]: r => Async .either(f1, f2) .onComplete(Listener { (v, _) => @@ -290,23 +303,23 @@ object Future: * @see * [[orWithCancel]] for an alternative version where the slower future is cancelled. */ - def or(f2: Future[T]): Future[T] = orImpl(false)(f2) + def or(f2: Future[T]^): Future[T]^{f1, f2} = orImpl(false)(f2) /** Like `or` but the slower future is cancelled. If either task succeeds, succeed with the success that was * returned first and the other is cancelled. Otherwise, fail with the failure that was returned last. */ - def orWithCancel(f2: Future[T]): Future[T] = orImpl(true)(f2) + def orWithCancel(f2: Future[T]^): Future[T]^{f1, f2} = orImpl(true)(f2) - inline def orImpl(inline withCancel: Boolean)(f2: Future[T]): Future[T] = Future.withResolver: r => + inline def orImpl(inline withCancel: Boolean)(f2: Future[T]^): Future[T]^{f1, f2} = Future.withResolver[T, {f1, f2}]: r => Async - .raceWithOrigin(f1, f2) + .raceWithOrigin[Try[T], {f1, f2}](f1, f2) .onComplete(Listener { case ((v, which), _) => v match case Success(value) => inline if withCancel then (if which == f1 then f2 else f1).cancel() r.resolve(value) case Failure(_) => - (if which == f1 then f2 else f1).onComplete(Listener((v, _) => r.complete(v))) + (if which == f1.symbol then f2 else f1).onComplete(Listener((v, _) => r.complete(v))) }) end extension @@ -339,7 +352,7 @@ object Future: /** The group of handlers to be used in [[withResolver]]. As a Future is completed only once, only one of * resolve/reject/complete may be used and only once. */ - trait Resolver[-T]: + trait Resolver[-T, Cap^]: /** Complete the future with a data item successfully */ def resolve(item: T): Unit = complete(Success(item)) @@ -356,7 +369,7 @@ object Future: * may be used. The handler should eventually complete the Future using one of complete/resolve/reject*. The * default handler is set up to [[rejectAsCancelled]] immediately. */ - def onCancel(handler: () => Unit): Unit + def onCancel(handler: (() -> Unit)^{Cap}): Unit end Resolver /** Create a promise that may be completed asynchronously using external means. @@ -366,19 +379,42 @@ object Future: * * If the external operation supports cancellation, the body can register one handler using [[Resolver.onCancel]]. */ - def withResolver[T](body: Resolver[T] => Unit): Future[T] = - val future = new CoreFuture[T] with Resolver[T] with Promise[T] { - @volatile var cancelHandle = () => rejectAsCancelled() - override def onCancel(handler: () => Unit): Unit = cancelHandle = handler + def withResolver[T, Cap^](body: Resolver[T, Cap]^{Cap} => Unit): Future[T]^{Cap} = + val future: (CoreFuture[T] & Resolver[T, Cap] & Promise[T])^{Cap} = new CoreFuture[T] with Resolver[T, Cap] with Promise[T]: + // TODO: undo this once bug is fixed + @volatile var cancelHandle: (() -> Unit) = () => rejectAsCancelled() + override def onCancel(handler: (() -> Unit)^{Cap}): Unit = + cancelHandle = /* TODO remove */ caps.unsafe.unsafeAssumePure(handler) override def complete(result: Try[T]): Unit = super.complete(result) override def cancel(): Unit = if setCancelled() then cancelHandle() - } + end future body(future) future end withResolver + sealed abstract class BaseCollector[T, Cap^](): + private val ch = UnboundedChannel[Future[T]^{Cap}]() + + private val futMap = mutable.Map[SourceSymbol[Try[T]], Future[T]^{Cap}]() + + /** Output channels of all finished futures. */ + final def results: ReadableChannel[Future[T]^{Cap}] = ch.asReadable + + private val listener = Listener((_, fut) => + // safe, as we only attach this listener to Future[T] + val future = futMap.synchronized: + futMap.remove(fut.asInstanceOf[SourceSymbol[Try[T]]]).get + ch.sendImmediately(future) + ) + + protected final def addFuture(future: Future[T]^{Cap}) = + futMap.synchronized { futMap += (future.symbol -> future) } + future.onComplete(listener) + end BaseCollector + + /** Collects a list of futures into a channel of futures, arriving as they finish. * @example * {{{ @@ -394,29 +430,18 @@ object Future: * [[Future.awaitAll]] and [[Future.awaitFirst]] for simple usage of the collectors to get all results or the first * succeeding one. */ - class Collector[T](futures: Future[T]*): - private val ch = UnboundedChannel[Future[T]]() - - /** Output channels of all finished futures. */ - final def results = ch.asReadable - - private val listener = Listener((_, fut) => - // safe, as we only attach this listener to Future[T] - ch.sendImmediately(fut.asInstanceOf[Future[T]]) - ) - - protected final def addFuture(future: Future[T]) = future.onComplete(listener) - + class Collector[T, C^](futures: (Future[T]^{C})*) extends BaseCollector[T, caps.CapSet^{C}]: futures.foreach(addFuture) end Collector /** Like [[Collector]], but exposes the ability to add futures after creation. */ - class MutableCollector[T](futures: Future[T]*) extends Collector[T](futures*): + class MutableCollector[T, Cap^](futures: (Future[T]^{Cap})*) extends BaseCollector[T, Cap]: + futures.foreach(addFuture) /** Add a new [[Future]] into the collector. */ - inline def add(future: Future[T]) = addFuture(future) - inline def +=(future: Future[T]) = add(future) + inline def add(future: Future[T]^{Cap}) = addFuture(future) + inline def +=(future: Future[T]^{Cap}) = add(future) - extension [T](fs: Seq[Future[T]]) + extension [T, C^](fs: Seq[Future[T]^{C}]) /** `.await` for all futures in the sequence, returns the results in a sequence, or throws if any futures fail. */ def awaitAll(using Async) = val collector = Collector(fs*) @@ -442,7 +467,7 @@ object Future: def awaitFirstWithCancel(using Async): T = awaitFirstImpl(true) private inline def awaitFirstImpl(withCancel: Boolean)(using Async): T = - val collector = Collector(fs*) + val collector = Collector[T, C](fs*) @scala.annotation.tailrec def loop(attempt: Int): T = collector.results.read().right.get.awaitResult match @@ -475,10 +500,10 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T): def run()(using Async, AsyncOperations): T = body /** Start a future computed from the `body` of this task */ - def start()(using async: Async, spawn: Async.Spawn & async.type, asyncOps: AsyncOperations) = + def start()(using async: Async, spawn: Async.Spawn)(using asyncOps: AsyncOperations)(using async.type =:= spawn.type): Future[T]^{body, spawn} = Future(body)(using async, spawn) - def schedule(s: TaskSchedule): Task[T] = + def schedule(s: TaskSchedule): Task[T]^{body} = s match { case TaskSchedule.Every(millis, maxRepetitions) => assert(millis >= 1) diff --git a/shared/src/main/scala/async/listeners/locking.scala b/shared/src/main/scala/async/listeners/locking.scala index 3d24d012..e07ec390 100644 --- a/shared/src/main/scala/async/listeners/locking.scala +++ b/shared/src/main/scala/async/listeners/locking.scala @@ -1,6 +1,8 @@ /** Package listeners provide some auxilliary methods to work with listeners. */ package gears.async.listeners +import language.experimental.captureChecking + import gears.async._ import scala.annotation.tailrec @@ -10,7 +12,7 @@ import Listener.ListenerLock /** Two listeners being locked at the same time, while having the same [[Listener.ListenerLock.selfNumber lock number]]. */ case class ConflictingLocksException( - listeners: (Listener[?], Listener[?]) + listeners: (Listener[?]^, Listener[?]^) ) extends Exception /** Attempt to lock both listeners belonging to possibly different sources at the same time. Lock orders are respected @@ -23,16 +25,16 @@ case class ConflictingLocksException( * listeners. */ def lockBoth[T, U]( - lt: Listener[T], - lu: Listener[U] -): lt.type | lu.type | true = + lt: Listener[T]^, + lu: Listener[U]^ +): (lt.type | lu.type | true) = val lockT = if lt.lock == null then return (if lu.acquireLock() then true else lu) else lt.lock val lockU = if lu.lock == null then return (if lt.acquireLock() then true else lt) else lu.lock - inline def doLock[T, U](lt: Listener[T], lu: Listener[U])( - lockT: ListenerLock, - lockU: ListenerLock - ): lt.type | lu.type | true = + def doLock[T, U](lt: Listener[T]^, lu: Listener[U]^)( + lockT: ListenerLock^{lt}, + lockU: ListenerLock^{lu} + ): (lt.type | lu.type | true) = // assert(lockT.number > lockU.number) if !lockT.acquire() then lt else if !lockU.acquire() then diff --git a/shared/src/main/scala/async/package.scala b/shared/src/main/scala/async/package.scala index 1b97ea02..13066a18 100644 --- a/shared/src/main/scala/async/package.scala +++ b/shared/src/main/scala/async/package.scala @@ -1,5 +1,7 @@ package gears +import language.experimental.captureChecking + /** Asynchronous programming support with direct-style Scala. * @see * [[gears.async.Async]] for an introduction to the [[Async]] context and how to create them. diff --git a/shared/src/main/scala/async/retry.scala b/shared/src/main/scala/async/retry.scala index ed42f85d..d06507f5 100644 --- a/shared/src/main/scala/async/retry.scala +++ b/shared/src/main/scala/async/retry.scala @@ -1,5 +1,7 @@ package gears.async +import language.experimental.captureChecking + import gears.async.Async import gears.async.AsyncOperations.sleep import gears.async.Retry.Delay diff --git a/shared/src/test/scala/CCBehavior.scala b/shared/src/test/scala/CCBehavior.scala new file mode 100644 index 00000000..38dfb060 --- /dev/null +++ b/shared/src/test/scala/CCBehavior.scala @@ -0,0 +1,121 @@ +import language.experimental.captureChecking + +import gears.async.AsyncOperations.* +import gears.async.default.given +import gears.async.{Async, AsyncSupport, Future, uninterruptible} + +import java.util.concurrent.CancellationException +import scala.annotation.capability +import scala.concurrent.duration.{Duration, DurationInt} +import scala.util.Success +import gears.async.Channel +import gears.async.SyncChannel + +type Result[+T, +E] = Either[E, T] +object Result: + import scala.util.boundary + type Label[-T, -E] = boundary.Label[Result[T, E]] + // ^ opaque doesn't work? + + inline def apply[T, E](body: Label[T, E] ?=> T): Result[T, E] = + boundary(lbl ?=> Right(body(using lbl))) + + extension [U, E](r: Result[U, E])(using Label[Nothing, E]) + inline def ok: U = r match + case Left(value) => boundary.break(Left(value)) + case Right(value) => value + +class CaptureCheckingBehavior extends munit.FunSuite: + import Result.* + import caps.use + import scala.collection.mutable + + test("good") { + // don't do this in real code! capturing Async.blocking's Async context across functions is hard to track + Async.fromSync: async ?=> + def good1[T, E, C^](frs: List[Future[Result[T, E]]^{C}]): Future[Result[List[T], E]]^{C, async} = + Future: fut ?=> + Result[List[T], E]: ret ?=> + frs.map(_.await.ok) + + def good2[T, E, C^](rf: Result[Future[T]^{C}, E]): Future[Result[T, E]]^{C, async} = + Future: + Result[T, E]: + rf.ok.await // OK, Future argument has type Result[T] + + def useless4[T, E](fr: Future[Result[T, E]]^) = + fr.await.map(Future(_)) + } + + // test("bad - collectors") { + // val futs: Seq[Future[Int]^] = Async.blocking: async ?=> + // val fs: Seq[Future[Int]^{async}] = (0 to 10).map(i => Future { i }) + // fs + // Async.blocking: + // futs.awaitAll // should not compile + // } + + test("future withResolver capturing") { + class File(): + def close() = () + def read(callback: Int => Unit) = () + val f: File^ = File() + val read = Future.withResolver[Int, caps.CapSet^{f}]: r => + f.read(r.resolve) + r.onCancel(f.close) + } + + test("awaitAll/awaitFirst") { + trait File: + def readFut(): Future[Int]^{this} + object File: + def open[T](filename: String)(body: File^ => T)(using Async): T = body: + new File: + def readFut(): Future[Int]^{this} = Future.resolved(0) + + def readAll[C^](files: (File^{C})*): Seq[Future[Int]^{C}] = files.map(f => f.readFut()) + + Async.fromSync: + File.open("a.txt"): a => + val aa = a // TODO workaround + File.open("b.txt"): b => + val bb = b + val futs: Seq[Future[Int]^{aa, bb}] = readAll(aa, bb) + val allFuts = Future(futs.awaitAll) + allFuts + .await // uncomment to leak + } + + // test("channel") { + // trait File extends caps.Capability: + // def read(): Int = ??? + // Async.blocking: + // val ch = SyncChannel[File]() + // // Sender + // val sender = Future: + // val f = new File {} + // ch.send(f) + // val recv = Future: + // val f = ch.read().right.get + // f.read() + // } + + test("very bad") { + Async.fromSync: async ?=> + def fail3[T, E](fr: Future[Result[T, E]]^): Result[Any, Any] = + Result: label ?=> + Future: fut ?=> + fr.await.ok // error, escaping label from Result + + // val fut = Future(Left(5)) + // val res = fail3(fut) + // println(res.right.get.asInstanceOf[Future[Any]].awaitResult) + } + + // test("bad") { + // Async.blocking: async ?=> + // def fail3[T, E](fr: Future[Result[T, E]]^): Result[Future[T]^{async}, E] = + // Result: label ?=> + // Future: fut ?=> + // fr.await.ok // error, escaping label from Result + // } diff --git a/shared/src/test/scala/CancellationBehavior.scala b/shared/src/test/scala/CancellationBehavior.scala index edd40418..cbd616cf 100644 --- a/shared/src/test/scala/CancellationBehavior.scala +++ b/shared/src/test/scala/CancellationBehavior.scala @@ -1,3 +1,5 @@ +import language.experimental.captureChecking + import gears.async.* import gears.async.AsyncOperations.* import gears.async.default.given @@ -12,9 +14,9 @@ import boundary.break class CancellationBehavior extends munit.FunSuite: enum State: case Ready - case Initialized(f: Future[?]) + case Initialized case RunningEarly - case Running(f: Future[?]) + case Running case Failed(t: Throwable) case Cancelled case Completed @@ -30,19 +32,21 @@ class CancellationBehavior extends munit.FunSuite: state match case State.Ready => state = State.RunningEarly - case State.Initialized(f) => - state = State.Running(f) + case State.Initialized => + state = State.Running case _ => fail(s"running failed, state is $state") - def initialize(f: Future[?]) = + def initialize(f: Future[?]^) = synchronized: state match case State.Ready => - state = State.Initialized(f) + state = State.Initialized case State.RunningEarly => - state = State.Running(f) + state = State.Running case _ => fail(s"initializing failed, state is $state") - private def startFuture(info: Info, body: Async ?=> Unit = {})(using a: Async, s: Async.Spawn & a.type) = + private def startFuture(info: Info, body: Async ?=> Unit = {})(using a: Async, s: Async.Spawn)(using + a.type =:= s.type + ) = val f = Future: info.run() try @@ -71,7 +75,7 @@ class CancellationBehavior extends munit.FunSuite: test("group cancel"): var x = 0 Async.fromSync: - Async.group: + Async.group[Unit]: Future: sleep(400) x = 1 @@ -114,7 +118,7 @@ class CancellationBehavior extends munit.FunSuite: sleep(500) x1 = 1 x2 = 1 - Async.group: + Async.group: groupSpawn ?=> Async.current.group.cancel() // cancel now f.link() assertEquals(x1, 0) diff --git a/shared/src/test/scala/ChannelBehavior.scala b/shared/src/test/scala/ChannelBehavior.scala index 0127fcff..962b33f9 100644 --- a/shared/src/test/scala/ChannelBehavior.scala +++ b/shared/src/test/scala/ChannelBehavior.scala @@ -1,3 +1,5 @@ +import language.experimental.captureChecking + import gears.async.AsyncOperations.* import gears.async.default.given import gears.async.{ @@ -255,8 +257,8 @@ class ChannelBehavior extends munit.FunSuite { } val race = Async.race( (0 until 100).map(i => - Async.race((10 * i until 10 * i + 10).map(idx => channels(idx).readSource.transformValuesWith(_.right.get))*) - )* + Async.race((10 * i until 10 * i + 10).map(idx => channels(idx).readSource.transformValuesWith(_.right.get))) + ) ) var sum = 0 for i <- 0 until 1000 do sum += race.awaitResult @@ -282,7 +284,7 @@ class ChannelBehavior extends munit.FunSuite { val ch = SyncChannel[Int]() var timesSent = 0 val race = Async.race( - (for i <- 0 until 1000 yield ch.sendSource(i))* + (for i <- 0 until 1000 yield ch.sendSource(i)) ) Future { while race.awaitResult.isRight do { @@ -320,66 +322,66 @@ class ChannelBehavior extends munit.FunSuite { assert(Async.race(a.readSource, b.readSource).awaitResult.isLeft) } - test("ChannelMultiplexer multiplexes - all subscribers read the same stream") { - Async.fromSync: - val m = ChannelMultiplexer[Int]() - val c = SyncChannel[Int]() - m.addPublisher(c) - - val receivers = (1 to 3).map { _ => - val cr = SyncChannel[Try[Int]]() - m.addSubscriber(cr) - () => - (ac: Async) ?=> - val l = ArrayBuffer[Int]() - for i <- 1 to 4 do l += cr.read().right.get.get - assertEquals(l, ArrayBuffer[Int](1, 2, 3, 4)) - } - - Future { m.run() } - Future { - for i <- 1 to 4 do c.send(i) - } - - receivers.map(v => Future(v())).awaitAll - } - - test("ChannelMultiplexer multiple readers and writers") { - Async.fromSync: - val m = ChannelMultiplexer[Int]() - - val sendersCount = 3 - val sendersMessage = 4 - val receiversCount = 3 - - val senders = (0 until sendersCount).map { idx => - val cc = SyncChannel[Int]() - m.addPublisher(cc) - () => - (ac: Async) ?=> - for (i <- 0 until sendersMessage) - cc.send(i) - m.removePublisher(cc) - } - - val receivers = (0 until receiversCount).map { idx => - val cr = SyncChannel[Try[Int]]() - m.addSubscriber(cr) - () => - (ac: Async) ?=> - sleep(idx * 500) - var sum = 0 - for (i <- 0 until sendersCount * sendersMessage) { - sum += cr.read().right.get.get - } - assertEquals(sum, sendersMessage * (sendersMessage - 1) / 2 * sendersCount) - } - Future { m.run() } - - (senders ++ receivers) - .map(v => Future(v())) - .awaitAll - } + // test("ChannelMultiplexer multiplexes - all subscribers read the same stream") { + // Async.fromSync: + // val m = ChannelMultiplexer[Int]() + // val c = SyncChannel[Int]() + // m.addPublisher(c) + + // val receivers = (1 to 3).map { _ => + // val cr = SyncChannel[Try[Int]]() + // m.addSubscriber(cr) + // () => + // (ac: Async) ?=> + // val l = ArrayBuffer[Int]() + // for i <- 1 to 4 do l += cr.read().right.get.get + // assertEquals(l, ArrayBuffer[Int](1, 2, 3, 4)) + // } + + // Future { m.run() } + // Future { + // for i <- 1 to 4 do c.send(i) + // } + + // receivers.map(v => Future(v())).awaitAll + // } + + // test("ChannelMultiplexer multiple readers and writers") { + // Async.fromSync: + // val m = ChannelMultiplexer[Int]() + + // val sendersCount = 3 + // val sendersMessage = 4 + // val receiversCount = 3 + + // val senders = (0 until sendersCount).map { idx => + // val cc = SyncChannel[Int]() + // m.addPublisher(cc) + // () => + // (ac: Async) ?=> + // for (i <- 0 until sendersMessage) + // cc.send(i) + // m.removePublisher(cc) + // } + + // val receivers = (0 until receiversCount).map { idx => + // val cr = SyncChannel[Try[Int]]() + // m.addSubscriber(cr) + // () => + // (ac: Async) ?=> + // sleep(idx * 500) + // var sum = 0 + // for (i <- 0 until sendersCount * sendersMessage) { + // sum += cr.read().right.get.get + // } + // assertEquals(sum, sendersMessage * (sendersMessage - 1) / 2 * sendersCount) + // } + // Future { m.run() } + + // (senders ++ receivers) + // .map(v => Future(v())) + // .awaitAll + // } def getChannels = List(SyncChannel[Int](), BufferedChannel[Int](1024), UnboundedChannel[Int]()) } diff --git a/shared/src/test/scala/CollectorTests.scala b/shared/src/test/scala/CollectorTests.scala new file mode 100644 index 00000000..2484bc78 --- /dev/null +++ b/shared/src/test/scala/CollectorTests.scala @@ -0,0 +1,7 @@ +import language.experimental.captureChecking + +import gears.async.* +import gears.async.default.given + +// class CollectorTests extends munit.FunSuite: +// test("queues") diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index 33cb9d31..807b1ccb 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -1,3 +1,5 @@ +import language.experimental.captureChecking + import gears.async.* import gears.async.AsyncOperations.* import gears.async.Future.{Promise, zip} @@ -53,7 +55,7 @@ class FutureBehavior extends munit.FunSuite { } val res = a.or(b).await res - val _: Future[Int | Boolean] = z + val _: Future[Int | Boolean]^{z} = z assertEquals(x.await, 33) assertEquals(y.await, (22, 11)) } @@ -332,8 +334,8 @@ class FutureBehavior extends munit.FunSuite { } test("Future.withResolver cancel handler is not run after being completed") { - val num = AtomicInteger(0) - val fut = Future.withResolver[Int]: r => + val num: AtomicInteger^ = AtomicInteger(0) + val fut = Future.withResolver[Int, caps.CapSet^{num}]: r => r.onCancel { () => num.incrementAndGet() } r.resolve(1) fut.cancel() @@ -342,7 +344,7 @@ class FutureBehavior extends munit.FunSuite { test("Future.withResolver is only completed after handler decides") { val prom = Future.Promise[Unit]() - val fut = Future.withResolver[Unit]: r => + val fut = Future.withResolver[Unit, caps.CapSet]: r => r.onCancel(() => prom.onComplete(Listener { (_, _) => r.rejectAsCancelled() })) assert(fut.poll().isEmpty) @@ -415,25 +417,25 @@ class FutureBehavior extends munit.FunSuite { assert(!lastFutureFinished) } - test("future collection: awaitFirst*") { - Async.fromSync: - val range = (0 to 10) - def futs = range.map(i => Future { sleep(i * 100); i }) - assert(range contains futs.awaitFirst) - - val exc = new Exception("a") - def futsWithFail = futs ++ Seq(Future { throw exc }) - assert(range contains futsWithFail.awaitFirst) - - val excs = range.map(i => new Exception(i.toString())) - def futsAllFail = range.zip(excs).map((i, exc) => Future { sleep(i * 100); throw exc }) - assertEquals(Try(futsAllFail.awaitFirst), Failure(excs.last)) - - var lastFutureFinished = false - def futsWithSleepy = futsWithFail ++ Seq(Future { sleep(200000); lastFutureFinished = true; 0 }) - assert(range contains futsWithSleepy.awaitFirst) - assert(!lastFutureFinished) - } + // test("future collection: awaitFirst*") { + // Async.fromSync: + // val range = (0 to 10) + // def futs = range.map(i => Future { sleep(i * 100); i }) + // assert(range contains futs.awaitFirst) + + // val exc = new Exception("a") + // def futsWithFail = futs ++ Seq(Future { throw exc }) + // assert(range contains futsWithFail.awaitFirst) + + // val excs = range.map(i => new Exception(i.toString())) + // def futsAllFail = range.zip(excs).map((i, exc) => Future { sleep(i * 100); throw exc }) + // assertEquals(Try(futsAllFail.awaitFirst), Failure(excs.last)) + + // var lastFutureFinished = false + // def futsWithSleepy = futsWithFail ++ Seq(Future { sleep(200000); lastFutureFinished = true; 0 }) + // assert(range contains futsWithSleepy.awaitFirst) + // assert(!lastFutureFinished) + // } test("uninterruptible should continue even when Future is cancelled") { Async.fromSync: diff --git a/shared/src/test/scala/ListenerBehavior.scala b/shared/src/test/scala/ListenerBehavior.scala index f99c8646..475ecb1b 100644 --- a/shared/src/test/scala/ListenerBehavior.scala +++ b/shared/src/test/scala/ListenerBehavior.scala @@ -1,3 +1,5 @@ +import language.experimental.captureChecking + import gears.async.Async import gears.async.Async.Source import gears.async.Async.race @@ -36,7 +38,7 @@ class ListenerBehavior extends munit.FunSuite: var listener1Locked = false val listener1 = new Listener[Nothing]: val lock = null - def complete(data: Nothing, src: Async.Source[Nothing]): Unit = + def complete(data: Nothing, src: Async.SourceSymbol[Nothing]): Unit = fail("should not succeed") def release() = listener1Locked = false @@ -55,7 +57,7 @@ class ListenerBehavior extends munit.FunSuite: Async.race(source1).onComplete(Listener.acceptingListener[Int]((x, _) => assertEquals(x, 1))) Async.race(source2).onComplete(Listener.acceptingListener[Int]((x, _) => assertEquals(x, 2))) - assertEquals(lockBoth(source1.listener.get, source2.listener.get), true) + assert(lockBoth(source1.listener.get, source2.listener.get) == true) source1.completeWith(1) source2.completeWith(2) @@ -66,7 +68,7 @@ class ListenerBehavior extends munit.FunSuite: Async.race(source1).onComplete(Listener.acceptingListener[Int]((x, _) => assertEquals(x, 1))) Async.race(source2).onComplete(Listener.acceptingListener[Int]((x, _) => assertEquals(x, 2))) - assertEquals(lockBoth(source2.listener.get, source1.listener.get), true) + assert(lockBoth(source2.listener.get, source1.listener.get) == true) source1.completeWith(1) source2.completeWith(2) @@ -78,7 +80,7 @@ class ListenerBehavior extends munit.FunSuite: Async.race(Async.race(source2)).onComplete(Listener.acceptingListener[Int]((x, _) => assertEquals(x, 2))) Async.race(race1).onComplete(Listener.acceptingListener[Int]((x, _) => assertEquals(x, 1))) - assertEquals(lockBoth(source1.listener.get, source2.listener.get), true) + assert(lockBoth(source1.listener.get, source2.listener.get) == true) source1.completeWith(1) source2.completeWith(2) @@ -90,7 +92,7 @@ class ListenerBehavior extends munit.FunSuite: Async.race(Async.race(source2)).onComplete(Listener.acceptingListener[Int]((x, _) => assertEquals(x, 2))) Async.race(race1).onComplete(Listener.acceptingListener[Int]((x, _) => assertEquals(x, 1))) - assertEquals(lockBoth(source2.listener.get, source1.listener.get), true) + assert(lockBoth(source2.listener.get, source1.listener.get) == true) source1.completeWith(1) source2.completeWith(2) @@ -139,9 +141,9 @@ class ListenerBehavior extends munit.FunSuite: test("race polling"): val source1 = new Async.Source[Int](): - override def poll(k: Listener[Int]): Boolean = k.completeNow(1, this) || true - override def onComplete(k: Listener[Int]): Unit = ??? - override def dropListener(k: Listener[Int]): Unit = ??? + override def poll(k: Listener[Int]^): Boolean = k.completeNow(1, this) || true + override def onComplete(k: Listener[Int]^): Unit = ??? + override def dropListener(k: Listener[Int]^): Unit = ??? val source2 = TSource() val listener = TestListener(1) @@ -188,31 +190,31 @@ class ListenerBehavior extends munit.FunSuite: ??? catch case ConflictingLocksException(base) => - assertEquals(base, (l1, l2)) + assert(base == (l1, l2)) try lockBoth(l2, l1) ??? catch case ConflictingLocksException(base) => - assertEquals(base, (l2, l1)) + assert(base == (l2, l1)) try lockBoth(l, l2) ??? catch case ConflictingLocksException(base) => - assertEquals(base, (l, l2)) + assert(base == (l, l2)) try lockBoth(l1, l) ??? catch case ConflictingLocksException(base) => - assertEquals(base, (l1, l)) + assert(base == (l1, l)) try lockBoth(l, l) ??? catch case ConflictingLocksException(base) => - assertEquals(base, (l, l)) + assert(base == (l, l)) test("failing downstream listener is dropped in race"): val source1 = TSource() @@ -247,7 +249,7 @@ class ListenerBehavior extends munit.FunSuite: private class TestListener(expected: Int)(using asst: munit.Assertions) extends Listener[Int]: val lock = null - def complete(data: Int, source: Source[Int]): Unit = + def complete(data: Int, source: Async.SourceSymbol[Int]): Unit = asst.assertEquals(data, expected) private class NumberedTestListener private (sleep: AtomicBoolean, fail: Boolean, expected: Int)(using munit.Assertions) @@ -279,19 +281,20 @@ private class NumberedTestListener private (sleep: AtomicBoolean, fail: Boolean, /** Dummy source that never completes */ private object Dummy extends Async.Source[Nothing]: - def poll(k: Listener[Nothing]): Boolean = false - def onComplete(k: Listener[Nothing]): Unit = () - def dropListener(k: Listener[Nothing]): Unit = () + def poll(k: Listener[Nothing]^): Boolean = false + def onComplete(k: Listener[Nothing]^): Unit = () + def dropListener(k: Listener[Nothing]^): Unit = () private class TSource(using asst: munit.Assertions) extends Async.Source[Int]: var listener: Option[Listener[Int]] = None - def poll(k: Listener[Int]): Boolean = false - def onComplete(k: Listener[Int]): Unit = + def poll(k: Listener[Int]^): Boolean = false + def onComplete(k: Listener[Int]^): Unit = + import caps.unsafe.unsafeAssumePure assert(listener.isEmpty) - listener = Some(k) - def dropListener(k: Listener[Int]): Unit = + listener = Some(k.unsafeAssumePure) + def dropListener(k: Listener[Int]^): Unit = if listener.isDefined then - asst.assertEquals(k, listener.get) + asst.assert(k == listener.get) listener = None def lockListener() = val r = listener.get.acquireLock() diff --git a/shared/src/test/scala/ResourceBehavior.scala b/shared/src/test/scala/ResourceBehavior.scala index 6d3519fa..d9daa871 100644 --- a/shared/src/test/scala/ResourceBehavior.scala +++ b/shared/src/test/scala/ResourceBehavior.scala @@ -25,7 +25,7 @@ class ResourceBehavior extends munit.FunSuite { container.assertInitial() val res = container.res.allocated try container.waitAcquired() - finally res._2 + finally res.cleanup container.waitReleased() def mappedUse(container: Container) = @@ -35,7 +35,7 @@ class ResourceBehavior extends munit.FunSuite { "a" container.assertInitial() res.use: str => - assertEquals(str, "a") + assertEquals(str.item: String, "a") container.waitAcquired() container.waitReleased() @@ -48,13 +48,13 @@ class ResourceBehavior extends munit.FunSuite { val ress = res.allocated try - assertEquals(ress._1, "a") + assertEquals(ress.item: String, "a") container.waitAcquired() - finally ress._2 + finally ress.cleanup container.waitReleased() for - (implName, impl) <- Seq(("apply", () => ResContainer()), ("Future", () => AsyncResContainer())) + (implName, impl) <- Seq(("apply", () => ResContainer())) (testName, testCase) <- Seq( ("use", use), ("allocated", allocated), @@ -63,15 +63,24 @@ class ResourceBehavior extends munit.FunSuite { ) do test(s"$implName - $testName")(testCase(impl())) - test("leak future") { + // test("leak future") { + // Async.fromSync: + // val container = AsyncResContainer() + // val res = Async.group: + // container.res.allocated + // container.waitAcquired() + // res.cleanup + // container.waitReleased() + // } + + test("leak"): Async.fromSync: - val container = AsyncResContainer() - val res = Async.group: - container.res.allocated - container.waitAcquired() - res._2 - container.waitReleased() - } + class A() + val r = new Resource[A]: + def allocated(using Async): Pear = new Pear: + val item: A = A() + def cleanup(using Async) = () + val leak = r.use[A](p => p.item) abstract class Container: var acq = Promise[Unit]() @@ -111,19 +120,19 @@ class ResourceBehavior extends munit.FunSuite { _ => { assertAcquiredNow(); setReleased() } ) - class AsyncResContainer extends Container: - val ch = SyncChannel[Unit]() + // class AsyncResContainer extends Container: + // val ch = SyncChannel[Unit]() - override def waitAcquired()(using Async): Unit = ch.read().right.get + // override def waitAcquired()(using Async): Unit = ch.read().right.get - val res = Resource.spawning(Future { - assertInitial() - setAcquired() - while true do ch.send(()) - }.onComplete(Listener.acceptingListener { (tryy, _) => - assert(tryy.isFailure) - assertAcquiredNow() - setReleased() - })) + // val res = Resource.spawning(Future { + // assertInitial() + // setAcquired() + // while true do ch.send(()) + // }.onComplete(Listener.acceptingListener { (tryy, _) => + // assert(tryy.isFailure) + // assertAcquiredNow() + // setReleased() + // })) } diff --git a/shared/src/test/scala/RetryBehavior.scala b/shared/src/test/scala/RetryBehavior.scala index 322fc5e3..d0056cad 100644 --- a/shared/src/test/scala/RetryBehavior.scala +++ b/shared/src/test/scala/RetryBehavior.scala @@ -1,3 +1,5 @@ +import language.experimental.captureChecking + import gears.async.default.given import gears.async.{Async, Future, Retry, Task, TaskSchedule} diff --git a/shared/src/test/scala/SchedulerBehavior.scala b/shared/src/test/scala/SchedulerBehavior.scala index ce598062..a5ddb574 100644 --- a/shared/src/test/scala/SchedulerBehavior.scala +++ b/shared/src/test/scala/SchedulerBehavior.scala @@ -1,3 +1,5 @@ +import language.experimental.captureChecking + import gears.async.AsyncOperations.* import gears.async.Future.Promise import gears.async.default.given diff --git a/shared/src/test/scala/SourceBehavior.scala b/shared/src/test/scala/SourceBehavior.scala index 8890cc81..b86a71d7 100644 --- a/shared/src/test/scala/SourceBehavior.scala +++ b/shared/src/test/scala/SourceBehavior.scala @@ -1,3 +1,5 @@ +import language.experimental.captureChecking + import gears.async.* import gears.async.AsyncOperations.* import gears.async.default.given @@ -56,7 +58,7 @@ class SourceBehavior extends munit.FunSuite { Async.fromSync: val timeBefore = System.currentTimeMillis() val f = Future { - sleep(50); + sleep(50) Future { sleep(70) Future { @@ -82,7 +84,7 @@ class SourceBehavior extends munit.FunSuite { test("poll()") { Async.fromSync: - val f: Future[Int] = Future { + val f = Future { sleep(100) 1 } @@ -131,7 +133,7 @@ class SourceBehavior extends munit.FunSuite { test("transform values with") { Async.fromSync: - val f: Future[Int] = Future { 10 } + val f = Future { 10 } assertEquals( f.transformValuesWith: case Success(i) => i + 1 @@ -139,7 +141,7 @@ class SourceBehavior extends munit.FunSuite { .awaitResult, 11 ) - val g: Future[Int] = Future.now(Failure(AssertionError(1123))) + val g = Future.now(Failure(AssertionError(1123))) assertEquals( g.transformValuesWith: case Failure(_) => 17 @@ -154,7 +156,7 @@ class SourceBehavior extends munit.FunSuite { var aRan = Future.Promise[Unit]() var bRan = Future.Promise[Unit]() val wait = Future.Promise[Unit]() - val f: Future[Int] = Future { + val f = Future { wait.await 10 } diff --git a/shared/src/test/scala/Stress.scala b/shared/src/test/scala/Stress.scala index c24c66b5..bb0b260b 100644 --- a/shared/src/test/scala/Stress.scala +++ b/shared/src/test/scala/Stress.scala @@ -1,3 +1,5 @@ +import language.experimental.captureChecking + import gears.async.AsyncOperations.* import gears.async.Future.MutableCollector import gears.async.Timer @@ -12,12 +14,12 @@ class StressTest extends munit.FunSuite: test("survives a stress test that hammers on creating futures") { val total = 200_000L - Async.fromSync: + Async.fromSync: ac ?=> Seq[Long](1, 2, 4, 16, 10000).foreach: parallelism => val k = AtomicInteger(0) def compute(using Async) = k.incrementAndGet() - val collector = MutableCollector((1L to parallelism).map(_ => Future { compute })*) + val collector = MutableCollector[Int, caps.CapSet^{ac}]((1L to parallelism).map(_ => Future { compute })*) var sum = 0L for i <- parallelism + 1 to total do sum += collector.results.read().right.get.await diff --git a/shared/src/test/scala/TaskScheduleBehavior.scala b/shared/src/test/scala/TaskScheduleBehavior.scala index 0782b1cd..623fd757 100644 --- a/shared/src/test/scala/TaskScheduleBehavior.scala +++ b/shared/src/test/scala/TaskScheduleBehavior.scala @@ -1,3 +1,5 @@ +import language.experimental.captureChecking + import gears.async.default.given import gears.async.{Async, Future, Task, TaskSchedule} diff --git a/shared/src/test/scala/TimerBehavior.scala b/shared/src/test/scala/TimerBehavior.scala index 2e229d73..5e46094f 100644 --- a/shared/src/test/scala/TimerBehavior.scala +++ b/shared/src/test/scala/TimerBehavior.scala @@ -1,3 +1,5 @@ +import language.experimental.captureChecking + import gears.async.AsyncOperations._ import gears.async._ @@ -25,7 +27,7 @@ class TimerBehavior extends munit.FunSuite { assert(timer.src.awaitResult == timer.TimerEvent.Tick) } - def `cancel future after timeout`[T](d: Duration, f: Future[T])(using Async, AsyncOperations): Try[T] = + def `cancel future after timeout`[T](d: Duration, f: Future[T]^)(using Async, AsyncOperations): Try[T] = Async.group: f.link() val t = Future { sleep(d.toMillis) }