Skip to content

Commit db164a5

Browse files
committed
implemented async hanfdling of option operations.
1 parent 6b36608 commit db164a5

File tree

7 files changed

+282
-50
lines changed

7 files changed

+282
-50
lines changed

notes/0.99.8.markdown

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,10 @@
1-
- added support for select.timeout construct
1+
- added support for select.timeout construct
2+
- added support for lifting-up await in hight-order functions.
3+
ie in simplicified explanation:
4+
```
5+
for(x <- 1 to n) { s += read(x) }
6+
```
7+
is translated to
8+
```
9+
1.to(n).foreachAsync { async(s += await(aread(x))) }
10+
```

src/main/scala/gopher/goasync/AsyncApply.scala

Lines changed: 91 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,35 +19,114 @@ object AsyncApply
1919
c.Expr[Future[C]] =
2020
{
2121
import c.universe._
22-
val nhof = transformHof[A,B,C](c)(hof.tree)
22+
val nhof = transformHof[A,B](c)(hof.tree,List())
2323
val retval = c.Expr[Future[C]](q"${nhof}(${nf})")
2424
retval
2525
}
2626

27-
def transformHof[A:c.WeakTypeTag,B:c.WeakTypeTag,C:c.WeakTypeTag](c:Context)(hof:c.Tree):c.Tree = {
27+
def apply1i[A,B,C](hof:Function[Function[A,B],C])(nf:Function[A,Future[B]],implicitParams:Any*):Future[C] = macro AsyncApply.impl1i[A,B,C]
28+
29+
30+
def impl1i[A:c.WeakTypeTag,B:c.WeakTypeTag,C:c.WeakTypeTag](c:Context)(hof:c.Expr[Function[Function[A,B],C]])(nf:c.Expr[Function[A,Future[B]]],implicitParams:c.Expr[Any]*): c.Expr[Future[C]] =
31+
{
32+
import c.universe._
33+
val nhof = transformHof[A,B](c)(hof.tree,implicitParams.map(_.tree))
34+
val retval = (q"${nhof}(${nf})")
35+
c.Expr[Future[C]](retval)
36+
}
37+
38+
def transformHof[A:c.WeakTypeTag,B:c.WeakTypeTag](c:Context)(hof:c.Tree,imps:Seq[c.Tree]):c.Tree = {
2839
import c.universe.{Function=>_,_}
2940
hof match {
3041
case q"${p}.$h" =>
3142
val ah = genAsyncName(c)(h,hof.pos)
3243
q"${p}.${ah}"
33-
case q"${p}.$h[$w]" =>
44+
case q"${p}.$h[..$w]" =>
3445
val ah = genAsyncName(c)(h,hof.pos)
35-
q"${p}.${ah}[$w]"
46+
q"${p}.${ah}[..$w]"
3647
case q"($fp)=>$res($fp1)" if (fp.symbol == fp1.symbol) =>
37-
val nested = transformHof[A,B,C](c)(res)
38-
val nname = TermName(c.freshName())
39-
val paramType = tq"Function[${c.weakTypeOf[A]},Future[${c.weakTypeOf[B]}]]"
40-
//val paramDef = q"val $nname:${paramType}"
41-
val paramDef = ValDef(Modifiers(Flag.PARAM),nname,paramType,EmptyTree)
42-
c.typecheck(q"($paramDef)=>$nested($nname)")
48+
val nested = transformHof[A,B](c)(res,imps)
49+
val (paramName, paramDef) = createAsyncParam[A,B](c)(fp)
50+
val mfp2 = appendImplicitExecutionContext(c)(imps)
51+
val transformed = q"($paramDef)=>$nested($paramName)(..$mfp2)"
52+
c.typecheck(transformed)
53+
case q"($fp)=>$res($fp1)(..$fp2)" if (fp.symbol == fp1.symbol) =>
54+
// ..fp2 is a list of implicit params.
55+
val nested = transformHof[A,B](c)(res,imps)
56+
val (paramName, paramDef) = createAsyncParam[A,B](c)(fp)
57+
val mfp2 = appendImplicitExecutionContext(c)(fp2)
58+
val r = q"($paramDef)=>$nested($paramName)(..$mfp2)"
59+
r
60+
case q"($fp)=>$res[$w1,$w2]($fp1)($fp2)" =>
61+
c.abort(hof.pos,"A1"+hof)
62+
case q"($fp:$ft)=>$a" =>
63+
c.abort(hof.pos,"a="+a)
4364
case q"{ ..$stats }" =>
65+
try {
4466
val nstats = transformLast(c){
45-
t => transformHof[A,B,C](c)(t)
67+
t => transformHof[A,B](c)(t,imps)
4668
}(stats)
4769
q"{ ..$nstats }"
48-
case _ => c.abort(hof.pos,"hof match failed:"+hof)
70+
} catch {
71+
case ex: Throwable =>
72+
System.err.println(s"error during transforming ${stats}")
73+
ex.printStackTrace()
74+
throw ex
75+
}
76+
case q"{ $stats }" =>
77+
//System.err.println(s"catched, stats.length==${stats.length}")
78+
System.err.println(s"stats = ${stats}")
79+
System.err.println(s"raw = ${showRaw(stats)}")
80+
throw new RuntimeException("qqq")
81+
case _ => c.abort(hof.pos,"hof match failed:"+hof+"\n raw:"+showRaw(hof))
82+
}
83+
}
84+
85+
def createAsyncParam[A:c.WeakTypeTag,B:c.WeakTypeTag](c:Context)(fp:c.Tree):(c.TermName,c.Tree) =
86+
{
87+
import c.universe._
88+
// TODO: check that fp is ident and get fp as name.
89+
val nname = TermName(c.freshName())
90+
val paramType = tq"Function[${c.weakTypeOf[A]},Future[${c.weakTypeOf[B]}]]"
91+
(nname,ValDef(Modifiers(Flag.PARAM),nname,paramType,EmptyTree))
92+
}
93+
94+
def inferImplicitExecutionContext(c:Context)():c.Tree =
95+
{
96+
val ect = c.weakTypeOf[scala.concurrent.ExecutionContext]
97+
c.inferImplicitValue(ect, silent=false)
98+
}
99+
100+
101+
def appendImplicitExecutionContext(c:Context)(paramList:Seq[c.Tree]):Seq[c.Tree] =
102+
{
103+
val t = inferImplicitExecutionContext(c)()
104+
paramList.find(_.symbol == t.symbol) match {
105+
case None => paramList :+ t
106+
case Some(v) => paramList
107+
}
108+
}
109+
110+
/*
111+
def appendImplicitExecutionContextParam(c:Context)(paramList:List[c.Tree]):List[c.Tree]=
112+
{
113+
// check that paramList not contains ec.
114+
// (note, input must be typed
115+
paramList.find{ x =>
116+
x match {
117+
case ValDef(m,pn,pt,pv) =>
118+
m.hasFlag(Flag.IMPLICIT) && pt =:= c.weakTypeOf[scala.concurrent.ExecutionContext]
119+
case _ => false
120+
}
121+
} match {
122+
case None =>
123+
val pName = TermName(c.freshName("ec"))
124+
val pType = c.weakTypeOf[scala.concurrent.ExecutionContext]
125+
ValDef(Modifiers(Flag.PARAM|Flag.IMPLICIT),pName,paramType,EmptyTree)
126+
49127
}
50128
}
129+
*/
51130

52131
def genAsyncName(c:Context)(h:c.TermName,pos:c.Position):c.TermName =
53132
{

src/main/scala/gopher/goasync/AsyncIterable.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,17 @@ import scala.collection.generic._
88
class AsyncIterable[T](val x:Iterable[T]) //extends AnyVal [implementation restriction, [scala-2.11.8]
99
{
1010

11+
1112
def foreachAsync[U](f: T => Future[U])(implicit ec:ExecutionContext): Future[Unit] =
1213
async{
1314
val it = x.iterator
1415
while(it.hasNext) {
1516
await(f(it.next))
1617
}
1718
}
19+
1820

19-
def mapAsync[U,That](f: T => Future[U])(implicit bf: CanBuildFrom[Iterable[T],U,That], ec: ExecutionContext): Future[That] =
20-
{
21+
def mapAsync[U,Z](f: T => Future[U])(implicit bf: CanBuildFrom[_,U,Z], ec:ExecutionContext): Future[Z] =
2122
async {
2223
val builder = bf.apply()
2324
val it = x.iterator
@@ -26,7 +27,7 @@ class AsyncIterable[T](val x:Iterable[T]) //extends AnyVal [implementation restr
2627
}
2728
builder.result()
2829
}
29-
}
3030

3131
}
3232

33+
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package gopher.goasync
2+
3+
import scala.concurrent._
4+
import scala.async.Async._
5+
import scala.collection.generic._
6+
7+
8+
class AsyncOption[T](val x:Option[T]) extends AnyVal
9+
{
10+
11+
12+
def foreachAsync[U](f: T => Future[U])(implicit ec:ExecutionContext): Future[Unit] =
13+
{
14+
if (x.isDefined) {
15+
f(x.get) map (_ => ())
16+
} else {
17+
Future successful (())
18+
}
19+
}
20+
21+
22+
def mapAsync[U](f: T => Future[U])(implicit ec:ExecutionContext): Future[Option[U]] =
23+
{
24+
if (x.isDefined) {
25+
f(x.get) map (x => Some(x))
26+
} else {
27+
Future successful None
28+
}
29+
}
30+
31+
def flatMapAsync[U](f: T => Future[Option[U]])(implicit ec:ExecutionContext): Future[Option[U]] =
32+
{
33+
if (x.isDefined) {
34+
f(x.get)
35+
} else {
36+
Future successful None
37+
}
38+
}
39+
40+
def filterAsync(f: T=>Future[Boolean])(implicit ec:ExecutionContext): Future[Option[T]] =
41+
{
42+
if (x.isDefined) {
43+
f(x.get) map { r =>
44+
if (r) x else None
45+
}
46+
} else {
47+
Future successful None
48+
}
49+
}
50+
51+
}
52+
53+

src/main/scala/gopher/goasync/GoAsync.scala

Lines changed: 94 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -124,51 +124,104 @@ object GoAsync
124124
def transformAsyncBody[T:c.WeakTypeTag](c:Context)(body:c.Tree):c.Tree =
125125
{
126126
import c.universe._
127+
var found = false
127128
val transformer = new Transformer {
128129
override def transform(tree:Tree):Tree =
129130
tree match {
131+
case q"${f1}(${a}=>${b})(..$a2)" =>
132+
// TODO: cache in tree.
133+
found = findAwait(c)(b)
134+
if (found) {
135+
// this can be implicit parameters of inline apply.
136+
// whe can distinguish first from second by looking at f1 shape.
137+
// for now will assume
138+
System.err.println(s"inline-await + implicit, a2=${a2}")
139+
System.err.println(s"inline-await ,tree=${tree}")
140+
System.err.println(s"inline-await ,tree.tpe=${tree.tpe}")
141+
val isImplicit = f1 match {
142+
case TypeApply(Select(x,m),w) =>
143+
System.err.println(s"typed select, x=$x, m=$m, w=$w")
144+
System.err.println(s"x.tpe=${x.tpe}")
145+
System.err.println(s"x.symbol=${x.symbol}")
146+
System.err.println(s"tree.symbol=${tree.symbol}")
147+
if (! (x.tpe eq null) ) {
148+
val sym = x.tpe.member(m)
149+
System.err.println("sym=$sym")
150+
} else {
151+
true
152+
}
153+
case q"$x.$m[$w]" =>
154+
System.err.println(s"typed select, x=$x, m=$m, w=$w")
155+
case q"$x.$m" =>
156+
System.err.println(s"select, x=$x, m=$m")
157+
true
158+
case q"($x.$m)[$w]" =>
159+
System.err.println(s"typed select-1, x=$x, m=$m, w=$w")
160+
true
161+
case _ =>
162+
System.err.println(s"other: ${f1}")
163+
System.err.println(s"raw: ${showRaw(f1)}")
164+
true
165+
}
166+
transformInlineHofCall1(c)(f1,a,b,a2)
167+
}else{
168+
super.transform(tree)
169+
}
130170
case q"${f1}(${a}=>${b})" =>
131-
val found = findAwait(c)(b)
171+
found = findAwait(c)(b)
132172
if (found) {
133-
//System.err.println(s"found hof entry ${f1} ${a}=>${b}")
134-
val btype = b.tpe
135-
// untypechack is necessory, because async-transform later will corrupt
136-
// symbols owners inside b
137-
// [scala-2.11.8]
138-
val nb = c.untypecheck(b)
139-
val anb = atPos(b.pos){
140-
// typecheck here is needed to prevent incorrect liftingUp of
141-
// internal variables in ${b}
142-
//[scala-2.11.8]
143-
//c.typecheck(q"(${a})=>go[${btype}](${nb})")
144-
val nnb = transformAsyncBody(c)(nb)
145-
//c.typecheck(q"(${a})=>scala.async.Async.async[${btype}](${nnb})")
146-
q"(${a})=>scala.async.Async.async[${btype}](${nnb})"
147-
}
148-
val ar = atPos(tree.pos){
149-
// typecheck is necessory
150-
// 1. to prevent runnint analysis of async over internal awaits in anb as on
151-
// enclosing async instead those applied from asyncApply
152-
// 2. to expand macroses here, to prevent error during expanding macroses
153-
// in next typecheck
154-
c.typecheck(q"gopher.asyncApply1(${f1})(${anb})")
155-
//q"gopher.asyncApply1(${f1})(${anb})"
156-
}
157-
//typecheck with macros disabled is needed for compiler,
158-
//to set symbol 'await', because async macro discovered
159-
//awaits by looking at symbole
160-
val r = c.typecheck(q"scala.async.Async.await[${btype}](${ar})",withMacrosDisabled=true)
161-
r
173+
transformInlineHofCall1(c)(f1,a,b,List())
162174
} else {
163175
super.transform(tree)
164176
}
165177
case _ =>
166178
super.transform(tree)
167179
}
168180
}
169-
transformer.transform(body)
181+
val r = transformer.transform(body)
182+
r
170183
}
171184

185+
// handle things like:
186+
// q"${fun}(${param}=>${body})($implicitParams)" =>
187+
def transformInlineHofCall1(c:Context)(fun:c.Tree,param:c.Tree,body:c.Tree,implicitParams:List[c.Tree]):c.Tree =
188+
{
189+
import c.universe._
190+
val btype = body.tpe
191+
// untypechack is necessory, because async-transform later will corrupt
192+
// symbols owners inside body
193+
// [scala-2.11.8]
194+
val nb = c.untypecheck(body)
195+
val anb = atPos(body.pos){
196+
val nnb = transformAsyncBody(c)(nb)
197+
val ec = c.inferImplicitValue(c.weakTypeOf[ExecutionContext])
198+
q"(${param})=>scala.async.Async.async[${btype}](${nnb})($ec)"
199+
}
200+
val ar = atPos(fun.pos) {
201+
val uar = if (implicitParams.isEmpty) {
202+
q"gopher.asyncApply1(${fun})(${anb})"
203+
} else {
204+
//we can't call macros here, becouse we don't know types of implicitParams
205+
//val a = param.tpe
206+
//val b = body.tpe
207+
//AsyncApply.impl1i(c)(fun)(anb,implicitParams)(a,b)
208+
q"gopher.goasync.AsyncApply.apply1i(${fun})(${anb},${implicitParams})"
209+
//q"gopher.asyncApply1i(${fun})(${anb})(..$implicitParams)"
210+
}
211+
// typecheck is necessory
212+
// 1. to prevent runnint analysis of async over internal awaits in anb as on
213+
// enclosing async instead those applied from asyncApply
214+
// 2. to expand macroses here, to prevent error during expanding macroses
215+
// in next typecheck
216+
c.typecheck(uar)
217+
}
218+
//typecheck with macros disabled is needed for compiler,
219+
//to set symbol 'await', because async macro discovered
220+
//awaits by looking at symbols
221+
val r = c.typecheck(q"scala.async.Async.await(${ar})",withMacrosDisabled=true)
222+
r
223+
}
224+
172225
def findAwait(c:Context)(body:c.Tree): Boolean =
173226
{
174227
import c.universe._
@@ -183,8 +236,12 @@ object GoAsync
183236
tree match {
184237
case q"(scala.async.Async.await[${w}]($r)):${w1}"=>
185238
found = true
186-
// here we erase 'await' symbols
187-
//q"(scala.async.Async.await[${w}]($r)):${w1}"
239+
tree
240+
case q"scala.async.Async.await[${w}]($r)"=>
241+
found = true
242+
tree
243+
case q"(scala.async.Async.async[${w}]($r)):${w1}"=>
244+
//TODO: add test to test-case
188245
tree
189246
case q"(${a}=>${b})" =>
190247
// don't touch nested functions
@@ -200,6 +257,10 @@ object GoAsync
200257
found
201258
}
202259

260+
private def numberOfParamLists(c:Context)(obj:c.Tree,m:c.Name):Int =
261+
{
262+
???
263+
}
203264

204265
}
205266

src/main/scala/gopher/package.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,11 @@ package object gopher {
186186
def asyncApply1[A,B,C](hof:(A=>B)=>C)(nf:A=>Future[B]):Future[C] =
187187
macro gopher.goasync.AsyncApply.impl1[A,B,C]
188188

189-
implicit def toAsyncIterable[T](x:Iterable[T]):AsyncIterable[T] = new AsyncIterable[T](x)
189+
import scala.collection.generic._
190+
implicit def toAsyncIterable[T](x:Iterable[T]): AsyncIterable[T] = new AsyncIterable[T](x)
191+
implicit def toAsyncOption[T](x:Option[T]): AsyncOption[T] = new AsyncOption[T](x)
192+
193+
190194

191195
}
192196

0 commit comments

Comments
 (0)