Skip to content

Commit 5daf874

Browse files
committed
prepare infrastructure for use channels as part of state in select fold.
1 parent d3c7863 commit 5daf874

File tree

13 files changed

+300
-74
lines changed

13 files changed

+300
-74
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.9.6-RC2"
1818

1919
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"
2020

21-
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.7"
21+
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.8"
2222

2323
//TODO: enable after 1.0
2424
//libraryDependencies += "com.typesafe.akka" %% "akka-stream-experimental" % "0.9"

src/main/scala/gopher/channels/Channel.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,34 @@ import scala.reflect.api._
1414
trait Channel[A] extends InputOutput[A]
1515
{
1616

17+
thisChannel =>
18+
1719
def close(): Unit
1820

21+
// override some operations
22+
23+
override def filter(p:A=>Boolean): Channel[A] =
24+
{
25+
val filteredInput = super.filter(p)
26+
new Channel[A] {
27+
def cbread[B](f:
28+
ContRead[A,B]=>Option[
29+
ContRead.In[A]=>Future[Continuated[B]]
30+
],
31+
ft: FlowTermination[B]): Unit = filteredInput.cbread(f,ft)
32+
33+
def cbwrite[B](f: ContWrite[A,B] => Option[
34+
(A,Future[Continuated[B]])
35+
],
36+
ft: FlowTermination[B]): Unit =
37+
thisChannel.cbwrite(f,ft) // TODO: optimize by filteredOutput.cbwrite() ?
38+
39+
def api = thisChannel.api
40+
41+
def close() = thisChannel.close()
42+
}
43+
}
44+
1945
}
2046

2147
object Channel

src/main/scala/gopher/channels/Continuated.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ object ContRead
8080
Option[In[A] => Future[Continuated[B]]] =
8181
prev.function(prev) map (f1 => liftInValue(prev) { v => fn(v,f1) } )
8282

83+
type Aux[A,B] = ContRead[A,B]{ type El=A
84+
type S=B
85+
type F = ContRead[A,B]=>Option[ContRead.In[A]=>Future[Continuated[B]]]
86+
}
8387
}
8488

8589

src/main/scala/gopher/channels/EffectedChannel.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class SinglethreadedEffectedChannel[A](ch:Channel[A]) extends SinglethreadedEffe
4141

4242
def api: GopherAPI = v.api
4343

44+
override def filter(p:A=>Boolean):Channel[A] = new SinglethreadedEffectedChannel(v.filter(p))
45+
4446
}
4547

4648
class MultithreadedEffectedChannel[A](ch:Channel[A]) extends MultithreadedEffected[Channel[A]](ch)

src/main/scala/gopher/channels/EffectedInput.scala

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,20 @@ import gopher._
44
import gopher.util._
55
import scala.concurrent._
66

7+
import scala.collection.mutable.{HashSet => MutableHashSet}
8+
import java.util.concurrent.{ConcurrentHashMap=>JavaConcurrentHashMap}
9+
710
trait EffectedInput[A] extends Input[A] with Effected[Input[A]]
11+
{
12+
13+
def cbread[B](f: ContRead[A,B] => Option[ContRead.In[A] => Future[Continuated[B]]],ft: FlowTermination[B]): Unit = {
14+
val sv = current
15+
sv.cbread((cr:ContRead[A,B]) => if (sv==current) f(cr.copy(channel=this)) else None, ft)
16+
}
17+
18+
def api: GopherAPI = current.api
19+
20+
}
821

922

1023
object EffectedInput
@@ -18,20 +31,6 @@ object EffectedInput
1831

1932
class SinglethreadedEffectedInput[A](in:Input[A]) extends SinglethreadedEffected[Input[A]](in)
2033
with EffectedInput[A]
21-
{
22-
23-
def cbread[B](f: ContRead[A,B] => Option[ContRead.In[A] => Future[Continuated[B]]],ft: FlowTermination[B]): Unit = v.cbread(f,ft)
24-
25-
def api: GopherAPI = v.api
26-
27-
}
2834

2935
class MultithreadedEffectedInput[A](in:Input[A]) extends MultithreadedEffected[Input[A]](in)
3036
with EffectedInput[A]
31-
{
32-
33-
def cbread[B](f: ContRead[A,B] => Option[ContRead.In[A] => Future[Continuated[B]]],ft: FlowTermination[B]): Unit = v.get().cbread(f,ft)
34-
35-
def api: GopherAPI = v.get().api
36-
37-
}

src/main/scala/gopher/channels/EffectedOutput.scala

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,19 @@ import gopher.util._
55
import scala.concurrent._
66

77
trait EffectedOutput[A] extends Effected[Output[A]] with Output[A]
8+
{
9+
10+
def cbwrite[B](f: ContWrite[A,B] => Option[
11+
(A,Future[Continuated[B]])
12+
],
13+
ft: FlowTermination[B]): Unit = {
14+
val sv = current
15+
sv.cbwrite[B](cw => if (current eq sv) f(cw.copy(channel=this)) else None,ft)
16+
}
17+
18+
def api: GopherAPI = current.api
19+
20+
}
821

922
object EffectedOutput
1023
{
@@ -17,31 +30,9 @@ object EffectedOutput
1730

1831
class SinglethreadedEffectedOutput[A](out:Output[A]) extends SinglethreadedEffected[Output[A]](out)
1932
with EffectedOutput[A]
20-
{
21-
22-
def cbwrite[B](f: ContWrite[A,B] => Option[
23-
(A,Future[Continuated[B]])
24-
],
25-
ft: FlowTermination[B]): Unit = v.cbwrite(f,ft)
26-
27-
28-
def api: GopherAPI = v.api
29-
30-
}
3133

3234
class MultithreadedEffectedOutput[A](out:Output[A]) extends MultithreadedEffected[Output[A]](out)
3335
with EffectedOutput[A]
34-
{
35-
36-
def cbwrite[B](f: ContWrite[A,B] => Option[
37-
(A,Future[Continuated[B]])
38-
],
39-
ft: FlowTermination[B]): Unit = v.get().cbwrite(f,ft)
40-
41-
42-
def api: GopherAPI = v.get().api
43-
44-
}
4536

4637

4738

0 commit comments

Comments
 (0)