Skip to content

Commit 8b8a4a5

Browse files
committed
added forgottne files
1 parent 4925970 commit 8b8a4a5

File tree

2 files changed

+87
-0
lines changed

2 files changed

+87
-0
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package gopher.channels
2+
3+
import akka.actor._
4+
import scala.language._
5+
import scala.concurrent._
6+
import scala.collection.immutable._
7+
import gopher._
8+
9+
10+
/**
11+
* ChannelActor - actor, which leave
12+
*/
13+
abstract class BaseBufferedChannelActor[A](id:Long, api: GopherAPI) extends ChannelActor[A](id,api)
14+
{
15+
16+
def processReaders() : Boolean =
17+
{
18+
var retval = false
19+
while(!readers.isEmpty && nElements > 0) {
20+
val current = readers.head
21+
readers = readers.tail
22+
retval ||= processReader(current)
23+
}
24+
retval
25+
}
26+
27+
def stopIfEmpty: Boolean =
28+
{
29+
require(closed==true)
30+
if (nElements == 0) {
31+
stopReaders()
32+
}
33+
stopWriters()
34+
if (nElements == 0) {
35+
if (nRefs == 0) {
36+
// here we leave 'closed' channels in actor-system untile they will be
37+
// garbage-collected. TODO: think about actual stop ?
38+
self ! GracefullChannelStop
39+
}
40+
true
41+
} else
42+
false
43+
}
44+
45+
protected[this] def processReader[B](reader:ContRead[A,B]): Boolean
46+
47+
48+
protected[this] def getNElements(): Int = nElements
49+
50+
protected[this] var nElements=0
51+
52+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package gopher.goasync
2+
3+
import scala.language.experimental.macros
4+
import scala.reflect.macros.blackbox.Context
5+
import scala.reflect.api._
6+
import scala.concurrent._
7+
8+
object AsyncWrapper
9+
{
10+
11+
def async[T](x:T)(implicit ec:ExecutionContext):Future[T] = macro asyncImpl[T]
12+
13+
def await[T](x:Future[T]):T = macro awaitImpl[T]
14+
15+
def postWrap[T](x:T):T = macro postWrapImpl[T]
16+
17+
def asyncImpl[T](c:Context)(x:c.Expr[T])(ec:c.Expr[ExecutionContext]):c.Expr[Future[T]] =
18+
{
19+
import c.universe._
20+
c.Expr[Future[T]](q"gopher.goasync.AsyncWrapper.postWrap(scala.async.Async.async(${x})(${ec}))")
21+
}
22+
23+
def awaitImpl[T](c:Context)(x:c.Expr[Future[T]]):c.Expr[T] =
24+
{
25+
import c.universe._
26+
c.Expr[T](q"gopher.goasync.AsyncWrapper.postWrap(scala.async.Async.await(${x}))")
27+
}
28+
29+
def postWrapImpl[T](c:Context)(x:c.Expr[T]):c.Expr[T]=
30+
{
31+
import c.universe._
32+
x
33+
}
34+
35+
}

0 commit comments

Comments
 (0)