Skip to content

Commit 0d8d203

Browse files
committed
refactor to avoid code duplication between buffered and unbuffered channels.
1 parent ebc9ba2 commit 0d8d203

File tree

4 files changed

+138
-126
lines changed

4 files changed

+138
-126
lines changed

src/main/scala/gopher/channels/BufferedChannelActor.scala

Lines changed: 17 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ import gopher._
1010
/**
1111
* ChannelActor - actor, which leave
1212
*/
13-
class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Actor
13+
class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends ChannelActor[A](id,api)
1414
{
1515

16-
def receive = {
17-
case cw@ContWrite(_,_,ft) =>
18-
val cwa = cw.asInstanceOf[ContWrite[A,_]]
16+
17+
protected[this] def onContWrite(cwa: gopher.channels.ContWrite[A, _]): Unit =
18+
{
1919
if (closed) {
20-
ft.throwIfNotCompleted(new ChannelClosedException())
20+
cwa.flowTermination.throwIfNotCompleted(new ChannelClosedException())
2121
} else {
2222
if (nElements==capacity) {
2323
writers = writers :+ cwa
@@ -28,8 +28,10 @@ class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Act
2828
}
2929
}
3030
}
31-
case cr@ContRead(_,_,ft) =>
32-
val cra = cr.asInstanceOf[ContRead[A,_]]
31+
}
32+
33+
protected[this] def onContRead(cra: gopher.channels.ContRead[A, _]): Unit =
34+
{
3335
if (nElements==0) {
3436
if (closed) {
3537
processReaderClosed(cra)
@@ -46,22 +48,11 @@ class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Act
4648
}
4749
}
4850
}
49-
case ccr@ClosedChannelRead(_) =>
50-
self ! ccr.cont
51-
sender ! ChannelCloseProcessed(nElements)
52-
case ChannelClose =>
53-
closed=true
54-
stopIfEmpty
55-
case ChannelRefDecrement =>
56-
nRefs -= 1
57-
if (nRefs == 0) {
58-
stopAll
59-
}
60-
case ChannelRefIncrement =>
61-
nRefs += 1
62-
case GracefullChannelStop =>
63-
context.stop(self)
64-
}
51+
}
52+
53+
54+
protected[this] def getNElements(): Int = nElements;
55+
6556

6657
def processReaders() : Boolean =
6758
{
@@ -123,27 +114,13 @@ class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Act
123114
}
124115

125116

126-
private[this] def stopIfEmpty: Boolean =
117+
def stopIfEmpty: Boolean =
127118
{
128119
require(closed==true)
129120
if (nElements == 0) {
130-
while(!readers.isEmpty) {
131-
val reader = readers.head
132-
val c = reader.asInstanceOf[ContRead[A,reader.R]]
133-
readers = readers.tail
134-
c.function(c) foreach { f1 =>
135-
api.continue(f1(ContRead.ChannelClosed), c.flowTermination)
136-
}
137-
}
121+
stopReaders()
138122
}
139-
while(!writers.isEmpty) {
140-
val writer = writers.head
141-
val c = writer.asInstanceOf[ContWrite[A,writer.R]]
142-
writers = writers.tail
143-
c.function(c) foreach {
144-
f1 => c.flowTermination.throwIfNotCompleted(new ChannelClosedException())
145-
}
146-
}
123+
stopWriters()
147124
if (nElements == 0) {
148125
if (nRefs == 0) {
149126
// here we leave 'closed' channels in actor-system untile they will be
@@ -155,19 +132,6 @@ class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Act
155132
false
156133
}
157134

158-
def stopAll: Unit =
159-
{
160-
if (!closed) {
161-
closed=true
162-
}
163-
if (!stopIfEmpty) {
164-
// stop anyway
165-
self ! GracefullChannelStop
166-
}
167-
}
168-
169-
private[this] implicit def ec: ExecutionContext = api.executionContext
170-
171135
@inline
172136
private[this] def elementAt(i:Int): A =
173137
buffer(i).asInstanceOf[A]
@@ -182,10 +146,5 @@ class BufferedChannelActor[A](id:Long, capacity:Int, api: GopherAPI) extends Act
182146
var readIndex=0
183147
var writeIndex=0
184148
var nElements=0
185-
var closed=false
186-
var readers = Queue[ContRead[A,_]] ()
187-
var writers = Queue[ContWrite[A,_]] ()
188-
189-
var nRefs = 1
190149

191150
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package gopher.channels
2+
3+
import akka.actor._
4+
import scala.language._
5+
import scala.concurrent._
6+
import scala.collection.immutable._
7+
import gopher._
8+
9+
10+
/**
11+
* ChannelActor - actor, which leave
12+
*/
13+
abstract class ChannelActor[A](id:Long, api: GopherAPI) extends Actor
14+
{
15+
16+
def receive = {
17+
case cw@ContWrite(_,_,ft) =>
18+
val cwa = cw.asInstanceOf[ContWrite[A,cw.R]]
19+
onContWrite(cwa)
20+
case cr@ContRead(_,_,ft) =>
21+
val cra = cr.asInstanceOf[ContRead[A,cr.R]]
22+
onContRead(cra)
23+
case ccr@ClosedChannelRead(_) =>
24+
self ! ccr.cont
25+
sender ! ChannelCloseProcessed(getNElements())
26+
case ChannelClose =>
27+
closed=true
28+
stopIfEmpty
29+
case ChannelRefDecrement =>
30+
nRefs -= 1
31+
if (nRefs == 0) {
32+
stopAll
33+
}
34+
case ChannelRefIncrement =>
35+
nRefs += 1
36+
case GracefullChannelStop =>
37+
context.stop(self)
38+
}
39+
40+
protected[this] def onContWrite(cw:ContWrite[A,_]):Unit
41+
42+
protected[this] def onContRead(cr:ContRead[A,_]):Unit
43+
44+
protected[this] def getNElements():Int
45+
46+
protected[this] def processReaderClosed[B](reader:ContRead[A,B]): Boolean =
47+
reader.function(reader) match {
48+
case Some(f1) => api.continue(f1(ContRead.ChannelClosed), reader.flowTermination)
49+
true
50+
case None => false
51+
}
52+
53+
protected[this] def stopReaders(): Unit =
54+
{
55+
while(!readers.isEmpty) {
56+
val reader = readers.head
57+
val c = reader.asInstanceOf[ContRead[A,reader.R]]
58+
readers = readers.tail
59+
c.function(c) foreach { f1 =>
60+
api.continue(f1(ContRead.ChannelClosed), c.flowTermination)
61+
}
62+
}
63+
}
64+
65+
protected[this] def stopWriters(): Unit =
66+
{
67+
while(!writers.isEmpty) {
68+
val writer = writers.head
69+
val c = writer.asInstanceOf[ContWrite[A,writer.R]]
70+
writers = writers.tail
71+
c.function(c) foreach {
72+
f1 => c.flowTermination.throwIfNotCompleted(new ChannelClosedException())
73+
}
74+
}
75+
}
76+
77+
def stopIfEmpty: Boolean
78+
79+
def stopAll: Unit =
80+
{
81+
if (!closed) {
82+
closed=true
83+
}
84+
if (!stopIfEmpty) {
85+
// stop anyway
86+
self ! GracefullChannelStop
87+
}
88+
}
89+
90+
protected[this] implicit def ec: ExecutionContext = api.executionContext
91+
92+
protected[this] var closed=false
93+
var readers = Queue[ContRead[A,_]] ()
94+
var writers = Queue[ContWrite[A,_]] ()
95+
var nRefs = 1
96+
97+
}

src/main/scala/gopher/channels/Continuated.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ case class ContWrite[A,B](function: ContWrite[A,B] => Option[(A,Future[Continuat
9191
type El = A
9292
}
9393

94+
object ContWrite
95+
{
96+
97+
}
98+
9499
/**
95100
* skip (i.e. do some operation not related to reading or writing.)
96101
*/

src/main/scala/gopher/channels/UnbufferedChannelActor.scala

Lines changed: 19 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -10,40 +10,26 @@ import gopher._
1010
/**
1111
* ChannelActor - actor, which leave
1212
*/
13-
class UnbufferedChannelActor[A](id:Long, api: GopherAPI) extends Actor
13+
class UnbufferedChannelActor[A](id:Long, api: GopherAPI) extends ChannelActor[A](id,api)
1414
{
1515

16-
def receive = {
17-
case cw@ContWrite(_,_,ft) =>
18-
val cwa = cw.asInstanceOf[ContWrite[A,_]]
19-
if (closed) {
20-
ft.throwIfNotCompleted(new ChannelClosedException())
21-
} else if (!processReaders(cwa)) {
22-
writers = writers :+ cwa
23-
}
24-
case cr@ContRead(_,_,ft) =>
25-
val cra = cr.asInstanceOf[ContRead[A,_]]
16+
protected[this] def onContWrite(cw:ContWrite[A,_]):Unit =
17+
if (closed) {
18+
cw.flowTermination.throwIfNotCompleted(new ChannelClosedException())
19+
} else if (!processReaders(cw)) {
20+
writers = writers :+ cw
21+
}
22+
23+
24+
protected[this] def onContRead(cr:ContRead[A,_]):Unit =
2625
if (closed) {
27-
processReaderClosed(cra)
28-
} else if (!processWriters(cra)) {
29-
readers = readers :+ cra;
26+
processReaderClosed(cr)
27+
} else if (!processWriters(cr)) {
28+
readers = readers :+ cr;
3029
}
31-
case ccr@ClosedChannelRead(_) =>
32-
self ! ccr.cont
33-
sender ! ChannelCloseProcessed(0)
34-
case ChannelClose =>
35-
closed=true
36-
stopIfEmpty
37-
case ChannelRefDecrement =>
38-
nRefs -= 1
39-
if (nRefs == 0) {
40-
stopAll
41-
}
42-
case ChannelRefIncrement =>
43-
nRefs += 1
44-
case GracefullChannelStop =>
45-
context.stop(self)
46-
}
30+
31+
protected[this] def getNElements():Int = 0
32+
4733

4834
def processReaders(w: ContWrite[A,_]) : Boolean =
4935
{
@@ -76,13 +62,6 @@ class UnbufferedChannelActor[A](id:Long, api: GopherAPI) extends Actor
7662
false
7763
}
7864

79-
private[this] def processReaderClosed[B](reader:ContRead[A,B]): Boolean =
80-
reader.function(reader) match {
81-
case Some(f1) => api.continue(f1(ContRead.ChannelClosed), reader.flowTermination)
82-
true
83-
case None => false
84-
}
85-
8665
def processWriters[C](reader:ContRead[A,C]): Boolean =
8766
{
8867
if (writers.isEmpty) {
@@ -121,25 +100,11 @@ class UnbufferedChannelActor[A](id:Long, api: GopherAPI) extends Actor
121100
}
122101

123102

124-
private[this] def stopIfEmpty: Boolean =
103+
def stopIfEmpty: Boolean =
125104
{
126105
require(closed==true)
127-
while(!readers.isEmpty) {
128-
val reader = readers.head
129-
val c = reader.asInstanceOf[ContRead[A,reader.R]]
130-
readers = readers.tail
131-
c.function(c) foreach { f1 =>
132-
api.continue(f1(ContRead.ChannelClosed), c.flowTermination)
133-
}
134-
}
135-
while(!writers.isEmpty) {
136-
val writer = writers.head
137-
val c = writer.asInstanceOf[ContWrite[A,writer.R]]
138-
writers = writers.tail
139-
c.function(c) foreach {
140-
f1 => c.flowTermination.throwIfNotCompleted(new ChannelClosedException())
141-
}
142-
}
106+
stopReaders()
107+
stopWriters()
143108
if (nRefs == 0) {
144109
// here we leave 'closed' channels in actor-system untile they will be
145110
// garbage-collected. TODO: think about actual stop ?
@@ -148,22 +113,8 @@ class UnbufferedChannelActor[A](id:Long, api: GopherAPI) extends Actor
148113
true
149114
}
150115

151-
def stopAll: Unit =
152-
{
153-
if (!closed) {
154-
closed=true
155-
}
156-
if (!stopIfEmpty) {
157-
// stop anyway
158-
self ! GracefullChannelStop
159-
}
160-
}
161116

162117
private[this] implicit def ec: ExecutionContext = api.executionContext
163118

164-
var closed=false
165-
var readers = Queue[ContRead[A,_]] ()
166-
var writers = Queue[ContWrite[A,_]] ()
167-
var nRefs = 1
168119

169120
}

0 commit comments

Comments
 (0)