Skip to content

Commit d44322e

Browse files
committed
abstract-out channel to abstract class.
1 parent a43924e commit d44322e

File tree

8 files changed

+102
-69
lines changed

8 files changed

+102
-69
lines changed

src/main/scala/gopher/GopherAPI.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,16 @@ class GopherAPI(as: ActorSystem, es: ExecutionContext)
4343
*/
4444
def makeChannel[A](capacity: Int = 1) =
4545
{
46+
require(capacity > 0)
47+
// TODO: add support for unbuffered channels.
4648
val nextId = newChannelId
4749
val futureChannelRef = (channelSupervisorRef.ask(
4850
NewChannel(nextId, capacity)
4951
)(10 seconds)
5052
.asInstanceOf[Future[ActorRef]]
5153
)
52-
new Channel[A](futureChannelRef, this)
54+
55+
new ActorBackedChannel[A](futureChannelRef, this)
5356
}
5457

5558
def makeEffectedInput[A](in: Input[A], threadingPolicy: ThreadingPolicy = ThreadingPolicy.Single) =
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package gopher.channels
2+
3+
4+
import akka.actor._
5+
import akka.pattern._
6+
import scala.concurrent._
7+
import scala.concurrent.duration._
8+
import gopher._
9+
import scala.language.experimental.macros
10+
import scala.language.postfixOps
11+
import scala.reflect.macros.blackbox.Context
12+
import scala.reflect.api._
13+
14+
class ActorBackedChannel[A](futureChannelRef: Future[ActorRef], override val api: GopherAPI) extends Channel[A](api)
15+
{
16+
17+
def cbread[B](f: ContRead[A,B] => Option[ContRead.In[A] => Future[Continuated[B]]], flwt: FlowTermination[B] ): Unit =
18+
{
19+
val cont = ContRead(f,this, flwt)
20+
def applyClosed() =
21+
{
22+
f(cont) foreach { f1 => try {
23+
api.continue( f1(ContRead.ChannelClosed), flwt)
24+
} catch {
25+
case ex: Throwable => flwt.doThrow(ex)
26+
}
27+
}
28+
}
29+
if (closed) {
30+
if (closedEmpty) {
31+
applyClosed();
32+
} else {
33+
// TODO: ask timeput on closed channel set in config.
34+
futureChannelRef.foreach{ ref => val f = ref.ask(ClosedChannelRead(cont))(5 seconds)
35+
f.onFailure{
36+
case e: AskTimeoutException => applyClosed()
37+
}
38+
f.onSuccess{
39+
case ChannelCloseProcessed(0) =>
40+
closedEmpty = true
41+
}
42+
}
43+
}
44+
} else {
45+
futureChannelRef.foreach( _ ! cont )
46+
}
47+
}
48+
49+
private def contRead[B](x:ContRead[A,B]): Unit =
50+
futureChannelRef.foreach( _ ! x )
51+
52+
def cbwrite[B](f: ContWrite[A,B] => Option[(A,Future[Continuated[B]])], flwt: FlowTermination[B] ): Unit =
53+
if (closed) {
54+
flwt.doThrow(new ChannelClosedException())
55+
} else {
56+
futureChannelRef.foreach( _ ! ContWrite(f,this, flwt) )
57+
}
58+
59+
private def contWrite[B](x:ContWrite[A,B]): Unit =
60+
futureChannelRef.foreach( _ ! x )
61+
62+
private[this] implicit val ec = api.executionContext
63+
64+
def isClosed: Boolean = closed
65+
66+
def close(): Unit =
67+
{
68+
futureChannelRef.foreach( _ ! ChannelClose )
69+
closed=true
70+
}
71+
72+
override protected def finalize(): Unit =
73+
{
74+
// allow channel actor be grabage collected
75+
futureChannelRef.foreach( _ ! ChannelRefDecrement )
76+
}
77+
78+
private var closed = false
79+
private var closedEmpty = false
80+
}
81+

src/main/scala/gopher/channels/Channel.scala

Lines changed: 2 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -11,72 +11,10 @@ import scala.language.postfixOps
1111
import scala.reflect.macros.blackbox.Context
1212
import scala.reflect.api._
1313

14-
// TODO: change channelRef to channelSelector
15-
class Channel[A](futureChannelRef: Future[ActorRef], override val api: GopherAPI) extends InputOutput[A]
14+
abstract class Channel[A](override val api: GopherAPI) extends InputOutput[A]
1615
{
1716

18-
def cbread[B](f: ContRead[A,B] => Option[ContRead.In[A] => Future[Continuated[B]]], flwt: FlowTermination[B] ): Unit =
19-
{
20-
val cont = ContRead(f,this, flwt)
21-
def applyClosed() =
22-
{
23-
f(cont) foreach { f1 => try {
24-
api.continue( f1(ContRead.ChannelClosed), flwt)
25-
} catch {
26-
case ex: Throwable => flwt.doThrow(ex)
27-
}
28-
}
29-
}
30-
if (closed) {
31-
if (closedEmpty) {
32-
applyClosed();
33-
} else {
34-
// TODO: ask timeput on closed channel set in config.
35-
futureChannelRef.foreach{ ref => val f = ref.ask(ClosedChannelRead(cont))(5 seconds)
36-
f.onFailure{
37-
case e: AskTimeoutException => applyClosed()
38-
}
39-
f.onSuccess{
40-
case ChannelCloseProcessed(0) =>
41-
closedEmpty = true
42-
}
43-
}
44-
}
45-
} else {
46-
futureChannelRef.foreach( _ ! cont )
47-
}
48-
}
17+
def close(): Unit
4918

50-
private def contRead[B](x:ContRead[A,B]): Unit =
51-
futureChannelRef.foreach( _ ! x )
52-
53-
def cbwrite[B](f: ContWrite[A,B] => Option[(A,Future[Continuated[B]])], flwt: FlowTermination[B] ): Unit =
54-
if (closed) {
55-
flwt.doThrow(new ChannelClosedException())
56-
} else {
57-
futureChannelRef.foreach( _ ! ContWrite(f,this, flwt) )
58-
}
59-
60-
private def contWrite[B](x:ContWrite[A,B]): Unit =
61-
futureChannelRef.foreach( _ ! x )
62-
63-
private[this] implicit val ec = api.executionContext
64-
65-
def isClosed: Boolean = closed
66-
67-
def close(): Unit =
68-
{
69-
futureChannelRef.foreach( _ ! ChannelClose )
70-
closed=true
71-
}
72-
73-
override protected def finalize(): Unit =
74-
{
75-
// allow channel actor be grabage collected
76-
futureChannelRef.foreach( _ ! ChannelRefDecrement )
77-
}
78-
79-
private var closed = false
80-
private var closedEmpty = false
8119
}
8220

src/main/scala/gopher/channels/ChannelActor.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,14 @@ class ChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Actor
118118
private[this] def processReader[B](reader:ContRead[A,B]): Boolean =
119119
reader.function(reader) match {
120120
case Some(f1) =>
121-
val cont = f1(ContRead.In value elementAt(readIndex) )
121+
val readedElement = elementAt(readIndex)
122122
nElements-=1
123123
readIndex+=1
124124
readIndex%=capacity
125-
api.continue(cont, reader.flowTermination)
125+
Future{
126+
val cont = f1(ContRead.In value readedElement )
127+
api.continue(cont, reader.flowTermination)
128+
}(api.executionContext)
126129
true
127130
case None =>
128131
false
@@ -216,7 +219,7 @@ class ChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Actor
216219

217220

218221
// boxed representation of type.
219-
val buffer= new Array[AnyRef](capacity)
222+
val buffer= new Array[AnyRef](capacity+1)
220223
var readIndex=0
221224
var writeIndex=0
222225
var nElements=0

src/main/scala/gopher/channels/Input.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ trait Input[A]
2525
type <~ = A
2626
type read = A
2727

28+
// TODO: use closed in selector.
2829

2930
/**
3031
* apply f, when input will be ready and send result to API processor

src/main/scala/gopher/channels/SelectorBuilder.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ trait SelectorBuilder[A]
6868

6969
}
7070

71+
72+
class SelectorBuilderImpl(c: Context)
73+
{
74+
75+
}
76+
7177
object SelectorBuilder
7278
{
7379

src/test/scala/gopher/channels/MacroSelectSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ class MacroSelectSuite extends FunSuite
195195
assert(x1+x2+x3+x4+x5==1)
196196
}
197197

198-
test("fold in selector") {
198+
test("fold over selector") {
199199
import gopherApi._
200200
for(i <- 1 to 100) {
201201
val ch = makeChannel[Int](10)

src/test/scala/gopher/internal/FoldParseSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ object FoldData {
4343
}
4444
}
4545

46+
4647
def run1(n:Int, acceptor: Long => Unit ): Future[(Long,Long,Long)] =
4748
{
4849
val c = gopherApi.makeChannel[Long](1);

0 commit comments

Comments
 (0)