Skip to content

Commit 53df7df

Browse files
committed
added broadcast to master
1 parent f0c4e8a commit 53df7df

File tree

1 file changed

+141
-0
lines changed

1 file changed

+141
-0
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package example.broadcast
2+
3+
/**
4+
* code from
5+
* Concurrent Idioms #1: Broadcasting values in Go with linked channels.
6+
* https://rogpeppe.wordpress.com/2009/12/01/concurrent-idioms-1-broadcasting-values-in-go-with-linked-channels/
7+
*/
8+
9+
import scala.concurrent.{Channel=>_,_}
10+
import scala.concurrent.ExecutionContext.Implicits.global
11+
import scala.async.Async._
12+
13+
import gopher._
14+
import gopher.channels._
15+
import CommonTestObjects.gopherApi._
16+
17+
import org.scalatest._
18+
19+
20+
class Broadcaster[A]
21+
{
22+
import Broadcaster._
23+
24+
val listenc: Channel[Channel[Channel[Message[A]]]] = makeChannel()
25+
val sendc: Channel[A] = makeChannel()
26+
val quitc: Channel[Boolean] = makeChannel()
27+
28+
/*
29+
select.fold(makeChannel[Message[A]]) { (last,s) =>
30+
s match {
31+
case v: sendc.read =>
32+
val next = makeChannel[Message[A]]
33+
last <~ Message(next,v)
34+
next
35+
case r: listenc.read =>
36+
r <~ last
37+
last
38+
case q: quitc.read =>
39+
implicitly[FlowTermination[Unit]].doExit(())
40+
}
41+
}
42+
*/
43+
44+
go {
45+
var last = makeChannel[Message[A]]()
46+
for (s <- select.forever) {
47+
s match {
48+
case v: sendc.read @ unchecked =>
49+
val c = makeChannel[Message[A]](1)
50+
val b = ValueMessage(c,v)
51+
last <~ b
52+
last = c
53+
case r: listenc.read @ unchecked =>
54+
r <~ last
55+
case q: quitc.read =>
56+
last write EndMessage
57+
implicitly[FlowTermination[Unit]].doExit(())
58+
}
59+
}
60+
}
61+
62+
def alisten(): Future[Receiver[A]] = go {
63+
val c = makeChannel[Channel[Message[A]]]()
64+
listenc <~ c
65+
new Receiver(c.read)
66+
}
67+
68+
69+
def write(a: A) = sendc.awrite(a)
70+
71+
def stop() = quitc.awrite(true)
72+
73+
}
74+
75+
76+
object Broadcaster {
77+
78+
import language.experimental.macros
79+
import scala.reflect.macros.blackbox.Context
80+
import scala.reflect.api._
81+
82+
class Receiver[A](var c: Channel[Message[A]])
83+
{
84+
85+
/**
86+
* return Some(a) when broadcaster is not closed; None when closed.
87+
* (this is logic from original Go example, where
88+
* 'T' in Go is equilend to Option[T] in Scala [Go nil ~ Scala None])
89+
* In real life, interface will be better.
90+
**/
91+
def aread():Future[Option[A]] = go {
92+
val b = c.read
93+
c.write(b)
94+
b match {
95+
case ValueMessage(ch,v) =>
96+
c = ch
97+
Some(v)
98+
case EndMessage =>
99+
None
100+
}
101+
}
102+
103+
def read():Option[A] = macro Receiver.readImpl[A]
104+
105+
}
106+
107+
object Receiver
108+
{
109+
def readImpl[A](c:Context)():c.Expr[Option[A]]=
110+
{
111+
import c.universe._
112+
awaitImpl[Option[A]](c)(c.Expr[Future[Option[A]]](q"${c.prefix}.aread()"))
113+
}
114+
}
115+
116+
sealed trait Message[+A]
117+
case class ValueMessage[A](ch: Channel[Message[A]],v:A) extends Message[A]
118+
case object EndMessage extends Message[Nothing]
119+
120+
}
121+
122+
123+
class BroadcaseSuite extends FunSuite
124+
{
125+
126+
def listen[A](r: Broadcaster.Receiver[A]): Unit = go {
127+
val x = await(r.aread)
128+
129+
}
130+
131+
test("broadcast") {
132+
133+
val b = new Broadcaster[Int]()
134+
135+
136+
}
137+
138+
}
139+
140+
141+

0 commit comments

Comments
 (0)