Skip to content

Commit 9956ae3

Browse files
committed
implemented Unbuffered channels
1 parent 02b35df commit 9956ae3

File tree

5 files changed

+235
-5
lines changed

5 files changed

+235
-5
lines changed

src/main/scala/gopher/GopherAPI.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,15 @@ class GopherAPI(as: ActorSystem, es: ExecutionContext)
4343
*/
4444
def makeChannel[A](capacity: Int = 1) =
4545
{
46-
require(capacity > 0)
47-
// TODO: add support for unbuffered channels.
46+
require(capacity >= 0)
4847
val nextId = newChannelId
4948
val futureChannelRef = (channelSupervisorRef.ask(
5049
NewChannel(nextId, capacity)
5150
)(10 seconds)
5251
.asInstanceOf[Future[ActorRef]]
5352
)
5453

55-
new ActorBackedBufferedChannel[A](futureChannelRef, this)
54+
new ActorBackedChannel[A](futureChannelRef, this)
5655
}
5756

5857
def makeEffectedInput[A](in: Input[A], threadingPolicy: ThreadingPolicy = ThreadingPolicy.Single) =

src/main/scala/gopher/channels/ActorBackedBufferedChannel.scala renamed to src/main/scala/gopher/channels/ActorBackedChannel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import scala.language.postfixOps
1111
import scala.reflect.macros.blackbox.Context
1212
import scala.reflect.api._
1313

14-
class ActorBackedBufferedChannel[A](futureChannelRef: Future[ActorRef], override val api: GopherAPI) extends Channel[A](api)
14+
class ActorBackedChannel[A](futureChannelRef: Future[ActorRef], override val api: GopherAPI) extends Channel[A](api)
1515
{
1616

1717
def cbread[B](f: ContRead[A,B] => Option[ContRead.In[A] => Future[Continuated[B]]], flwt: FlowTermination[B] ): Unit =

src/main/scala/gopher/channels/ChannelSupervisor.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ class ChannelSupervisor(api: GopherAPI) extends Actor
1212

1313
def receive = {
1414
case NewChannel(id,capacity) =>
15-
val props = Props(classOf[BufferedChannelActor[_]],id, capacity, api)
15+
val props = if (capacity==0) {
16+
Props(classOf[UnbufferedChannelActor[_]],id, api)
17+
} else {
18+
Props(classOf[BufferedChannelActor[_]],id, capacity, api)
19+
}
1620
sender ! context.actorOf(props, name=id.toString)
1721
case CloseChannel(id) =>
1822
context.actorSelection(id.toString) ! ChannelClose
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package gopher.channels
2+
3+
import akka.actor._
4+
import scala.language._
5+
import scala.concurrent._
6+
import scala.collection.immutable._
7+
import gopher._
8+
9+
10+
/**
11+
* ChannelActor - actor, which leave
12+
*/
13+
class UnbufferedChannelActor[A](id:Long, api: GopherAPI) extends Actor
14+
{
15+
16+
def receive = {
17+
case cw@ContWrite(_,_,ft) =>
18+
val cwa = cw.asInstanceOf[ContWrite[A,_]]
19+
if (closed) {
20+
ft.throwIfNotCompleted(new ChannelClosedException())
21+
} else if (!processReaders(cwa)) {
22+
writers = writers :+ cwa
23+
}
24+
case cr@ContRead(_,_,ft) =>
25+
val cra = cr.asInstanceOf[ContRead[A,_]]
26+
if (closed) {
27+
processReaderClosed(cra)
28+
} else if (!processWriters(cra)) {
29+
readers = readers :+ cra;
30+
}
31+
case ccr@ClosedChannelRead(_) =>
32+
self ! ccr.cont
33+
sender ! ChannelCloseProcessed(0)
34+
case ChannelClose =>
35+
closed=true
36+
stopIfEmpty
37+
case ChannelRefDecrement =>
38+
nRefs -= 1
39+
if (nRefs == 0) {
40+
stopAll
41+
}
42+
case ChannelRefIncrement =>
43+
nRefs += 1
44+
case GracefullChannelStop =>
45+
context.stop(self)
46+
}
47+
48+
def processReaders(w: ContWrite[A,_]) : Boolean =
49+
{
50+
var done = false
51+
while(!(done || readers.isEmpty)) {
52+
val current = readers.head
53+
readers = readers.tail
54+
done = processReader(current,w)
55+
}
56+
done
57+
}
58+
59+
private[this] def processReader[B,C](reader:ContRead[A,B],writer:ContWrite[A,C]): Boolean =
60+
reader.function(reader) match {
61+
case Some(f1) =>
62+
writer.function(writer) match {
63+
case Some((a,wcont)) =>
64+
Future{
65+
val cont = f1(ContRead.In value a)
66+
api.continue(cont, reader.flowTermination)
67+
}(api.executionContext)
68+
api.continue(wcont, writer.flowTermination)
69+
true
70+
case None =>
71+
val cont = f1(ContRead.Skip)
72+
api.continue(cont, reader.flowTermination)
73+
false
74+
}
75+
case None =>
76+
false
77+
}
78+
79+
private[this] def processReaderClosed[B](reader:ContRead[A,B]): Boolean =
80+
reader.function(reader) match {
81+
case Some(f1) => api.continue(f1(ContRead.ChannelClosed), reader.flowTermination)
82+
true
83+
case None => false
84+
}
85+
86+
def processWriters[C](reader:ContRead[A,C]): Boolean =
87+
{
88+
if (writers.isEmpty) {
89+
false
90+
} else {
91+
reader.function(reader) match {
92+
case Some(f1) =>
93+
var done = false
94+
while(!writers.isEmpty && !done) {
95+
val current = writers.head
96+
writers = writers.tail
97+
done = processWriter(current,f1,reader)
98+
}
99+
if (!done) {
100+
f1(ContRead.Skip)
101+
}
102+
done
103+
case None => true
104+
}
105+
}
106+
}
107+
108+
private[this] def processWriter[B,C](writer:ContWrite[A,B],
109+
f1:ContRead.In[A]=>Future[Continuated[C]],
110+
reader:ContRead[A,C]): Boolean =
111+
writer.function(writer) match {
112+
case Some((a,wcont)) =>
113+
Future {
114+
val rcont = f1(ContRead.In value a)
115+
api.continue(rcont,reader.flowTermination)
116+
}
117+
api.continue(wcont,writer.flowTermination)
118+
true
119+
case None =>
120+
false
121+
}
122+
123+
124+
private[this] def stopIfEmpty: Boolean =
125+
{
126+
require(closed==true)
127+
while(!readers.isEmpty) {
128+
val reader = readers.head
129+
val c = reader.asInstanceOf[ContRead[A,reader.R]]
130+
readers = readers.tail
131+
c.function(c) foreach { f1 =>
132+
api.continue(f1(ContRead.ChannelClosed), c.flowTermination)
133+
}
134+
}
135+
while(!writers.isEmpty) {
136+
val writer = writers.head
137+
val c = writer.asInstanceOf[ContWrite[A,writer.R]]
138+
writers = writers.tail
139+
c.function(c) foreach {
140+
f1 => c.flowTermination.throwIfNotCompleted(new ChannelClosedException())
141+
}
142+
}
143+
if (nRefs == 0) {
144+
// here we leave 'closed' channels in actor-system untile they will be
145+
// garbage-collected. TODO: think about actual stop ?
146+
self ! GracefullChannelStop
147+
}
148+
true
149+
}
150+
151+
def stopAll: Unit =
152+
{
153+
if (!closed) {
154+
closed=true
155+
}
156+
if (!stopIfEmpty) {
157+
// stop anyway
158+
self ! GracefullChannelStop
159+
}
160+
}
161+
162+
private[this] implicit def ec: ExecutionContext = api.executionContext
163+
164+
var closed=false
165+
var readers = Queue[ContRead[A,_]] ()
166+
var writers = Queue[ContWrite[A,_]] ()
167+
var nRefs = 1
168+
169+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package gopher.channels
2+
3+
import gopher._
4+
import gopher.channels._
5+
import gopher.tags._
6+
7+
import org.scalatest._
8+
9+
import scala.language._
10+
import scala.concurrent._
11+
import scala.concurrent.duration._
12+
13+
class UnbufferedSelectSuite extends FunSuite
14+
{
15+
16+
import scala.concurrent.ExecutionContext.Implicits.global
17+
18+
19+
test("write without read must block ") {
20+
import gopherApi._
21+
val channel1 = makeChannel[Int](0)
22+
val w1 = channel1.awrite(1)
23+
24+
assert(!w1.isCompleted)
25+
26+
val r1 = channel1.aread
27+
28+
Await.ready(r1, 10 seconds)
29+
30+
assert(w1.isCompleted)
31+
assert(r1.isCompleted)
32+
33+
val rd = Await.result(r1, 10 seconds)
34+
35+
assert(rd==1)
36+
}
37+
38+
test("fold over selector with one-direction flow") {
39+
import gopherApi._
40+
for(i <- 1 to 100) {
41+
val ch = makeChannel[Int](0)
42+
val quit = Promise[Boolean]()
43+
val r = select.afold(0){ (x,s) =>
44+
s match {
45+
case a:ch.read => x+a
46+
case q:Boolean if (q==quit.future.read) => CurrentFlowTermination.exit(x)
47+
}
48+
}
49+
ch.awriteAll(1 to 10) onComplete { _ => quit success true }
50+
val sum = Await.result(r, 1 second)
51+
assert(sum==(1 to 10).sum)
52+
}
53+
}
54+
55+
56+
lazy val gopherApi = CommonTestObjects.gopherApi
57+
58+
}

0 commit comments

Comments
 (0)