Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
e762d85
Update scala version to add cc
natsukagami Apr 15, 2024
25ca9a7
Update cc for AsyncSupport
natsukagami Apr 15, 2024
bc9afb3
Properly add scala2-library-cc
natsukagami Apr 16, 2024
d28e84b
Async & Listener should almost compile
natsukagami Apr 16, 2024
54724a3
Add select and race interfaces
natsukagami Apr 17, 2024
914cefb
Async / Futures
natsukagami Apr 19, 2024
c78a5de
add cc to ListenerBehavior
natsukagami Apr 19, 2024
0052a02
Cancellation tests
natsukagami Apr 19, 2024
2c7c131
Timer and AsyncOperations
natsukagami Apr 19, 2024
dd184a1
Cancellable workaround
natsukagami Apr 19, 2024
5b1f712
Add tests for capture checking, per #67
natsukagami Apr 19, 2024
531d777
Uncomment mutable collector test
natsukagami Apr 20, 2024
e08dbfa
Make .await inline again, courtesy of scala/scala3#20241
natsukagami Apr 20, 2024
b06a9db
Update to latest compiler
natsukagami May 7, 2024
eed12ea
WIP
natsukagami Jun 17, 2024
511aad3
More WIP
natsukagami Jul 9, 2024
6af2d7a
Things compile again
natsukagami Jul 10, 2024
8d1c0be
Cancellables not needed
natsukagami Jul 10, 2024
c29822b
Update flake
natsukagami Jul 10, 2024
c49ee6f
Restore separate scheduler
natsukagami Jul 10, 2024
a7cf920
Minify changes to Async
natsukagami Jul 10, 2024
3081de6
Inline is now usable
natsukagami Jul 11, 2024
0bc53a4
Keep {src1, src2}
natsukagami Jul 11, 2024
1612af4
Update to current build
natsukagami Jul 12, 2024
5830cb2
Minimize changes
natsukagami Jul 12, 2024
9ce56ce
Minimize changes in futures
natsukagami Jul 12, 2024
74cde6d
Add cc to channels (mostly unsafe)
natsukagami Jul 12, 2024
73746f9
Workaround for spread arguments
natsukagami Aug 19, 2024
dfe9c53
Make MutableCollector's capture set explicit
natsukagami Aug 19, 2024
aca8282
Allow capturing futures in and/or/awaitAll/awaitFirst
natsukagami Aug 19, 2024
79ca7fb
Enable cc on the rest of the files
natsukagami Aug 19, 2024
80d09d3
Have explicit capture set parameter for withResolver cancellation
natsukagami Aug 26, 2024
ba5a2e7
Have explicit boundary/suspend capture sets
natsukagami Aug 26, 2024
2aaa0be
Adapt to latest main
natsukagami Aug 29, 2024
6f5542a
Let the tests compile
natsukagami Sep 25, 2024
71606fc
Update Scala version to latest snapshot, stop trying to compile on na…
natsukagami Jan 20, 2025
9cc903e
Reanme caps.unbox -> caps.use
natsukagami Jan 20, 2025
eb228ef
WIP
natsukagami Apr 25, 2025
00e10bf
Merge remote-tracking branch 'upstream/main' into add-cc
natsukagami May 6, 2025
a23298e
Get gears to compile again
natsukagami May 6, 2025
be34464
Get tests to compile for JVM
natsukagami May 6, 2025
b0d8d2d
Get tests to compile for JS and Native
natsukagami May 6, 2025
d88cde5
Let CC test pass
natsukagami May 6, 2025
fe57bfe
Merge remote-tracking branch 'upstream/main' into add-cc
natsukagami May 6, 2025
b38d1d1
Try to make Resource safe
natsukagami May 9, 2025
6188029
Update to Scala 3.8.0-RC1-bin-SNAPSHOT with capture-checked stdlib
natsukagami Aug 16, 2025
583ca69
Get gears tests to compile
natsukagami Aug 17, 2025
d904e4e
Should compile again, minus some tests...
natsukagami Aug 20, 2025
f49657d
Tests should pass for JVM
natsukagami Aug 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rewrite {
expand = false
sort = ascii
groups = [
["language\\..*"],
["gears\\..*"],
["java.?\\..*", "scala\\..*"],
]
Expand Down
12 changes: 9 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ import org.scalajs.linker.interface.ESVersion
import sbtcrossproject.CrossPlugin.autoImport.{CrossType, crossProject}
import scalanative.build._

ThisBuild / scalaVersion := "3.3.5"
val scala3Version = "3.8.0-RC1-bin-20250819-1f13619-NIGHTLY"
val scala = scala3Version
ThisBuild / scalaVersion := scala
ThisBuild / resolvers += ("Artifactory" at "https://repo.scala-lang.org/artifactory/maven-nightlies/")

publish / skip := true

Expand All @@ -28,7 +31,11 @@ lazy val root =
name := "Gears",
versionScheme := Some("early-semver"),
libraryDependencies += "org.scalameta" %%% "munit" % "1.1.1" % Test,
testFrameworks += new TestFramework("munit.Framework")
testFrameworks += new TestFramework("munit.Framework"),
scalacOptions ++= Seq(
// "-Ycc-debug",
// "-Xprint:cc"
)
)
)
.jvmSettings(
Expand All @@ -45,7 +52,6 @@ lazy val root =
)
.jsSettings(
Seq(
scalaVersion := "3.7.0",
// Emit ES modules with the Wasm backend
scalaJSLinkerConfig := {
scalaJSLinkerConfig.value
Expand Down
25 changes: 15 additions & 10 deletions js/src/main/scala/async/WasmJSPISuspend.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package gears.async.js

import language.experimental.captureChecking

import gears.async.*

import scala.compiletime.uninitialized
Expand Down Expand Up @@ -28,7 +30,7 @@ trait WasmJSPISuspend(using AsyncToken) extends SuspendSupport:
* Due to the promise possibly changing over time, within [[boundary]], we have to dynamically resolve the reference
* _after_ running the `body`.
*/
protected class WasmLabel[T]():
protected class WasmLabel[T]() extends caps.SharedCapability:
var (promise, resolve) = mkPromise[T]

def reset() =
Expand All @@ -37,24 +39,24 @@ trait WasmJSPISuspend(using AsyncToken) extends SuspendSupport:
resolve = q

/** Creates a new [[js.Promise]] and returns both the Promise and its `resolve` function. */
inline def mkPromise[T]: (js.Promise[T], T => Any) =
var resolve: (T => Any) | Null = null
inline def mkPromise[T]: (js.Promise[T], T -> Any) =
var resolve: (T -> Any) | Null = null
val promise = js.Promise[T]((res, rej) => resolve = res)
(promise, resolve)

protected class WasmSuspension[-T, +R](label: Label[R], resolve: T => Any) extends gears.async.Suspension[T, R]:
protected class WasmSuspension[-T, +R](label: WasmLabel[R], resolve: T => Any) extends gears.async.Suspension[T, R]:
def resume(arg: T): R =
label.reset()
resolve(arg)
JSPI.await(label.promise)

// Implementation of the [[SuspendSupport]] interface.

opaque type Label[T] = WasmLabel[T]
type Label[T, Cap^] = WasmLabel[T]

opaque type Suspension[-T, +R] <: gears.async.Suspension[T, R] = WasmSuspension[T, R]
type Suspension[-T, +R] = WasmSuspension[T, R]

override def boundary[T](body: Label[T] ?=> T): T =
override def boundary[T, Cap^](body: Label[T, Cap]^ ?->{Cap} T): T =
val label = WasmLabel[T]()
JSPI.async:
val r = body(using label)
Expand All @@ -66,10 +68,13 @@ trait WasmJSPISuspend(using AsyncToken) extends SuspendSupport:
* @note
* Should return immediately if resume is called from within body
*/
override def suspend[T, R](body: Suspension[T, R] => R)(using label: Label[R]): T =
override def suspend[T, R, Cap^](body: Suspension[T, R]^{Cap} ->{Cap} R)(using label: Label[R, Cap]^): T =
val (suspPromise, suspResolve) = mkPromise[T]
val suspend = WasmSuspension[T, R](label, suspResolve)
label.resolve(body(suspend))
label.resolve(body(
// SAFETY: will only be stored and returned by the Suspension resumption mechanism
caps.unsafe.unsafeAssumePure(suspend)
))
JSPI.await(suspPromise)
end WasmJSPISuspend

Expand Down Expand Up @@ -102,7 +107,7 @@ final class WasmAsyncSupport(using AsyncToken) extends AsyncSupport with WasmJSP
*/
private[async] class JsAsync(val group: CompletionGroup)(using support: WasmAsyncSupport, sched: JsAsyncScheduler.type)
extends Async(using support, sched):
override def await[T](src: Async.Source[T]) =
override def await[T](src: Async.Source[T]^) =
src
.poll()
.getOrElse:
Expand Down
17 changes: 10 additions & 7 deletions jvm/src/main/scala/PosixLikeIO/PIO.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package PosixLikeIO

import language.experimental.captureChecking
import caps.CapSet

import gears.async.Scheduler
import gears.async.default.given
import gears.async.{Async, Future}
Expand All @@ -17,7 +20,7 @@ import scala.util.{Failure, Success, Try}
import Future.Promise

object File:
extension (resolver: Future.Resolver[Int])
extension[Cap^] (resolver: Future.Resolver[Int, Cap])
private[File] def toCompletionHandler = new CompletionHandler[Integer, ByteBuffer] {
override def completed(result: Integer, attachment: ByteBuffer): Unit = resolver.resolve(result)
override def failed(e: Throwable, attachment: ByteBuffer): Unit = resolver.reject(e)
Expand All @@ -44,7 +47,7 @@ class File(val path: String) {
def read(buffer: ByteBuffer): Future[Int] =
assert(channel.isDefined)

Future.withResolver[Int]: resolver =>
Future.withResolver[Int, CapSet]: resolver =>
channel.get.read(
buffer,
0,
Expand All @@ -57,7 +60,7 @@ class File(val path: String) {
assert(size >= 0)

val buffer = ByteBuffer.allocate(size)
Future.withResolver[String]: resolver =>
Future.withResolver[String, CapSet]: resolver =>
channel.get.read(
buffer,
0,
Expand All @@ -72,7 +75,7 @@ class File(val path: String) {
def write(buffer: ByteBuffer): Future[Int] =
assert(channel.isDefined)

Future.withResolver[Int]: resolver =>
Future.withResolver[Int, CapSet]: resolver =>
channel.get.write(
buffer,
0,
Expand Down Expand Up @@ -114,7 +117,7 @@ class SocketUDP() {
def send(data: ByteBuffer, address: String, port: Int): Future[Unit] =
assert(socket.isDefined)

Future.withResolver: resolver =>
Future.withResolver[Unit, CapSet]: resolver =>
resolver.spawn:
val packet: DatagramPacket =
new DatagramPacket(data.array(), data.limit(), InetAddress.getByName(address), port)
Expand All @@ -123,7 +126,7 @@ class SocketUDP() {
def receive(): Future[DatagramPacket] =
assert(socket.isDefined)

Future.withResolver: resolver =>
Future.withResolver[DatagramPacket, CapSet]: resolver =>
resolver.spawn:
val buffer = Array.fill[Byte](10 * 1024)(0)
val packet: DatagramPacket = DatagramPacket(buffer, 10 * 1024)
Expand All @@ -138,7 +141,7 @@ class SocketUDP() {
}

object SocketUDP:
extension [T](resolver: Future.Resolver[T])
extension [T, Cap^](resolver: Future.Resolver[T, Cap])
private[SocketUDP] inline def spawn(body: => T)(using s: Scheduler) =
s.execute(() =>
resolver.complete(Try(body).recover { case _: InterruptedException =>
Expand Down
2 changes: 1 addition & 1 deletion jvm/src/main/scala/async/DefaultSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import gears.async._

given AsyncOperations = JvmAsyncOperations
given VThreadSupport.type = VThreadSupport
given VThreadSupport.Scheduler = VThreadScheduler
given VThreadScheduler.type = VThreadScheduler
2 changes: 2 additions & 0 deletions jvm/src/main/scala/async/JvmAsyncOperations.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package gears.async

import language.experimental.captureChecking

object JvmAsyncOperations extends AsyncOperations:
override def sleep(millis: Long)(using Async): Unit =
jvmInterruptible(Thread.sleep(millis))
Expand Down
40 changes: 27 additions & 13 deletions jvm/src/main/scala/async/VThreadSupport.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package gears.async

import language.experimental.captureChecking

import java.lang.invoke.{MethodHandles, VarHandle}
import java.util.concurrent.locks.ReentrantLock
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
import scala.annotation.constructorOnly
import scala.collection.mutable

object VThreadScheduler extends Scheduler:
private val VTFactory = Thread
Expand All @@ -14,10 +18,18 @@ object VThreadScheduler extends Scheduler:
override def execute(body: Runnable): Unit =
val th = VTFactory.newThread(body)
th.start()
()

private[gears] inline def unsafeExecute(body: Runnable^): Unit = execute(caps.unsafe.unsafeAssumePure(body))

override def schedule(delay: FiniteDuration, body: Runnable): Cancellable =
import caps.unsafe.unsafeAssumePure

override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = ScheduledRunnable(delay, body)
val sr = ScheduledRunnable(delay, body)
// SAFETY: should not be able to access body, only for cancellation
sr.unsafeAssumePure: Cancellable

private class ScheduledRunnable(val delay: FiniteDuration, val body: Runnable) extends Cancellable {
private final class ScheduledRunnable(delay: FiniteDuration, body: Runnable) extends Cancellable:
@volatile var interruptGuard = true // to avoid interrupting the body

val th = VTFactory.newThread: () =>
Expand All @@ -28,7 +40,7 @@ object VThreadScheduler extends Scheduler:

final override def cancel(): Unit =
if ScheduledRunnable.interruptGuardVar.getAndSet(this, false) then th.interrupt()
}
end ScheduledRunnable

private object ScheduledRunnable:
val interruptGuardVar =
Expand All @@ -38,10 +50,9 @@ object VThreadScheduler extends Scheduler:
.findVarHandle(classOf[ScheduledRunnable], "interruptGuard", classOf[Boolean])

object VThreadSupport extends AsyncSupport:

type Scheduler = VThreadScheduler.type

private final class VThreadLabel[R]():
final class VThreadLabel[R]() extends caps.SharedCapability:
private var result: Option[R] = None
private val lock = ReentrantLock()
private val cond = lock.newCondition()
Expand All @@ -65,11 +76,11 @@ object VThreadSupport extends AsyncSupport:
result.get
finally lock.unlock()

override opaque type Label[R] = VThreadLabel[R]
override type Label[R, Cap >: caps.CapSet <: caps.CapSet^] = VThreadLabel[R]

// outside boundary: waiting on label
// inside boundary: waiting on suspension
private final class VThreadSuspension[-T, +R](using private[VThreadSupport] val l: Label[R] @uncheckedVariance)
private final class VThreadSuspension[-T, +R](using private[VThreadSupport] val l: VThreadLabel[R] @uncheckedVariance)
extends gears.async.Suspension[T, R]:
private var nextInput: Option[T] = None
private val lock = ReentrantLock()
Expand Down Expand Up @@ -98,9 +109,9 @@ object VThreadSupport extends AsyncSupport:

override opaque type Suspension[-T, +R] <: gears.async.Suspension[T, R] = VThreadSuspension[T, R]

override def boundary[R](body: (Label[R]) ?=> R): R =
override def boundary[R, Cap^](body: Label[R, Cap]^ ?->{Cap} R): R =
val label = VThreadLabel[R]()
VThreadScheduler.execute: () =>
VThreadScheduler.unsafeExecute: () =>
val result = body(using label)
label.setResult(result)

Expand All @@ -110,13 +121,16 @@ object VThreadSupport extends AsyncSupport:
suspension.l.clearResult()
suspension.setInput(arg)

override def scheduleBoundary(body: (Label[Unit]) ?=> Unit)(using Scheduler): Unit =
override def scheduleBoundary(body: Label[Unit, {}] ?-> Unit)(using Scheduler): Unit =
VThreadScheduler.execute: () =>
val label = VThreadLabel[Unit]()
body(using label)

override def suspend[T, R](body: Suspension[T, R] => R)(using l: Label[R]): T =
val sus = new VThreadSuspension[T, R]()
override def suspend[T, R, Cap^](body: Suspension[T, R]^{Cap} ->{Cap} R)(using l: Label[R, Cap]^): T =
val sus = new VThreadSuspension[T, R](using caps.unsafe.unsafeAssumePure(l))
val res = body(sus)
l.setResult(res)
l.setResult(
// SAFETY: will only be stored and returned by the Suspension resumption mechanism
caps.unsafe.unsafeAssumePure(res)
)
sus.waitInput()
2 changes: 2 additions & 0 deletions jvm/src/main/scala/measurements/measureTimes.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package measurements

import language.experimental.captureChecking

import gears.async.default.given
import gears.async.{Async, BufferedChannel, ChannelMultiplexer, Future, SyncChannel}

Expand Down
2 changes: 2 additions & 0 deletions jvm/src/test/scala/CancellationBehavior.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import language.experimental.captureChecking

import gears.async.AsyncOperations.*
import gears.async.default.given
import gears.async.{Async, AsyncSupport, Future, uninterruptible}
Expand Down
2 changes: 1 addition & 1 deletion native/src/main/scala/async/DefaultSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ import gears.async.native.ForkJoinSupport

object DefaultSupport extends ForkJoinSupport

given AsyncSupport = DefaultSupport
given DefaultSupport.type = DefaultSupport
given DefaultSupport.Scheduler = DefaultSupport
given AsyncOperations = DefaultSupport
18 changes: 12 additions & 6 deletions native/src/main/scala/async/ForkJoinSupport.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package gears.async.native

import language.experimental.captureChecking

import gears.async.Future.Promise
import gears.async._

Expand All @@ -15,14 +17,18 @@ class NativeContinuation[-T, +R] private[native] (val cont: T => R) extends Susp
def resume(arg: T): R = cont(arg)

trait NativeSuspend extends SuspendSupport:
type Label[R] = nativeContinuations.BoundaryLabel[R]
import caps.unsafe.unsafeAssumePure
type Label[R, Cap^] = nativeContinuations.BoundaryLabel[R]
type Suspension[T, R] = NativeContinuation[T, R]

override def boundary[R](body: (Label[R]) ?=> R): R =
nativeContinuations.boundary(body)
override def boundary[R, Cap^](body: (Label[R, Cap]^) ?->{Cap^} R): R =
val f = (l: Label[R, Cap]^) => body(using l)
val pf = f.unsafeAssumePure
run(v ?=> pf(v))

override def suspend[T, R](body: Suspension[T, R] => R)(using Label[R]): T =
nativeContinuations.suspend[T, R](f => body(NativeContinuation(f)))
override def suspend[T, R, Cap^](body: Suspension[T, R]^{Cap^} ->{Cap^} R)(using Label[R, Cap]^): T =
val pbody = body.unsafeAssumePure
nativeContinuations.suspend[T, R](f => pbody(NativeContinuation(f)))
end NativeSuspend

/** Spawns a single thread that does all the sleeping. */
Expand Down Expand Up @@ -82,7 +88,7 @@ class SuspendExecutorWithSleep(exec: ExecutionContext)
with AsyncSupport
with AsyncOperations
with NativeSuspend {
type Scheduler = this.type
type Scheduler = ExecutorWithSleepThread
}

class ForkJoinSupport extends SuspendExecutorWithSleep(new ForkJoinPool())
6 changes: 6 additions & 0 deletions native/src/main/scala/async/ForkJoinWithoutCaptures.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package gears.async.native

import scalanative.runtime.{Continuations => nativeContinuations}

def run[R](body: nativeContinuations.BoundaryLabel[R] ?=> R): R =
nativeContinuations.boundary(body) // SAFETY: tracked by this package's mechanism
Loading