Skip to content

Commit 079d72a

Browse files
committed
implemented map over selector
1 parent 4ebde06 commit 079d72a

File tree

9 files changed

+111
-35
lines changed

9 files changed

+111
-35
lines changed

notes/0.99.7.markdown

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@
88
- akka 2.4.2, scala-2.11.8
99
- channels by defaults now unbuffered. (as in Go)
1010
- implemented fold over input
11+
- implemented map/amap over selector
1112

1213

src/main/scala/gopher/Transputer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ trait Transputer
205205

206206

207207
/**
208-
* called when transducer is choose resume durign recovery.
208+
* called when transducer is choose to resume durign recovery.
209209
*/
210210
protected def onResume() { }
211211

@@ -391,7 +391,7 @@ trait SelectTransputer extends ForeverSelectorBuilder with Transputer
391391
private[gopher] override def beforeResume() : Unit =
392392
{
393393
super.beforeResume()
394-
selector = new Selector[Unit](api)
394+
//selector.clear()
395395
selectorInit()
396396
}
397397

src/main/scala/gopher/channels/ForeverSelectorBuilder.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ trait ForeverSelectorBuilder extends SelectorBuilder[Unit]
9696
**/
9797
def map[B](f:Any=>B):Input[B] = macro SelectorBuilder.mapImpl[B]
9898

99-
def input[B](f:PartialFunction[Any,B]):Input[B] = ???
99+
def input[B](f:PartialFunction[Any,B]):Input[B] =
100+
macro SelectorBuilder.inputImpl[B]
100101

101102
}
102103

src/main/scala/gopher/channels/InputSelectorBuilder.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,18 @@ class InputSelectorBuilder[T](override val api: GopherAPI) extends SelectorBuild
6767
],
6868
ft: FlowTermination[B]): Unit = proxy.cbread(f,ft)
6969

70+
def started: InputSelectorBuilder[T] = { go; this }
71+
72+
//
73+
override val selector = new Selector[T](api) {
74+
override def doExit(a: T): T =
75+
{
76+
proxy.awrite(a) onComplete {
77+
_ => proxy.close()
78+
}
79+
super.doExit(a)
80+
}
81+
}
7082

7183
}
7284

src/main/scala/gopher/channels/SelectorBuilder.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ trait SelectorBuilder[A]
6060

6161
implicit def ec: ExecutionContext = api.executionContext
6262

63-
private[gopher] var selector=new Selector[A](api)
63+
private[gopher] val selector=new Selector[A](api)
6464

6565
// used for reading from future
6666
@inline
@@ -423,38 +423,38 @@ object SelectorBuilder
423423
q"${builderName}.idle(${caseDef.body})"
424424
}
425425

426-
def mapImpl[T](c:Context)(f:c.Expr[Any=>T]):c.Expr[Input[T]] =
426+
def mapImpl[T:c.WeakTypeTag](c:Context)(f:c.Expr[Any=>T]):c.Expr[Input[T]] =
427427
{
428428
import c.universe._
429429
val builder = f.tree match {
430430
case Function(forvals,Match(choice,cases)) =>
431-
foreachBuildMatch(c)(cases)
431+
mapBuildMatch[T](c)(cases)
432432
case Function(a,b) =>
433433
c.abort(f.tree.pos, "match expected in gopher select map, have: ${MacroUtil.shortString(b)} ");
434434
case _ =>
435435
c.abort(f.tree.pos, "match expected in gopher select map, have: ${MacroUtil.shortString(f.tree)}");
436436

437437
}
438-
c.Expr[Input[T]](c.untypecheck(builder))
438+
c.Expr[Input[T]](c.untypecheck(q"${builder}.started"))
439439
}
440440

441-
def mapBuildMatch(c:Context)(cases:List[c.universe.CaseDef]):c.Tree =
441+
def mapBuildMatch[T:c.WeakTypeTag](c:Context)(cases:List[c.universe.CaseDef]):c.Tree =
442442
{
443443
import c.universe._
444444
val bn = TermName(c.freshName)
445445
val calls = transformSelectMatch(c)(bn,cases)
446-
q"""..${q"val ${bn} = ${c.prefix}.inputBuilder()" :: calls}"""
446+
q"""..${q"val ${bn} = ${c.prefix}.inputBuilder[${weakTypeOf[T]}]()" :: calls}"""
447447
}
448448

449-
def inputImpl[T](c:Context)(f:c.Expr[PartialFunction[Any,T]]):c.Expr[Input[T]] =
449+
def inputImpl[T:c.WeakTypeTag](c:Context)(f:c.Expr[PartialFunction[Any,T]]):c.Expr[Input[T]] =
450450
{
451451
import c.universe._
452452
val builder = f.tree match {
453453
case q"{case ..$cases}" =>
454-
mapBuildMatch(c)(cases)
454+
mapBuildMatch[T](c)(cases)
455455
case _ => c.abort(f.tree.pos,"expected partial function with syntax case ... =>, have ${MacroUtil.shortString(f.tree)}");
456456
}
457-
c.Expr[Input[T]](c.untypecheck(builder))
457+
c.Expr[Input[T]](c.untypecheck(q"${builder}.started"))
458458
}
459459

460460
}

src/main/scala/gopher/channels/SelectorFactory.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class SelectFactory(val api: GopherAPI)
3939
*/
4040
def once[T]: OnceSelectorBuilder[T] = new OnceSelectorBuilder[T] with SelectFactoryApi {}
4141

42+
def inputBuilder[T]() = new InputSelectorBuilder[T](api)
43+
4244
/**
4345
* generic selector builder
4446
*/
@@ -48,6 +50,14 @@ class SelectFactory(val api: GopherAPI)
4850

4951
def fold[S](s:S)(op:(S,Any)=>S):S = macro FoldSelectorBuilderImpl.fold[S]
5052

53+
def map[B](f:Any=>B):Input[B] = macro SelectorBuilder.mapImpl[B]
54+
55+
def input[B](f:PartialFunction[Any,B]):Input[B] =
56+
macro SelectorBuilder.inputImpl[B]
57+
58+
def amap[B](f:PartialFunction[Any,B]):Input[B] =
59+
macro SelectorBuilder.inputImpl[B]
60+
5161

5262
}
5363

src/test/scala/gopher/channels/CloseSuite.scala renamed to src/test/scala/gopher/channels/ChannelCloseSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import gopher.tags._
1111
import scala.async.Async._
1212
import scala.concurrent.ExecutionContext.Implicits.global
1313

14-
class CloseSuite extends FunSuite
14+
class ChannelCloseSuite extends FunSuite
1515
{
1616

1717

src/test/scala/gopher/channels/FlowTerminationSuite.scala

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import scala.concurrent._
66
import scala.concurrent.duration._
77
import gopher._
88

9+
import scala.concurrent.ExecutionContext.Implicits.global
910

1011
class FlowTerminationSuite extends FunSuite
1112
{
@@ -22,39 +23,36 @@ class FlowTerminationSuite extends FunSuite
2223
}
2324

2425

25-
/*
26-
2726
test("select with queue type") {
27+
import gopherApi._
2828

29-
val channel = make[Int](100)
29+
val channel = makeChannel[Int](100)
3030

31-
val producer = Future {
32-
for( i <- 1 to 1000) {
33-
channel <~ i
34-
}
35-
}
31+
val producer = channel.awriteAll(1 to 1000)
3632

3733
var sum = 0;
3834
val consumer = Future {
39-
val sc = new SelectorContext()
40-
sc.addInputAction(channel,
41-
(i: channel.OutputElement) => { sum = sum + i;
42-
if (i == 1000) {
43-
sc.shutdown()
44-
}
45-
Promise successful true future
46-
}
47-
)
48-
Await.ready(sc.go, 1000.second)
35+
val sc = new Selector[Unit](gopherApi)
36+
def f(self: ContRead[Int,Unit]):Option[ContRead.In[Int]=>Future[Continuated[Unit]]] =
37+
{
38+
Some {
39+
case ContRead.Value(a) => sum = sum + a
40+
if (a == 1000) sc.doExit(())
41+
Future successful self
42+
case ContRead.Failure(e) => Future failed e
43+
case _ =>
44+
Future successful self
45+
}
46+
}
47+
sc.addReader(channel,f)
48+
Await.ready(sc.run, 10.second)
4949
}
5050

51-
52-
Await.ready(consumer, 1000.second)
51+
Await.ready(consumer, 10.second)
5352

5453
}
5554

56-
*/
5755

58-
def gopherApi = CommonTestObjects.gopherApi
56+
val gopherApi = CommonTestObjects.gopherApi
5957

6058
}

src/test/scala/gopher/channels/MacroSelectSuite.scala

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,31 @@ class MacroSelectSuite extends FunSuite
260260
assert(n2i>3)
261261
}
262262

263+
test("amap over selector") {
264+
import gopherApi._
265+
val ch1 = makeChannel[Int](10)
266+
val ch2 = makeChannel[Int](10)
267+
val quit = Promise[Boolean]()
268+
val out = select.amap {
269+
case x:ch1.read => x*2
270+
case x:ch2.read =>
271+
//System.err.println(s"received:${x}")
272+
x*3
273+
case q:Boolean if (q==quit.future.read) =>
274+
//System.err.println("received quit")
275+
CurrentFlowTermination.exit(1)
276+
}
277+
ch1.awriteAll(1 to 10)
278+
ch2.awriteAll(100 to 110)
279+
val f = out.afold(0){
280+
case (s,x) => //System.err.println(s"in afold ${x}")
281+
s+x }
282+
Thread.sleep(1000)
283+
quit success true
284+
val x = Await.result(f, 10 seconds)
285+
assert(x > 3000)
286+
}
287+
263288
test("generic channel make") {
264289
val ch1 = gopherApi.make[Channel[Int]]()
265290
val ch2 = gopherApi.make[Channel[Int]](1)
@@ -271,6 +296,35 @@ class MacroSelectSuite extends FunSuite
271296
assert(x==1)
272297
}
273298

299+
test("input afold") {
300+
import gopherApi._
301+
val ch1 = makeChannel[Int]()
302+
ch1.awriteAll(1 to 10) map { _ => ch1.close() }
303+
val f = ch1.afold(0){ case (s,x) => s+x }
304+
val x = Await.result(f, 10 seconds)
305+
assert(x==55)
306+
}
307+
308+
test("map over selector") {
309+
import gopherApi._
310+
val ch1 = gopherApi.make[Channel[Int]]()
311+
val ch2 = gopherApi.make[Channel[Int]](1)
312+
val f1 = ch1.awrite(1)
313+
val f2 = ch2.awrite(2)
314+
val chs = for(s <- select) yield {
315+
s match {
316+
case x:ch1.read => x*3
317+
case x:ch2.read => x*5
318+
}
319+
}
320+
val fs1 = chs.aread
321+
val fs2 = chs.aread
322+
val s1 = Await.result(fs1, 1 second)
323+
val s2 = Await.result(fs2, 1 second)
324+
assert(s1==3 || s1==10)
325+
}
326+
327+
274328
lazy val gopherApi = CommonTestObjects.gopherApi
275329

276330
}

0 commit comments

Comments
 (0)