Skip to content

Commit c5ad64c

Browse files
committed
added iddle for fold
1 parent 7387748 commit c5ad64c

File tree

6 files changed

+53
-4
lines changed

6 files changed

+53
-4
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ resolvers += Resolver.sonatypeRepo("snapshots")
99

1010
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
1111

12-
scalacOptions ++= Seq("-unchecked","-deprecation", "-feature" /* , "-Ymacro-debug-lite" , "-Ydebug" , "-Ylog:lambdalift" */ )
12+
scalacOptions ++= Seq("-unchecked","-deprecation", "-feature" /* , "-Ymacro-debug-lite" , "-Ydebug" , "-Ylog:lambdalift" */ )
1313

1414
libraryDependencies <+= scalaVersion( "org.scala-lang" % "scala-reflect" % _ )
1515

notes/0.99.7.markdown

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
- implemented EffectedInput/EffectedOutput
66
(more detailed about fold over selected and Effected pattern can
77
be readed at https://github.com/rssh/notes/blob/master/2016_03_05_see-ma-no-vars.md )
8-
- akka 2.4.2
8+
- akka 2.4.2, scala-2.11.8
99
- channels by defaults now unbuffered. (as in Go)
1010

1111

notes/papers.markdown

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,7 @@ CSP for scala: implementation, papers, ect ..
1616

1717
* Minimalistics implementation of clojure core-async in scala: http://blog.podsnap.com/scasync.html
1818

19+
* implementation over hawtdipath: https://github.com/oplohmann/CoSelect.Scala
20+
1921
------------------
2022
part of [scala-gopher](https://github.com/rssh/scala-gopher)

src/main/scala/gopher/channels/FoldSelectorBuilder.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@ trait FoldSelectorBuilder[T] extends SelectorBuilder[T]
3333
def writingWithFlowTerminationAsync[A](ch:Output[A], x: =>A, f: (ExecutionContext, FlowTermination[T], A) => Future[T] ): this.type =
3434
withWriter[A](ch, { cw => Some(x,f(ec,cw.flowTermination, x) map Function.const(cw)) } )
3535

36+
def idle(body:T): FoldSelectorBuilder[T] =
37+
macro SelectorBuilder.idleImpl[T,FoldSelectorBuilder[T]]
3638

39+
@inline
40+
def idleWithFlowTerminationAsync[A](f: (ExecutionContext, FlowTermination[T]) => Future[T] ): this.type =
41+
withIdle{ st => Some(f(ec,st.flowTermination) map Function.const(st)) }
3742

3843
}
3944

@@ -239,6 +244,8 @@ class FoldSelectorBuilderImpl(val c:Context)
239244
c.abort(cd.pat.pos,"x:channel.read or x:channel.write form is required")
240245
}
241246
}
247+
case Ident(TermName("_")) => (cd.pat, cd.guard)
248+
case _ => c.abort(cd.pat.pos,"expected Bind or Default in pattern, have:"+cd.pat)
242249
}
243250
atPos(cd.pos)(CaseDef(pat,substProj(fp,guard),preTransformCaseDefBody(fp,patSymbol,cd.body)))
244251
}else{

src/test/scala/gopher/channels/InputOpsSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,8 @@ class InputOpsSuite extends FunSuite with AsyncAssertions {
259259

260260
test("append for finite stream") {
261261
val w = new Waiter
262-
val ch1 = gopherApi.makeChannel[Int]()
263-
val ch2 = gopherApi.makeChannel[Int]()
262+
val ch1 = gopherApi.makeChannel[Int](10)
263+
val ch2 = gopherApi.makeChannel[Int](10)
264264
val appended = ch1 append ch2
265265
var sum = 0
266266
var prev = 0

src/test/scala/gopher/channels/MacroSelectSuite.scala

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,46 @@ class MacroSelectSuite extends FunSuite
219219
}
220220
}
221221

222+
test("fold over selector with idle") {
223+
import gopherApi._
224+
val ch1 = makeChannel[Int](10)
225+
val ch2 = makeChannel[Int](10)
226+
ch1.awrite(1)
227+
val sf = select.afold((0,0,0)){ case ((n1,n2,nIdle),s) =>
228+
s match {
229+
case x:ch1.read =>
230+
val nn1 = n1+1
231+
if (nn1 > 100) {
232+
CurrentFlowTermination.exit((nn1,n2,nIdle))
233+
}else{
234+
ch2.write(x)
235+
(nn1,n2,nIdle)
236+
}
237+
case x:ch2.read =>
238+
ch1.write(x)
239+
(n1,n2+1,nIdle)
240+
case _ =>
241+
(n1,n2,nIdle+1)
242+
243+
}
244+
}
245+
val (n1,n2,ni) = Await.result(sf, 10 seconds)
246+
assert (n1+n2+ni > 100)
247+
val sf2 = select.afold((0,0)){ case ((n1,nIdle),s) =>
248+
s match {
249+
case x:ch1.read =>
250+
(n1+1,nIdle)
251+
case _ =>
252+
val nni = nIdle+1
253+
if (nni > 3) {
254+
CurrentFlowTermination.exit((n1,nni))
255+
} else {
256+
(n1,nni)
257+
}
258+
} }
259+
val (n21,n2i) = Await.result(sf2, 10 seconds)
260+
assert(n2i>3)
261+
}
222262

223263
lazy val gopherApi = CommonTestObjects.gopherApi
224264

0 commit comments

Comments
 (0)