Skip to content

Commit 4dc7df1

Browse files
committed
added EffectedChannel
1 parent c98483b commit 4dc7df1

File tree

5 files changed

+76
-24
lines changed

5 files changed

+76
-24
lines changed

src/main/scala/gopher/GopherAPI.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class GopherAPI(as: ActorSystem, es: ExecutionContext)
4141
* channel.awrite(1 to 100)
4242
*}}}
4343
*/
44-
def makeChannel[A](capacity: Int = 0) =
44+
def makeChannel[A](capacity: Int = 0): Channel[A] =
4545
{
4646
require(capacity >= 0)
4747
val nextId = newChannelId
@@ -54,12 +54,14 @@ class GopherAPI(as: ActorSystem, es: ExecutionContext)
5454
new ActorBackedChannel[A](futureChannelRef, this)
5555
}
5656

57-
def makeEffectedInput[A](in: Input[A], threadingPolicy: ThreadingPolicy = ThreadingPolicy.Single) =
57+
def makeEffectedInput[A](in: Input[A], threadingPolicy: ThreadingPolicy = ThreadingPolicy.Single): EffectedInput[A] =
5858
EffectedInput(in,threadingPolicy)
5959

6060
def makeEffectedOutput[A](out: Output[A], threadingPolicy: ThreadingPolicy = ThreadingPolicy.Single) =
6161
EffectedOutput(out,threadingPolicy)
6262

63+
def makeEffectedChannel[A](ch: Channel[A], threadingPolicy: ThreadingPolicy = ThreadingPolicy.Single) =
64+
EffectedChannel(ch,threadingPolicy)
6365

6466
/**
6567
* Represent Scala future as channel from which we can read one value.
@@ -72,6 +74,7 @@ class GopherAPI(as: ActorSystem, es: ExecutionContext)
7274
*/
7375
def iterableInput[A](iterable:Iterable[A]): Input[A] = Input.asInput(iterable, this)
7476

77+
7578
/**
7679
* create and start instance of transputer with given recovery policy.
7780
*@see gopher.Transputer

src/main/scala/gopher/channels/ActorBackedChannel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import scala.language.postfixOps
1111
import scala.reflect.macros.blackbox.Context
1212
import scala.reflect.api._
1313

14-
class ActorBackedChannel[A](futureChannelRef: Future[ActorRef], override val api: GopherAPI) extends Channel[A](api)
14+
class ActorBackedChannel[A](futureChannelRef: Future[ActorRef], override val api: GopherAPI) extends Channel[A]
1515
{
1616

1717
def cbread[B](f: ContRead[A,B] => Option[ContRead.In[A] => Future[Continuated[B]]], flwt: FlowTermination[B] ): Unit =

src/main/scala/gopher/channels/Channel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import scala.language.postfixOps
1111
import scala.reflect.macros.blackbox.Context
1212
import scala.reflect.api._
1313

14-
abstract class Channel[A](override val api: GopherAPI) extends InputOutput[A]
14+
trait Channel[A] extends InputOutput[A]
1515
{
1616

1717

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package gopher.channels
2+
3+
4+
import gopher._
5+
import gopher.util._
6+
import scala.concurrent._
7+
8+
trait EffectedChannel[A] extends Channel[A] with Effected[Channel[A]]
9+
{
10+
def asInput(): EffectedInput[A]
11+
def asOutput(): EffectedOutput[A]
12+
}
13+
14+
15+
object EffectedChannel
16+
{
17+
def apply[A](in: Channel[A], policy: ThreadingPolicy): EffectedChannel[A] =
18+
policy match {
19+
case ThreadingPolicy.Single => new SinglethreadedEffectedChannel(in)
20+
case ThreadingPolicy.Multi => new MultithreadedEffectedChannel(in)
21+
}
22+
}
23+
24+
25+
class SinglethreadedEffectedChannel[A](ch:Channel[A]) extends SinglethreadedEffected[Channel[A]](ch)
26+
with EffectedChannel[A]
27+
{
28+
29+
def cbread[B](f: ContRead[A,B] => Option[ContRead.In[A] => Future[Continuated[B]]],ft: FlowTermination[B]): Unit = v.cbread(f,ft)
30+
31+
def cbwrite[B](f: ContWrite[A,B] => Option[
32+
(A,Future[Continuated[B]])
33+
],
34+
ft: FlowTermination[B]): Unit = v.cbwrite(f,ft)
35+
36+
def close() = v.close()
37+
38+
def asInput() = api.makeEffectedInput(v, ThreadingPolicy.Single)
39+
40+
def asOutput() = api.makeEffectedOutput(v, ThreadingPolicy.Single)
41+
42+
def api: GopherAPI = v.api
43+
44+
}
45+
46+
class MultithreadedEffectedChannel[A](ch:Channel[A]) extends MultithreadedEffected[Channel[A]](ch)
47+
with EffectedChannel[A]
48+
{
49+
50+
def cbread[B](f: ContRead[A,B] => Option[ContRead.In[A] => Future[Continuated[B]]],ft: FlowTermination[B]): Unit = v.get().cbread(f,ft)
51+
52+
def cbwrite[B](f: ContWrite[A,B] => Option[
53+
(A,Future[Continuated[B]])
54+
],
55+
ft: FlowTermination[B]): Unit = v.get().cbwrite(f,ft)
56+
57+
def close() = v.get().close()
58+
59+
def asInput() = api.makeEffectedInput(v.get(), ThreadingPolicy.Multi)
60+
61+
def asOutput() = api.makeEffectedOutput(v.get(), ThreadingPolicy.Multi)
62+
63+
64+
def api: GopherAPI = v.get().api
65+
66+
}
67+
68+

src/test/scala/example/BroadcasterSuite.scala

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class Broadcaster[A]
2525
val sendc: Channel[A] = makeChannel()
2626
val quitc: Channel[Boolean] = makeChannel()
2727

28-
select.afold(makeChannel[Message[A]]()) { (last,s) =>
28+
val process = select.afold(makeChannel[Message[A]]()) { (last,s) =>
2929
s match {
3030
case v: sendc.read @unchecked =>
3131
val next = makeChannel[Message[A]]()
@@ -40,25 +40,6 @@ class Broadcaster[A]
4040
}
4141

4242

43-
/*
44-
go {
45-
var last = makeChannel[Message[A]]()
46-
for (s <- select.forever) {
47-
s match {
48-
case v: sendc.read @ unchecked =>
49-
val c = makeChannel[Message[A]](1)
50-
val b = ValueMessage(c,v)
51-
last <~ b
52-
last = c
53-
case r: listenc.read @ unchecked =>
54-
r <~ last
55-
case q: quitc.read =>
56-
last write EndMessage
57-
implicitly[FlowTermination[Unit]].doExit(())
58-
}
59-
}
60-
}
61-
*/
6243

6344
def alisten(): Future[Receiver[A]] = go {
6445
val c = makeChannel[Channel[Message[A]]]()

0 commit comments

Comments
 (0)