Skip to content

Commit 29c6e40

Browse files
committed
fully implemented fold over select
1 parent deced3e commit 29c6e40

File tree

9 files changed

+370
-142
lines changed

9 files changed

+370
-142
lines changed

src/main/scala/gopher/channels/Continuated.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ case class ContWrite[A,B](function: ContWrite[A,B] => Option[(A,Future[Continuat
116116

117117
object ContWrite
118118
{
119-
119+
type Aux[A,B] = ContWrite[A,B]
120+
type AuxF[A,B] = ContWrite[A,B]=>Option[(A,Future[Continuated[B]])]
120121
}
121122

122123
/**

src/main/scala/gopher/channels/FoldSelectorBuilder.scala

Lines changed: 150 additions & 65 deletions
Large diffs are not rendered by default.
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package gopher.channels
2+
3+
4+
import scala.language.experimental.macros
5+
import scala.reflect.macros.blackbox.Context
6+
import scala.reflect.api._
7+
import gopher._
8+
import gopher.util._
9+
10+
import scala.concurrent._
11+
import scala.concurrent.duration._
12+
import scala.annotation.unchecked._
13+
import java.util.function.{BiConsumer => JBiConsumer}
14+
15+
import scala.collection.mutable
16+
import scala.ref.WeakReference
17+
18+
19+
/**
20+
* effected input inside fold. We know, that exists only one call of
21+
* FoldSelectorInput, generated in our fold statement
22+
*/
23+
trait FoldSelectorEffectedInput[A,B] extends Input[A]
24+
{
25+
def current: Input[A]
26+
val foldSelector: FoldSelect[B]
27+
val index: Int
28+
29+
30+
def cbread[C](f: ContRead[A,C] => Option[ContRead.In[A] => Future[Continuated[C]]],ft: FlowTermination[C]): Unit = {
31+
// ignore f, because cbread called only from match in select fold, since this input is not visible to programmer
32+
cbreadIfNotActive()
33+
}
34+
35+
36+
def refreshReader():Unit = {
37+
foldSelector.inputIndices.put(current,index)
38+
cbreadIfNotActive()
39+
}
40+
41+
/**
42+
* Call cbread on current channel with dispath function if current channels is not active
43+
* (i.e. if we not in wait of other cbread call)
44+
*/
45+
private def cbreadIfNotActive(): Unit =
46+
{
47+
val s = current
48+
val notSet = foldSelector.activeReaders.get(s).isEmpty
49+
if (notSet) {
50+
foldSelector.activeReaders.put(s, true)
51+
val ft = foldSelector.selector
52+
s.cbread((cont: ContRead[A, B]) => {
53+
foldSelector.activeReaders.remove(s)
54+
foldSelector.dispathReader[A, B](s, ft)
55+
}, ft)
56+
}
57+
}
58+
59+
}
60+
61+
object FoldSelectorEffectedInput{
62+
def apply[A,B](foldSelect: FoldSelect[B], index:Int, chFun: ()=>Input[A]): FoldSelectorEffectedInput[A,B]
63+
= new CFoldSelectorEffectedInput[A,B](foldSelect,index,chFun)
64+
}
65+
66+
class CFoldSelectorEffectedInput[A,B](val foldSelector:FoldSelect[B], val index:Int, chFun: ()=>Input[A]) extends FoldSelectorEffectedInput[A,B]
67+
{
68+
val api = chFun().api
69+
def current() = chFun()
70+
}
71+
72+
73+
trait FoldSelectorEffectedOutput[A,B] extends Output[A]
74+
{
75+
76+
def current: Output[A]
77+
def foldSelector: FoldSelect[B]
78+
def index: Int
79+
80+
81+
override def cbwrite[C](f: (ContWrite[A, C]) => Option[(A, Future[Continuated[C]])], ft: FlowTermination[C]): Unit =
82+
{
83+
cbwriterIfNotActive()
84+
}
85+
86+
87+
def refreshWriter():Unit =
88+
{
89+
foldSelector.outputIndices.put(current,index)
90+
cbwriterIfNotActive()
91+
}
92+
93+
private def cbwriterIfNotActive(): Unit =
94+
{
95+
val s = current
96+
val notSet = foldSelector.activeWriters.get(s).isEmpty
97+
if (notSet) {
98+
foldSelector.activeWriters.put(s,true)
99+
val ft = foldSelector.selector
100+
s.cbwrite((cw:ContWrite[A,B]) => {
101+
foldSelector.activeWriters.remove(s)
102+
foldSelector.dispathWriter(s, ft)
103+
}, ft)
104+
}
105+
106+
}
107+
108+
}
109+
110+
object FoldSelectorEffectedOutput
111+
{
112+
def apply[A,B](foldSelect: FoldSelect[B], index: Int, chFun: ()=>Output[A]):FoldSelectorEffectedOutput[A,B]=
113+
new CFoldSelectorEffectedOutput(foldSelect, index, chFun)
114+
}
115+
116+
class CFoldSelectorEffectedOutput[A,B](val foldSelector: FoldSelect[B], val index: Int, chFun:()=>Output[A]) extends FoldSelectorEffectedOutput[A,B]
117+
{
118+
val api = chFun().api
119+
override def current = chFun()
120+
}
121+
122+
trait FoldSelectorEffectedChannel[A,B] extends FoldSelectorEffectedInput[A,B] with FoldSelectorEffectedOutput[A,B]
123+
{
124+
125+
override def current: Channel[A]
126+
127+
val foldSelector: FoldSelect[B]
128+
val index: Int
129+
130+
131+
}
132+
133+
object FoldSelectorEffectedChannel
134+
{
135+
def apply[A,B](foldSelect:FoldSelect[B], index:Int, chFun:()=>Channel[A]):FoldSelectorEffectedChannel[A,B]=
136+
new CFoldSelectorEffectedChannel(foldSelect,index,chFun)
137+
}
138+
139+
class CFoldSelectorEffectedChannel[A,B](val foldSelector: FoldSelect[B], val index:Int,chFun:()=>Channel[A]) extends FoldSelectorEffectedChannel[A,B]
140+
{
141+
override val api = chFun().api
142+
override def current(): Channel[A] = chFun()
143+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package gopher.channels
2+
3+
import gopher._
4+
5+
trait GopherAPIProvider
6+
{
7+
def api: GopherAPI
8+
}

src/main/scala/gopher/channels/Input.scala

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,12 @@ import scala.concurrent.duration._
55
import scala.language.experimental.macros
66
import scala.language.reflectiveCalls
77
import scala.reflect.macros.blackbox.Context
8-
import scala.reflect.api._
9-
import scala.util._
10-
import java.util.concurrent.ConcurrentLinkedQueue
11-
128
import gopher._
139
import gopher.util._
14-
15-
1610
import java.util.concurrent.atomic._
1711

12+
import gopher.channels.ContRead.In
13+
1814
/**
1915
* Entity, from which we can read objects of type A.
2016
*
@@ -112,7 +108,10 @@ trait Input[A] extends GopherAPIProvider
112108
thisInput.cbread[B]({ cont =>
113109
f(cont.copy(channel=this)) map { // todo - eliminate copy
114110
f1 => { case v@ContRead.Value(a) =>
115-
if (p(a)) f1(v) else Future successful cont
111+
if (p(a)) f1(v) else {
112+
f1(ContRead.Skip)
113+
Future successful cont
114+
}
116115
case v@_ => f1(v)
117116
} } }, ft)
118117

@@ -132,7 +131,8 @@ trait Input[A] extends GopherAPIProvider
132131
def mf(cont:ContRead[A,C]):Option[ContRead.In[A]=>Future[Continuated[C]]] =
133132
{ val contA = ContRead(f,this,cont.flowTermination)
134133
f(contA) map (f1 => { case v@ContRead.Value(a) => f1(ContRead.Value(g(a)))
135-
case ContRead.Skip => Future successful cont
134+
case ContRead.Skip => f1(ContRead.Skip)
135+
Future successful cont
136136
case ContRead.ChannelClosed => f1(ContRead.ChannelClosed)
137137
case ContRead.Failure(ex) => f1(ContRead.Failure(ex))
138138
} )
@@ -157,7 +157,8 @@ trait Input[A] extends GopherAPIProvider
157157
{ val contA = ContRead(f,this,cont.flowTermination)
158158
f(contA) map { f1 => {
159159
case v@ContRead.Value(a) => Future successful ContRead(f,op(g(a),this),cont.flowTermination)
160-
case ContRead.Skip => Future successful cont
160+
case ContRead.Skip => f1(ContRead.Skip)
161+
Future successful cont
161162
case ContRead.ChannelClosed => f1(ContRead.ChannelClosed)
162163
case ContRead.Failure(ex) => f1(ContRead.Failure(ex))
163164
}}}
@@ -426,6 +427,20 @@ object Input
426427
def api = gopherApi
427428
}
428429

430+
431+
def zero[A](implicit gopherAPI: GopherAPI):Input[A] = new Input[A] {
432+
433+
/**
434+
* will eat f without a trace (i.e. f will be never called)
435+
*/
436+
override def cbread[B](f: (ContRead[A, B]) => Option[(In[A]) => Future[Continuated[B]]], ft: FlowTermination[B]): Unit = {}
437+
438+
/**
439+
* instance of gopher API
440+
*/
441+
override def api: GopherAPI = gopherAPI
442+
}
443+
429444
}
430445

431446

src/main/scala/gopher/channels/Output.scala

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -80,30 +80,6 @@ trait Output[A] extends GopherAPIProvider
8080
}
8181
}
8282

83-
def awriteAllDebug[C <: Iterable[A]](c:C):Future[Unit] =
84-
{
85-
if (c.isEmpty) {
86-
System.err.println("awriteAllDebug: empty")
87-
Future successful (())
88-
} else {
89-
val ft = PromiseFlowTermination[Unit]
90-
val it = c.iterator
91-
def f(cont:ContWrite[A,Unit]):Option[(A,Future[Continuated[Unit]])]=
92-
{
93-
val n = it.next()
94-
if (it.hasNext) {
95-
System.err.println(s"awriteAllDebug: n=${n}, hasNext")
96-
Some((n,Future successful ContWrite(f,this,ft)))
97-
} else {
98-
System.err.println(s"awriteAllDebug: n=${n}, last")
99-
Some((n, Future successful Done((), ft) ))
100-
}
101-
}
102-
cbwrite(f,ft)
103-
ft.future
104-
}
105-
}
106-
10783
def writeAll[C <: Iterable[A]](it:C):Unit = macro Output.writeAllImpl[A,C]
10884

10985

src/main/scala/gopher/channels/Selector.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
5454
block match {
5555
case cr@ContRead(f,ch, ft) =>
5656
def f1(cont:ContRead[cr.El,cr.R]): Option[ContRead.In[cr.El]=>Future[Continuated[cr.R]]] = {
57-
System.err.println(s"Selector.makeLocked.f1 - before tryLocked, fClass=${f.getClass} isLocked=${isLocked}")
5857
tryLocked(f(ContRead(f,ch,ft)),cont,"read") map { q =>
5958
(in => unlockAfter(
6059
try {
@@ -91,7 +90,6 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
9190
@inline
9291
private[this] def tryLocked[X](body: => Option[X], cont: FlowContinuated[A], dstr: String):Option[X] =
9392
if (tryLock()) {
94-
System.err.println(s"lock set for selector ${this}")
9593
try {
9694
body match {
9795
case None => mustUnlock(dstr,cont.flowTermination)
@@ -108,7 +106,6 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
108106
None
109107
}
110108
} else {
111-
System.err.println(s"TryLockedFailed, send $cont to waiters")
112109
toWaiters(cont)
113110
None
114111
}
@@ -183,7 +180,6 @@ class Selector[A](api: GopherAPI) extends PromiseFlowTermination[A]
183180

184181
private[this] def unlock(debugFrom: String): Boolean =
185182
{
186-
System.err.println(s"unlock selector ${this}")
187183
val retval = lockFlag.compareAndSet(true,false)
188184
//if (retval) {
189185
sendWaits()

src/main/scala/gopher/channels/SelectorBuilder.scala

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,16 +160,23 @@ class SelectorBuilderImpl(val c: Context) extends ASTUtilImpl
160160
else
161161
super.transform(tree)
162162
case _ =>
163-
if (tree.symbol != null) {
163+
if (tree.symbol != null && tree.symbol != NoSymbol) {
164164
if (ownerWillBeErased(tree.symbol)) {
165165
var prevMustBeErased = insideMustBeErased
166166
insideMustBeErased = true
167-
val (done, rtree) = doClear(tree)
168-
insideMustBeErased = prevMustBeErased
169-
if (done) {
170-
rtree
171-
} else {
172-
super.transform(tree)
167+
try {
168+
val (done, rtree) = doClear(tree)
169+
insideMustBeErased = prevMustBeErased
170+
if (done) {
171+
rtree
172+
} else {
173+
super.transform(tree)
174+
}
175+
}catch{
176+
case ex: Exception =>
177+
System.err.println(s"ex, tree.symbol=${tree.symbol}")
178+
ex.printStackTrace()
179+
throw ex
173180
}
174181
} else super.transform(tree)
175182
} else {
@@ -195,6 +202,7 @@ class SelectorBuilderImpl(val c: Context) extends ASTUtilImpl
195202
(true, atPos(tree.pos)(Select(Ident(changeName(name)),proj)) )
196203
case _ =>
197204
// (false, tree)
205+
throw new IllegalStateException("unexpected shapr")
198206
c.abort(tree.pos,"""Unexpected shape for tree with caseDef owner, which erased by macro,
199207
please, fire bug-report to scala-gopher, raw="""+showRaw(tree))
200208
}

0 commit comments

Comments
 (0)