Skip to content

Commit 4c87d52

Browse files
committed
refactored Expirable.caputure to return 3-variant value; added support of cps generators
1 parent f3186fb commit 4c87d52

29 files changed

+598
-197
lines changed

build.sbt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ val sharedSettings = Seq(
1010
scalaVersion := dottyVersion,
1111
name := "scala-gopher",
1212
resolvers += "Local Ivy Repository" at "file://"+Path.userHome.absolutePath+"/.ivy2/local",
13-
libraryDependencies += "com.github.rssh" %%% "dotty-cps-async" % "0.9.2",
13+
libraryDependencies += "com.github.rssh" %%% "dotty-cps-async" % "0.9.3-SNAPSHOT",
1414
libraryDependencies += "org.scalameta" %%% "munit" % "0.7.27" % Test,
1515
)
1616

@@ -34,6 +34,14 @@ lazy val gopher = crossProject(JSPlatform, JVMPlatform)
3434
.disablePlugins(SitePreviewPlugin)
3535
.jvmSettings(
3636
scalacOptions ++= Seq( "-unchecked", "-Ycheck:macros", "-uniqid", "-Xprint:types" ),
37+
fork := true,
38+
/*
39+
javaOptions ++= Seq(
40+
"--add-opens",
41+
"java.base/java.lang=ALL-UNNAMED",
42+
s"-javaagent:${System.getProperty("user.home")}/.ivy2/local/com.github.rssh/trackedfuture_3/0.5.0/jars/trackedfuture_3-assembly.jar"
43+
)
44+
*/
3745
).jsSettings(
3846
libraryDependencies += ("org.scala-js" %%% "scalajs-java-logging" % "1.0.0").cross(CrossVersion.for3Use2_13),
3947
// TODO: switch to ModuleES ?

js/src/main/scala/gopher/JSGopher.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package gopher
22

3-
import cps._
3+
import cps.*
44
import java.util.Timer
5-
import java.util.logging._
6-
import scala.concurrent.duration._
5+
import java.util.logging.*
6+
import scala.concurrent.ExecutionContext
7+
import scala.concurrent.duration.*
8+
import scala.scalajs.concurrent.*
79

810
class JSGopher[F[_]:CpsSchedulingMonad](cfg: JSGopherConfig) extends Gopher[F]:
911

@@ -28,6 +30,8 @@ class JSGopher[F[_]:CpsSchedulingMonad](cfg: JSGopherConfig) extends Gopher[F]:
2830
def log(level: Level, message: String, ex: Throwable| Null): Unit =
2931
currentLogFun.apply(level,message,ex)
3032

33+
def taskExecutionContext: ExecutionContext = JSExecutionContext.queue
34+
3135
private var currentLogFun: (Level, String, Throwable|Null )=> Unit = { (level,message,ex) =>
3236
System.err.println(s"${level}:${message}");
3337
if !(ex eq null) then
@@ -46,5 +50,6 @@ object JSGopher extends GopherAPI:
4650
val timer = new Timer("gopher")
4751

4852

53+
4954
val Gopher = JSGopher
5055

js/src/main/scala/gopher/impl/BaseChannel.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,13 @@ abstract class BaseChannel[F[_],A](override val gopherApi: JSGopher[F]) extends
4242

4343
def addWriter(writer: Writer[A]): Unit =
4444
if (closed) {
45-
writer.capture().foreach{ (a,f) =>
46-
writer.markUsed()
47-
submitTask( () =>
48-
f(Failure(new ChannelClosedException()))
49-
)
50-
}
45+
writer.capture() match
46+
case Expirable.Capture.Ready((a,f)) =>
47+
writer.markUsed()
48+
submitTask( () =>
49+
f(Failure(new ChannelClosedException()))
50+
)
51+
case _ =>
5152
} else {
5253
writers.enqueue(writer)
5354
process()
@@ -56,13 +57,14 @@ abstract class BaseChannel[F[_],A](override val gopherApi: JSGopher[F]) extends
5657
def addDoneReader(reader: Reader[Unit]): Unit =
5758
if (closed && isEmpty) {
5859
reader.capture() match
59-
case Some(f) =>
60+
case Expirable.Capture.Ready(f) =>
6061
reader.markUsed()
6162
submitTask( () => f(Success(())))
62-
case None =>
63+
case Expirable.Capture.WaitChangeComplete =>
6364
// mb is blocked and will be evaluated in
6465
doneReaders.enqueue(reader)
6566
process()
67+
case Expirable.Capture.Expired =>
6668
} else {
6769
doneReaders.enqueue(reader)
6870
process()
@@ -79,10 +81,10 @@ abstract class BaseChannel[F[_],A](override val gopherApi: JSGopher[F]) extends
7981
val v = queue.dequeue()
8082
if (!v.isExpired) then
8183
v.capture() match
82-
case Some(a) =>
84+
case Expirable.Capture.Ready(a) =>
8385
v.markUsed()
8486
action(a)
85-
case None =>
87+
case _ =>
8688
// do nothing.
8789
// exists case, when this is possible: wheb we close channel from
8890
// select-group callback, which is evaluated now.

js/src/main/scala/gopher/impl/BufferedChannel.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,15 @@ class BufferedChannel[F[_]:CpsAsyncMonad, A](gopherApi: JSGopher[F], bufSize: In
7171
if (!writers.isEmpty && !isFull) then
7272
val writer = writers.dequeue()
7373
writer.capture() match
74-
case Some((a,f)) =>
74+
case Expirable.Capture.Ready((a,f)) =>
7575
internalEnqueue(a)
7676
writer.markUsed()
7777
submitTask( () => f(Success(())) )
7878
progress = true
79-
case None =>
80-
if (!writer.isExpired) then
81-
// impossible, we have no parallel execution
82-
throw DeadlockDetected()
79+
case Expirable.Capture.WaitChangeComplete =>
80+
// impossible, we have no parallel execution
81+
throw DeadlockDetected()
82+
case Expirable.Capture.Expired =>
8383
progress
8484

8585

js/src/main/scala/gopher/impl/PromiseChannel.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,31 +21,30 @@ class PromiseChannel[F[_]:CpsAsyncMonad, A](gopherApi: JSGopher[F]) extends Base
2121
// we have only one writer.
2222
while (!writers.isEmpty && value.isEmpty) {
2323
val w = writers.dequeue()
24-
if (!w.isExpired) then
25-
w.capture() match
26-
case Some((a,f)) =>
24+
w.capture() match
25+
case Expirable.Capture.Ready((a,f)) =>
2726
w.markUsed()
2827
submitTask(()=>f(Success(())))
2928
value = Some(a)
3029
closed = true
3130
// we can't havw more than one unexpired
32-
case None =>
33-
if (!w.isExpired) then
31+
case Expirable.Capture.WaitChangeComplete =>
3432
// impossible in js,
33+
// (mb processNextTick()?)
3534
throw new DeadlockDetected()
35+
case Expirable.Capture.Expired =>
3636
}
3737
if (!readers.isEmpty && value.isDefined) {
3838
while(!readers.isEmpty && !readed) {
3939
val r = readers.dequeue()
40-
if (!r.isExpired) then
41-
r.capture() match
42-
case Some(f) =>
40+
r.capture() match
41+
case Expirable.Capture.Ready(f) =>
4342
r.markUsed()
4443
submitTask(()=>f(Success(value.get)))
4544
readed = true
46-
case None =>
47-
if (!r.isExpired)
45+
case Expirable.Capture.WaitChangeComplete =>
4846
throw new DeadlockDetected()
47+
case Expirable.Capture.Expired =>
4948
}
5049
//if (readed) {
5150
// processCloseDone()

js/src/main/scala/gopher/impl/UnbufferedChannel.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,25 @@ class UnbufferedChannel[F[_]:CpsAsyncMonad, A](gopherApi: JSGopher[F]) extends B
2525
findWriter() match
2626
case Some(writer) =>
2727
reader.capture() match
28-
case Some(readFun) =>
28+
case Expirable.Capture.Ready(readFun) =>
2929
writer.capture() match
30-
case Some((a,writeFun)) =>
30+
case Expirable.Capture.Ready((a,writeFun)) =>
3131
submitTask( () => readFun(Success(a)))
3232
submitTask( () => writeFun(Success(())) )
3333
progress = true
3434
done = true
3535
writer.markUsed()
3636
reader.markUsed()
37-
case None =>
37+
case _ =>
3838
// impossible, because in js we have-no interleavinf, bug anyway
3939
// let's fallback
4040
reader.markFree()
4141
readers.prepend(reader)
42-
case None =>
42+
case Expirable.Capture.WaitChangeComplete =>
43+
// impossible, but let's fallback
44+
// TODO: prepend reader and skip event
45+
writers.prepend(writer)
46+
case Expirable.Capture.Expired =>
4347
// impossible, but let's fallback
4448
writers.prepend(writer)
4549
case None =>
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package gopher.util
2+
3+
import java.util.logging.{Level => LogLevel}
4+
5+
object Debug {
6+
7+
type InMemoryLog = java.util.concurrent.ConcurrentLinkedQueue[(Long, String, Throwable)]
8+
9+
def inMemoryLogFun(inMemoryLog: InMemoryLog): (LogLevel, String, Throwable|Null) => Unit =
10+
(level,msg, ex) => inMemoryLog.add((Thread.currentThread().getId(), msg,ex))
11+
12+
def showInMemoryLog(inMemoryLog: InMemoryLog): Unit = {
13+
while(!inMemoryLog.isEmpty) {
14+
val r = inMemoryLog.poll()
15+
if (r != null) {
16+
println(r)
17+
}
18+
}
19+
}
20+
21+
22+
def showTraces(maxTracesToShow: Int): Unit = {
23+
val traces = Thread.getAllStackTraces();
24+
val it = traces.entrySet().iterator()
25+
while(it.hasNext()) {
26+
val e = it.next();
27+
println(e.getKey());
28+
val elements = e.getValue()
29+
var sti = 0
30+
var wasPark = false
31+
while(sti < elements.length && sti < maxTracesToShow && !wasPark) {
32+
val st = elements(sti)
33+
println(" "*10 + st)
34+
sti = sti + 1;
35+
wasPark = (st.getMethodName == "park")
36+
}
37+
}
38+
}
39+
40+
41+
}

jvm/src/main/scala/gopher/JVMGopher.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import java.util.concurrent.ForkJoinPool
99
import java.util.concurrent.atomic.AtomicReference
1010
import java.util.Timer
1111
import java.util.logging._
12+
import scala.concurrent.ExecutionContext
1213
import scala.concurrent.duration._
1314

1415

@@ -18,7 +19,7 @@ class JVMGopher[F[_]:CpsSchedulingMonad](cfg: JVMGopherConfig) extends Gopher[F]
1819

1920
def makeChannel[A](bufSize:Int = 0, autoClose: Boolean = false) =
2021
if autoClose then
21-
PromiseChannel[F,A](this, taskExecutor)
22+
PromiseChannel[F,A](this, cfg.taskExecutor)
2223
else
2324
if (bufSize == 0)
2425
GuardedSPSCUnbufferedChannel[F,A](this, cfg.controlExecutor,cfg.taskExecutor)
@@ -35,7 +36,7 @@ class JVMGopher[F[_]:CpsSchedulingMonad](cfg: JVMGopherConfig) extends Gopher[F]
3536
def log(level: Level, message: String, ex: Throwable| Null): Unit =
3637
currentLogFun.get().apply(level,message,ex)
3738

38-
def taskExecutor = cfg.taskExecutor
39+
lazy val taskExecutionContext = ExecutionContext.fromExecutor(cfg.taskExecutor)
3940

4041
def scheduledExecutor = JVMGopher.scheduledExecutor
4142

0 commit comments

Comments
 (0)