Skip to content

Commit 632445b

Browse files
committed
Fiexed order of read-done and read in DuplicatedChannel, to make done always be called first.
1 parent c5432ca commit 632445b

File tree

8 files changed

+118
-16
lines changed

8 files changed

+118
-16
lines changed

jvm/src/main/scala/gopher/JVMGopher.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,6 @@ object JVMGopher extends GopherAPI:
7272
}
7373

7474

75+
final val MAX_SPINS = 400
76+
7577
val Gopher = JVMGopher

jvm/src/main/scala/gopher/impl/GuardedSPSCBaseChannel.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import scala.util.Try
1212
import scala.util.Success
1313
import scala.util.Failure
1414

15+
import java.util.logging.{Level => LogLevel}
16+
1517

1618
/**
1719
* Guarded channel work in the next way:
@@ -33,7 +35,6 @@ abstract class GuardedSPSCBaseChannel[F[_]:CpsAsyncMonad,A](override val gopherA
3335
protected val stepGuard = new AtomicInteger(STEP_FREE)
3436

3537
protected val stepRunnable: Runnable = (()=>entryStep())
36-
3738

3839
def addReader(reader: Reader[A]): Unit =
3940
if (reader.canExpire) then
@@ -66,6 +67,7 @@ abstract class GuardedSPSCBaseChannel[F[_]:CpsAsyncMonad,A](override val gopherA
6667

6768
protected def entryStep(): Unit =
6869
var done = false
70+
var nSpins = 0
6971
while(!done) {
7072
if (stepGuard.compareAndSet(STEP_FREE,STEP_BUSY)) {
7173
done = true
@@ -77,6 +79,7 @@ abstract class GuardedSPSCBaseChannel[F[_]:CpsAsyncMonad,A](override val gopherA
7779
done = true
7880
} else {
7981
// other set updates, we should spinLock
82+
nSpins = nSpins + 1
8083
Thread.onSpinWait()
8184
}
8285
}
@@ -99,7 +102,7 @@ abstract class GuardedSPSCBaseChannel[F[_]:CpsAsyncMonad,A](override val gopherA
99102
while(!readers.isEmpty) {
100103
val r = readers.poll()
101104
if (!(r eq null) && !r.isExpired) then
102-
r.capture() match
105+
r.capture() match
103106
case Some(f) =>
104107
progress = true
105108
taskExecutor.execute(() => f(Failure(new ChannelClosedException())) )
@@ -173,9 +176,12 @@ abstract class GuardedSPSCBaseChannel[F[_]:CpsAsyncMonad,A](override val gopherA
173176
if (!v.isExpired)
174177
if (queue.isEmpty)
175178
Thread.onSpinWait()
179+
// if (nSpins > JVMGopher.MAX_SPINS)
180+
// Thread.`yield`()
176181
queue.addLast(v)
177182

178183

184+
179185
object GuardedSPSCBaseChannel:
180186

181187
final val STEP_FREE = 0

jvm/src/main/scala/gopher/impl/GuardedSPSCBufferedChannel.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import scala.util.Try
99
import scala.util.Success
1010
import scala.util.Failure
1111

12+
import java.util.logging.{Level => LogLevel}
1213

1314
class GuardedSPSCBufferedChannel[F[_]:CpsAsyncMonad,A](gopherApi: JVMGopher[F], bufSize: Int,
1415
controlExecutor: ExecutorService,
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package gopher.channels
2+
3+
import cps._
4+
import cps.monads.FutureAsyncMonad
5+
import gopher._
6+
import munit._
7+
8+
import scala.concurrent._
9+
import scala.concurrent.duration._
10+
import scala.language.postfixOps
11+
import scala.util._
12+
13+
class DuppedChannelsMultipleSuite extends FunSuite {
14+
15+
import scala.concurrent.ExecutionContext.Implicits.global
16+
given gopherApi: Gopher[Future] = SharedGopherAPI.apply[Future]()
17+
18+
val inMemoryLog = new java.util.concurrent.ConcurrentLinkedQueue[(Int, Long, String, Throwable)]()
19+
20+
21+
test("on closing of main stream dupped outputs also closed N times.") {
22+
val N = 1000
23+
var logIndex = 0
24+
gopherApi.setLogFun( (level,msg, ex) => inMemoryLog.add((logIndex, Thread.currentThread().getId(), msg,ex)) )
25+
for(i <- 1 to N) {
26+
logIndex = i
27+
val ch = makeChannel[Int](1)
28+
val (in1, in2) = ch.dup()
29+
val f1 = async{
30+
ch.write(1)
31+
ch.close()
32+
}
33+
val f = for{ fx <- f1
34+
x <- in1.aread()
35+
r <- in1.aread().transformWith {
36+
case Success(u) =>
37+
Future failed new IllegalStateException("Mist be closed")
38+
case Failure(u) =>
39+
Future successful (assert(x == 1))
40+
}
41+
} yield {
42+
r
43+
}
44+
try {
45+
val r = Await.result(f, 30 seconds);
46+
}catch{
47+
case ex: TimeoutException =>
48+
showTraces(20)
49+
println("---")
50+
showInMemoryLog()
51+
throw ex
52+
}
53+
}
54+
55+
}
56+
57+
def showTraces(maxTracesToShow: Int): Unit = {
58+
val traces = Thread.getAllStackTraces();
59+
val it = traces.entrySet().iterator()
60+
while(it.hasNext()) {
61+
val e = it.next();
62+
println(e.getKey());
63+
val elements = e.getValue()
64+
var sti = 0
65+
var wasPark = false
66+
while(sti < elements.length && sti < maxTracesToShow && !wasPark) {
67+
val st = elements(sti)
68+
println(" "*10 + st)
69+
sti = sti + 1;
70+
wasPark = (st.getMethodName == "park")
71+
}
72+
}
73+
}
74+
75+
def showInMemoryLog(): Unit = {
76+
while(!inMemoryLog.isEmpty) {
77+
val r = inMemoryLog.poll()
78+
if (r != null) {
79+
println(r)
80+
}
81+
}
82+
}
83+
84+
85+
}
86+
87+

shared/src/main/scala/gopher/ReadChannel.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ import scala.util.Success
77
import scala.util.Failure
88
import scala.concurrent.duration.Duration
99

10+
import java.util.logging.{Level => LogLevel}
11+
12+
1013
trait ReadChannel[F[_], A]:
1114

1215
thisReadChannel =>

shared/src/main/scala/gopher/SelectGroup.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import scala.util._
1010
import scala.concurrent.duration._
1111
import scala.language.postfixOps
1212

13+
import java.util.logging.{Level => LogLevel}
14+
1315

1416
/**
1517
* Select group is a virtual 'lock' object, where only

shared/src/main/scala/gopher/SelectLoop.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import scala.quoted._
55
import scala.compiletime._
66
import scala.concurrent.duration._
77

8+
import java.util.logging.{Level => LogLevel}
9+
10+
811
class SelectLoop[F[_]](api: Gopher[F]) extends SelectGroupBuilder[F,Boolean, Unit](api):
912

1013

@@ -25,7 +28,3 @@ class SelectLoop[F[_]](api: Gopher[F]) extends SelectGroupBuilder[F,Boolean, Uni
2528
}
2629

2730

28-
29-
30-
31-

shared/src/main/scala/gopher/impl/DuppedInput.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import scala.util._
88
import java.util.concurrent.ConcurrentLinkedQueue
99
import java.util.concurrent.atomic.AtomicInteger
1010

11+
import java.util.logging.{Level => LogLevel}
1112

1213

1314

@@ -21,15 +22,16 @@ class DuppedInput[F[_],A](origin:ReadChannel[F,A], bufSize: Int=1)(using api:Gop
2122

2223
given CpsSchedulingMonad[F] = api.asyncMonad
2324

24-
val runner = SelectLoop[F](api).onReadAsync(origin){a => async{
25-
val f1 = sink1.write(a)
26-
val f2 = sink2.write(a)
27-
true
28-
}}.onRead(origin.done){ _ =>
29-
sink1.close()
30-
sink2.close()
31-
false
32-
}.runAsync()
33-
api.asyncMonad.spawn(runner)
25+
val runner = SelectLoop[F](api).
26+
onRead(origin.done){ _ =>
27+
sink1.close()
28+
sink2.close()
29+
false
30+
}.onReadAsync(origin){a => async{
31+
val f1 = sink1.write(a)
32+
val f2 = sink2.write(a)
33+
true
34+
}}.runAsync()
35+
api.asyncMonad.spawn(runner)
3436

3537
}

0 commit comments

Comments
 (0)