@@ -8,7 +8,8 @@ import gopher.util._
8
8
import scala .concurrent ._
9
9
import scala .concurrent .duration ._
10
10
import scala .annotation .unchecked ._
11
-
11
+ import java .util .{WeakHashMap => JWeakHashMap }
12
+ import java .util .function .{BiConsumer => JBiConsumer }
12
13
13
14
14
15
trait FoldSelectorBuilder [T ] extends SelectorBuilder [T ]
@@ -68,17 +69,53 @@ class FoldSelect[T](sf:SelectFactory) extends FoldSelectorBuilder[T]
68
69
class FoldSelectorEffectedInput [A ](s: ()=> Input [A ],val api : GopherAPI ) extends Input [A ]
69
70
{
70
71
72
+ thisInput =>
73
+
71
74
def current = s()
72
-
75
+ val activeReaders = new JWeakHashMap [Reader [_],JWeakHashMap [Input [A ],Boolean ]]
76
+
77
+ class Reader [B ](val svInput : Input [A ],
78
+ val function : ContRead [A ,B ]=> Option [ContRead .In [A ]=> Future [Continuated [B ]]],
79
+ val flowTermination : FlowTermination [B ], var disabled : Boolean = false )
80
+ extends (ContRead [A ,B ]=> Option [ContRead .In [A ] => Future [Continuated [B ]]]) {
81
+ def apply (cr: ContRead [A ,B ]): Option [ContRead .In [A ]=> Future [Continuated [B ]]] =
82
+ if (disabled || ! (svInput eq s())) None else function(cr.copy(channel= thisInput))
83
+
84
+
85
+ def cbread (ch: Input [A ]): Unit =
86
+ ch.cbread(this ,flowTermination)
87
+ }
88
+
73
89
def cbread [B ](f : ContRead [A ,B ] => Option [ContRead .In [A ] => Future [Continuated [B ]]],ft : FlowTermination [B ]): Unit = {
74
- val sv = s()
75
- sv.cbread[B ](cr=> if (sv eq s()) f(cr.copy(channel= this )) else None ,ft)
90
+ val r = new Reader [B ](current,f,ft,false )
91
+ val s = current
92
+ val jwhashMap = new JWeakHashMap [Input [A ],Boolean ]
93
+ jwhashMap.put(current,true )
94
+ activeReaders.put(r,jwhashMap)
95
+ current.cbread(r,ft)
96
+ }
97
+
98
+ def refresh (): Unit =
99
+ {
100
+ activeReaders forEach refreshBiConsumer
101
+ }
102
+
103
+ val refreshBiConsumer = new JBiConsumer [Reader [_],JWeakHashMap [Input [A ],Boolean ]] {
104
+ override def accept (r: Reader [_], sources: JWeakHashMap [Input [A ],Boolean ]): Unit =
105
+ {
106
+ val s = current
107
+ val alreadyRegistered = sources.containsKey(s)
108
+ if (! alreadyRegistered) {
109
+ sources.put(s,true )
110
+ r.cbread(s)
111
+ }
112
+ }
76
113
}
77
114
78
115
79
116
}
80
117
81
- class FoldSelectorBuilderImpl (val c : Context )
118
+ class FoldSelectorBuilderImpl (override val c : Context ) extends SelectorBuilderImpl (c )
82
119
{
83
120
import c .universe ._
84
121
@@ -163,9 +200,6 @@ class FoldSelectorBuilderImpl(val c:Context)
163
200
c.Expr [Future [S ]](tree)
164
201
}
165
202
166
- def transformSelectMatch (bn: TermName ,cases : List [CaseDef ]): List [Tree ]=
167
- SelectorBuilder .transformSelectMatch(c)(bn,cases)
168
-
169
203
def fold [S : c.WeakTypeTag ](s: c.Expr [S ])(op: c.Expr [(S ,FoldSelect [S ])=> S ]): c.Expr [S ] =
170
204
c.Expr [S ](q " scala.async.Async.await( ${afold(s)(op).tree}) " )
171
205
@@ -340,7 +374,7 @@ class FoldSelectorBuilderImpl(val c:Context)
340
374
guard,
341
375
Match (Ident (choice1),cases1)) =>
342
376
if (sel == choice1) {
343
- val selectSymbols = retrieveSelectChannels(cases )
377
+ val selectSymbols = retrieveSelectChannels(cases1 )
344
378
FoldParse (
345
379
stateVal = x,
346
380
stateUsedInSelect = selectSymbols.contains(x.symbol),
@@ -386,8 +420,50 @@ class FoldSelectorBuilderImpl(val c:Context)
386
420
387
421
private def retrieveSelectChannels (cases: List [CaseDef ]): Set [c.Symbol ] =
388
422
{
389
- // TODO: implement
390
- Set ()
423
+ val s0 = Set [c.Symbol ]()
424
+ cases.foldLeft(s0){ (s,e) =>
425
+ def addSymbol (in: Tree ) = s+ e.symbol
426
+ acceptSelectCaseDefPattern(e, addSymbol, addSymbol, addSymbol, _ => s)
427
+ }
428
+ }
429
+
430
+ // TODO: generalize and merge with parsing in SelectorBuilderImpl
431
+ def acceptSelectCaseDefPattern [A ](caseDef: CaseDef ,onRead : Tree => A , onWrite : Tree => A ,
432
+ onSelectTimeout : Tree => A , onIdle : Tree => A ): A =
433
+ {
434
+ caseDef.pat match {
435
+ case Bind (name,t) =>
436
+ val termName = name.toTermName
437
+ t match {
438
+ case Typed (_,tp: TypeTree ) =>
439
+ val tpoa = if (tp.original.isEmpty) tp else tp.original
440
+ val tpo = MacroUtil .skipAnnotation(c)(tpoa)
441
+ tpo match {
442
+ case Select (ch,TypeName (" read" )) => onRead(ch)
443
+ case Select (ch,TypeName (" write" )) => onWrite(ch)
444
+ case Select (select,TypeName (" timeout" )) => onSelectTimeout(select)
445
+ case _ =>
446
+ if (caseDef.guard.isEmpty) {
447
+ c.abort(tp.pos, " row caseDef:" + showRaw(caseDef) );
448
+ c.abort(tp.pos, " match pattern in select without guard must be in form x:channel.write or x:channel.read" );
449
+ } else {
450
+ parseGuardInSelectorCaseDef(termName, caseDef.guard) match {
451
+ case q " scala.async.Async.await[ ${t}]( ${readed}.aread): ${t1}" =>
452
+ onRead(readed)
453
+ case q " scala.async.Async.await[ ${t}]( ${ch}.awrite( $expression)): ${t1}" =>
454
+ onWrite(ch)
455
+ case x@ _ =>
456
+ c.abort(tp.pos, " can't parse match guard: " + x);
457
+ }
458
+ }
459
+ }
460
+ case _ =>
461
+ c.abort(caseDef.pat.pos," x:channel.read or x:channel.write form is required" )
462
+ }
463
+ case Ident (TermName (" _" )) => onIdle(caseDef.pat)
464
+ case _ =>
465
+ c.abort(caseDef.pat.pos," bind in pattern is expected" )
466
+ }
391
467
}
392
468
393
469
}
0 commit comments