Skip to content

Commit 5c8ea08

Browse files
committed
implemented handling async operations inside foreach.
1 parent 0454e6e commit 5c8ea08

File tree

8 files changed

+291
-13
lines changed

8 files changed

+291
-13
lines changed

build.sbt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ resolvers += Resolver.sonatypeRepo("snapshots")
99

1010
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
1111

12-
scalacOptions ++= Seq("-unchecked","-deprecation", "-feature" /* , "-Ymacro-debug-lite" , "-Ydebug" , "-Ylog:lambdalift" */ )
12+
scalacOptions ++= Seq("-unchecked","-deprecation", "-feature" /* , "-Ymacro-debug-lite" , "-Ydebug" , "-Ylog:lambdalift" */ )
1313

1414
libraryDependencies <+= scalaVersion( "org.scala-lang" % "scala-reflect" % _ )
1515

16-
libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.9.6-RC2"
16+
//libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.9.6-RC2"
17+
libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.9.6-SNAPSHOT"
1718

1819
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"
1920

src/main/scala/gopher/channels/SelectorBuilder.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ object SelectorBuilder
279279

280280
val symbolsToErase = Set(caseDef.pat.symbol, caseDef.pat.symbol.owner)
281281

282-
// Loook's like bug in 'untypecheck' : when we split cassDef on few functions, than sometines, symbols
282+
// when we split cassDef on few functions, than sometines, symbols
283283
// entries in identifier tree are not cleared.
284284
// So, we 'reset' symbols which belong to caseDef which will be erased by macros
285285
// //TODO: check, may be will be better to use scala-compiler internal API and changeOwner instead.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package gopher.goasync
2+
3+
import scala.language.experimental.macros
4+
import scala.language.reflectiveCalls
5+
import scala.reflect.macros.blackbox.Context
6+
import scala.reflect.api._
7+
import gopher._
8+
import gopher.util._
9+
import scala.concurrent._
10+
import scala.annotation.unchecked._
11+
12+
13+
14+
object AsyncApply
15+
{
16+
17+
18+
def impl1[A:c.WeakTypeTag,B:c.WeakTypeTag,C:c.WeakTypeTag](c:Context)(hof:c.Expr[Function[Function[A,B],C]])(nf:c.Expr[Function[A,Future[B]]]):
19+
c.Expr[Future[C]] =
20+
{
21+
import c.universe._
22+
val nhof = transformHof[A,B,C](c)(hof.tree)
23+
val retval = c.Expr[Future[C]](q"${nhof}(${nf})")
24+
System.err.println("retval:"+retval)
25+
retval
26+
}
27+
28+
def transformHof[A:c.WeakTypeTag,B:c.WeakTypeTag,C:c.WeakTypeTag](c:Context)(hof:c.Tree):c.Tree = {
29+
import c.universe.{Function=>_,_}
30+
hof match {
31+
case q"${p}.$h" =>
32+
val ah = genAsyncName(c)(h,hof.pos)
33+
q"${p}.${ah}"
34+
case q"${p}.$h[$w]" =>
35+
val ah = genAsyncName(c)(h,hof.pos)
36+
q"${p}.${ah}[$w]"
37+
case q"($fp)=>$res($fp1)" if (fp.symbol == fp1.symbol) =>
38+
val nested = transformHof[A,B,C](c)(res)
39+
val nname = TermName(c.freshName())
40+
val paramType = tq"Function[${c.weakTypeOf[A]},Future[${c.weakTypeOf[B]}]]"
41+
//val paramDef = q"val $nname:${paramType}"
42+
val paramDef = ValDef(Modifiers(Flag.PARAM),nname,paramType,EmptyTree)
43+
c.typecheck(q"($paramDef)=>$nested($nname)")
44+
case q"{ ..$stats }" =>
45+
val nstats = transformLast(c){
46+
t => transformHof[A,B,C](c)(t)
47+
}(stats)
48+
q"{ ..$nstats }"
49+
case _ => c.abort(hof.pos,"hof match failed:"+hof)
50+
}
51+
}
52+
53+
def genAsyncName(c:Context)(h:c.TermName,pos:c.Position):c.TermName =
54+
{
55+
import c.universe._
56+
h match {
57+
case TermName(hname) =>
58+
TermName(hname+"Async")
59+
case _ =>
60+
c.abort(pos,"ident expected for hight order function")
61+
}
62+
}
63+
64+
def transformLast(c:Context)(f:c.Tree=>c.Tree)(block: List[c.Tree]):List[c.Tree] =
65+
block match {
66+
case Nil => Nil
67+
case r::Nil => f(r)::Nil
68+
case h::q => h::transformLast(c)(f)(q)
69+
}
70+
71+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package gopher.goasync
2+
3+
import scala.concurrent._
4+
import scala.async.Async._
5+
import scala.collection.generic._
6+
7+
8+
class AsyncIterable[T](val x:Iterable[T]) //extends AnyVal [implementation restriction, [scala-2.11.8]
9+
{
10+
11+
def foreachAsync[U](f: T => Future[U])(implicit ec:ExecutionContext): Future[Unit] =
12+
async{
13+
val it = x.iterator
14+
while(it.hasNext) {
15+
await(f(it.next))
16+
}
17+
}
18+
19+
def mapAsync[U,That](f: T => Future[U])(implicit bf: CanBuildFrom[Iterable[T],U,That], ec: ExecutionContext): Future[That] =
20+
{
21+
async {
22+
val builder = bf.apply()
23+
val it = x.iterator
24+
while(it.hasNext) {
25+
builder += await(f(it.next))
26+
}
27+
builder.result()
28+
}
29+
}
30+
31+
}
32+

src/main/scala/gopher/goasync/GoAsync.scala

Lines changed: 106 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,26 @@ import scala.annotation.unchecked._
1313
/**
1414
* async arround go.
1515
*
16-
* Basicly go is wrapped inside SIP-22 async with defer
16+
* Basicly go is
17+
* 1. translate await-like exressions inside inline functions to calls of appropriative async functions.
18+
* (or show error if not found).
19+
*```
20+
* x.foreach{ x => p; await(x); .. }
21+
*```
22+
* become
23+
*```
24+
* await( transform-defer( x.foreachAsync{ x => async(p; await(x); ..) }) )
25+
*```
26+
* (note, that channel.read macroses are expanded to await-s on this point)
27+
*
28+
* 2. transform defer calls if defer statement is found inside go:
29+
*```
30+
* asnyc{ p .. defer(x) .. }
31+
*```
32+
* become (reallity is a little complext, here is just idea)
33+
*```
34+
* { val d = new Defers(); async{ p .. d.defer(x) .. }.onComplete(d.tryProcess) }
35+
*```
1736
*/
1837
object GoAsync
1938
{
@@ -23,26 +42,27 @@ object GoAsync
2342
def goImpl[T:c.WeakTypeTag](c:Context)(body:c.Expr[T])(ec:c.Expr[ExecutionContext]):c.Expr[Future[T]] =
2443
{
2544
import c.universe._
26-
if (containsDefer(c)(body)) {
45+
val nbody = GoAsync.transformAsyncBody[T](c)(body.tree)
46+
val r = if (containsDefer(c)(body)) {
2747
val defers = TermName(c.freshName)
2848
val promise = TermName(c.freshName)
2949
// asyn transform wantstyped tree on entry, so we must substitute 'defers' to untyped
3050
// values after it, no before.
31-
c.Expr[Future[T]](
3251
q"""
3352
gopher.goasync.GoAsync.transformDeferMacro[${c.weakTypeOf[T]}](
3453
{implicit val ${defers} = new Defers[${c.weakTypeOf[T]}]()
3554
val ${promise} = Promise[${c.weakTypeOf[T]}]()
36-
scala.async.Async.async(${body})(${ec}).onComplete( x =>
55+
scala.async.Async.async(${nbody})(${ec}).onComplete( x =>
3756
${promise}.complete(${defers}.tryProcess(x))
3857
)(${ec})
3958
${promise}.future
4059
}
4160
)
42-
""")
61+
"""
4362
} else {
44-
c.Expr[Future[T]](q"scala.async.Async.async(${body})(${ec})")
63+
q"scala.async.Async.async(${nbody})(${ec})"
4564
}
65+
c.Expr[Future[T]](r)
4666
}
4767

4868
def goScopeImpl[T:c.WeakTypeTag](c:Context)(body:c.Expr[T]):c.Expr[T] =
@@ -101,6 +121,86 @@ object GoAsync
101121
transformer.transform(body)
102122
}
103123

124+
def transformAsyncBody[T:c.WeakTypeTag](c:Context)(body:c.Tree):c.Tree =
125+
{
126+
import c.universe._
127+
val transformer = new Transformer {
128+
override def transform(tree:Tree):Tree =
129+
tree match {
130+
case q"${f1}(${a}=>${b})" =>
131+
val found = findAwait(c)(b)
132+
if (found) {
133+
//System.err.println(s"found hof entry ${f1} ${a}=>${b}")
134+
val btype = b.tpe
135+
// untypechack is necessory, because async-transform later will corrupt
136+
// symbols owners inside b
137+
// [scala-2.11.8]
138+
val nb = c.untypecheck(b)
139+
val anb = atPos(b.pos){
140+
// typecheck here is needed to prevent incorrect liftingUp of
141+
// internal variables in ${b}
142+
//[scala-2.11.8]
143+
//c.typecheck(q"(${a})=>go[${btype}](${nb})")
144+
val nnb = transformAsyncBody(c)(nb)
145+
//c.typecheck(q"(${a})=>scala.async.Async.async[${btype}](${nnb})")
146+
q"(${a})=>scala.async.Async.async[${btype}](${nnb})"
147+
}
148+
val ar = atPos(tree.pos){
149+
// typecheck is necessory
150+
// 1. to prevent runnint analysis of async over internal awaits in anb as on
151+
// enclosing async instead those applied from asyncApply
152+
// 2. to expand macroses here, to prevent error during expanding macroses
153+
// in next typecheck
154+
c.typecheck(q"gopher.asyncApply1(${f1})(${anb})")
155+
//q"gopher.asyncApply1(${f1})(${anb})"
156+
}
157+
//typecheck with macros disabled is needed for compiler,
158+
//to set symbol 'await', because async macro discovered
159+
//awaits by looking at symbole
160+
val r = c.typecheck(q"scala.async.Async.await[${btype}](${ar})",withMacrosDisabled=true)
161+
r
162+
} else {
163+
super.transform(tree)
164+
}
165+
case _ =>
166+
super.transform(tree)
167+
}
168+
}
169+
transformer.transform(body)
170+
}
104171

172+
def findAwait(c:Context)(body:c.Tree): Boolean =
173+
{
174+
import c.universe._
175+
var found: Boolean = false
176+
val transformer = new Transformer {
177+
178+
override def transform(tree:Tree):Tree =
179+
{
180+
if (found)
181+
tree
182+
else {
183+
tree match {
184+
case q"(scala.async.Async.await[${w}]($r)):${w1}"=>
185+
System.err.println(s"found await: [${w}](${r})")
186+
found = true
187+
// here we erase 'await' symbols
188+
//q"(scala.async.Async.await[${w}]($r)):${w1}"
189+
tree
190+
case q"(${a}=>${b})" =>
191+
// don't touch nested functions
192+
tree
193+
case _ =>
194+
super.transform(tree)
195+
}
196+
}
197+
}
198+
199+
}
200+
transformer.transform(body)
201+
found
202+
}
203+
204+
105205
}
106206

src/main/scala/gopher/package.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
import scala.language.experimental.macros
33
import scala.language.implicitConversions
44

5+
import scala.concurrent._
6+
import gopher.channels._
7+
import gopher.goasync._
8+
59
/**
610
* Provides scala API for 'go-like' CSP channels.
711
*
@@ -66,9 +70,6 @@ import scala.language.implicitConversions
6670
package object gopher {
6771

6872

69-
import scala.concurrent._
70-
import gopher.channels._
71-
import gopher.goasync._
7273

7374
//
7475
// magnetic arguments for selector-builder unsugared API
@@ -172,7 +173,7 @@ import gopher.goasync._
172173
def aread: Future[T] = f
173174
}
174175

175-
//import scala.language.experimental.macros
176+
import scala.language.experimental.macros
176177
import scala.reflect.macros.blackbox.Context
177178
import scala.reflect.api._
178179
def awaitImpl[T](c:Context)(v:c.Expr[Future[T]]):c.Expr[T] =
@@ -181,5 +182,11 @@ import gopher.goasync._
181182
c.Expr[T](q"scala.async.Async.await($v)")
182183
}
183184

185+
186+
def asyncApply1[A,B,C](hof:(A=>B)=>C)(nf:A=>Future[B]):Future[C] =
187+
macro gopher.goasync.AsyncApply.impl1[A,B,C]
188+
189+
implicit def toAsyncIterable[T](x:Iterable[T]):AsyncIterable[T] = new AsyncIterable[T](x)
190+
184191
}
185192

src/main/scala/gopher/util/MacroUtil.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,6 @@ object MacroUtil
5454
}
5555

5656

57+
5758
final val SHORT_LEN = 80
5859
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package gopher.hofasyn
2+
3+
import gopher._
4+
import gopher.channels._
5+
import scala.language._
6+
import scala.concurrent._
7+
import scala.concurrent.duration._
8+
import scala.async.Async._
9+
10+
import org.scalatest._
11+
import gopher.tags._
12+
13+
14+
/*
15+
* code from go tutorial: http://tour.golang.org/#66
16+
*
17+
*/
18+
object FibonaccyL {
19+
20+
import scala.concurrent.ExecutionContext.Implicits.global
21+
22+
def fibonacci(c: Output[Long], quit: Input[Int]): Future[Unit] =
23+
go {
24+
var (x,y) = (0L,1L)
25+
for(s <- gopherApi.select.forever) {
26+
s match {
27+
case z: c.write if (z == x) =>
28+
x = y
29+
y = z+y
30+
case q: quit.read =>
31+
implicitly[FlowTermination[Unit]].doExit(())
32+
}
33+
}
34+
}
35+
36+
def run(n:Int, acceptor: Long => Unit ): Future[Unit] =
37+
{
38+
val c = gopherApi.makeChannel[Long](1);
39+
val quit = gopherApi.makeChannel[Int](1);
40+
go {
41+
for(i <-1 to n) {
42+
val xLLFind = c.read
43+
//Console.println(s"received: ${i}, ${xLLFind}")
44+
acceptor(xLLFind)
45+
}
46+
//System.err.println("sending quit")
47+
quit <~ 0
48+
}
49+
50+
fibonacci(c,quit)
51+
}
52+
53+
def gopherApi = CommonTestObjects.gopherApi
54+
55+
}
56+
57+
class FibonaccyAsyncLoopSuite extends FunSuite
58+
{
59+
60+
test("fibonaccy must be processed up to 50") {
61+
@volatile var last:Long = 0;
62+
Await.ready( FibonaccyL.run(50, last = _ ), 10 seconds )
63+
assert(last != 0)
64+
}
65+
66+
}

0 commit comments

Comments
 (0)