Skip to content
This repository was archived by the owner on Feb 8, 2019. It is now read-only.

Commit c6906f5

Browse files
committed
working now
1 parent 1f8f3a2 commit c6906f5

File tree

11 files changed

+161
-61
lines changed

11 files changed

+161
-61
lines changed

experiments/akkastream/src/main/resources/geardefault.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ gearpump.serializers {
44
"scala.collection.immutable.Map$Map2" = ""
55
}
66
akka {
7-
version = "2.4.11"
7+
version = "2.4.12"
88
}

experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ class GearpumpMaterializer(override val system: ActorSystem,
135135

136136
override def logger: LoggingAdapter = Logging.getLogger(system, this)
137137

138+
override def makeLogger(logSource: Class[_]): LoggingAdapter =
139+
Logging(system, logSource)
140+
138141
override def isShutdown: Boolean = system.whenTerminated.isCompleted
139142

140143
override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {

experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
package org.apache.gearpump.akkastream.example
2020

2121
import akka.NotUsed
22-
import akka.stream.{ClosedShape, ThrottleMode}
22+
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape, ThrottleMode}
2323
import org.apache.gearpump.akkastream.GearpumpMaterializer
24-
import org.apache.gearpump.cluster.main.ArgumentsParser
24+
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
2525
import org.apache.gearpump.util.AkkaApp
2626

2727
import scala.concurrent.Await
@@ -31,22 +31,32 @@ import scala.concurrent.duration._
3131
* Stream example showing Conflate, Throttle
3232
*/
3333
object Test10 extends AkkaApp with ArgumentsParser {
34-
3534
// scalastyle:off println
35+
override val options: Array[(String, CLIOption[Any])] = Array(
36+
"gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
37+
)
38+
3639
override def main(akkaConf: Config, args: Array[String]): Unit = {
3740
import akka.actor.ActorSystem
3841
import akka.stream.scaladsl._
39-
42+
val config = parse(args)
4043
implicit val system = ActorSystem("Test10", akkaConfig)
41-
implicit val materializer = GearpumpMaterializer()
44+
implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
45+
case true =>
46+
GearpumpMaterializer()
47+
case false =>
48+
ActorMaterializer(
49+
ActorMaterializerSettings(system).withAutoFusing(false)
50+
)
51+
}
4252
implicit val ec = system.dispatcher
4353

54+
4455
// Conflate[A] - (2 inputs, 1 output) concatenates two streams
4556
// (first consumes one, then the second one)
46-
def stream(x: String) = Stream.continually(x)
4757

48-
val sourceA = Source(stream("A"))
49-
val sourceB = Source(stream("B"))
58+
val sourceA = Source(List("A", "B", "C", "D"))
59+
val sourceB = Source(List("E", "F", "G", "H"))
5060

5161
val throttler: Flow[String, String, NotUsed] =
5262
Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping)

experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818

1919
package org.apache.gearpump.akkastream.example
2020

21-
import akka.stream.{ClosedShape, UniformFanInShape}
21+
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape, UniformFanInShape}
2222
import org.apache.gearpump.akkastream.GearpumpMaterializer
23-
import org.apache.gearpump.cluster.main.ArgumentsParser
23+
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
2424
import org.apache.gearpump.util.AkkaApp
2525

2626
import scala.concurrent.{Await, Future}
@@ -30,17 +30,26 @@ import scala.concurrent.{Await, Future}
3030
*/
3131
object Test12 extends AkkaApp with ArgumentsParser{
3232
// scalastyle:off println
33+
override val options: Array[(String, CLIOption[Any])] = Array(
34+
"gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
35+
)
36+
3337
override def main(akkaConf: Config, args: Array[String]): Unit = {
3438
import akka.actor.ActorSystem
3539
import akka.stream.scaladsl._
3640

3741
import scala.concurrent.duration._
38-
42+
val config = parse(args)
3943
implicit val system = ActorSystem("Test12", akkaConfig)
40-
// implicit val materializer = ActorMaterializer(
41-
// ActorMaterializerSettings(system).withAutoFusing(false)
42-
// )
43-
implicit val materializer = GearpumpMaterializer()
44+
implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
45+
case true =>
46+
GearpumpMaterializer()
47+
case false =>
48+
ActorMaterializer(
49+
ActorMaterializerSettings(system).withAutoFusing(false)
50+
)
51+
}
52+
4453
implicit val ec = system.dispatcher
4554

4655
val pickMaxOfThree = GraphDSL.create() { implicit b =>

experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
package org.apache.gearpump.akkastream.example
2020

2121
import akka.actor.{Actor, ActorSystem, Props}
22-
import akka.stream.ClosedShape
22+
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape}
2323
import akka.stream.scaladsl._
2424
import org.apache.gearpump.akkastream.GearpumpMaterializer
25-
import org.apache.gearpump.cluster.main.ArgumentsParser
25+
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
2626
import org.apache.gearpump.util.AkkaApp
2727

2828
import scala.concurrent.Await
@@ -33,9 +33,21 @@ import scala.concurrent.duration._
3333
*/
3434
object Test5 extends AkkaApp with ArgumentsParser {
3535
// scalastyle:off println
36+
override val options: Array[(String, CLIOption[Any])] = Array(
37+
"gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
38+
)
39+
3640
override def main(akkaConf: Config, args: Array[String]): Unit = {
41+
val config = parse(args)
3742
implicit val system = ActorSystem("Test5", akkaConf)
38-
implicit val materializer = GearpumpMaterializer()
43+
implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
44+
case true =>
45+
GearpumpMaterializer()
46+
case false =>
47+
ActorMaterializer(
48+
ActorMaterializerSettings(system).withAutoFusing(false)
49+
)
50+
}
3951

4052
val echo = system.actorOf(Props(new Echo()))
4153
val source = Source(List(("male", "24"), ("female", "23")))
@@ -45,11 +57,9 @@ object Test5 extends AkkaApp with ArgumentsParser {
4557
GraphDSL.create() { implicit b =>
4658
import GraphDSL.Implicits._
4759
val unzip = b.add(Unzip[String, String]())
48-
val sink1 = Sink.actorRef(echo, "COMPLETE")
49-
val sink2 = Sink.actorRef(echo, "COMPLETE")
5060
source ~> unzip.in
51-
unzip.out0 ~> sink1
52-
unzip.out1 ~> sink1
61+
unzip.out0 ~> sink
62+
unzip.out1 ~> sink
5363
ClosedShape
5464
}
5565
).run()

experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
package org.apache.gearpump.akkastream.example
2020

2121
import akka.actor.ActorSystem
22+
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
2223
import akka.stream.scaladsl.{Broadcast, Merge, Sink, Source}
2324
import org.apache.gearpump.akkastream.GearpumpMaterializer
24-
import org.apache.gearpump.cluster.main.ArgumentsParser
25+
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
2526
import org.apache.gearpump.util.AkkaApp
2627

2728
import scala.concurrent.Await
@@ -36,13 +37,25 @@ import scala.concurrent.duration._
3637

3738
object Test7 extends AkkaApp with ArgumentsParser {
3839
// scalastyle:off println
40+
override val options: Array[(String, CLIOption[Any])] = Array(
41+
"gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
42+
)
43+
3944
override def main(akkaConf: Config, args: Array[String]): Unit = {
45+
val config = parse(args)
4046
implicit val system = ActorSystem("Test7", akkaConf)
41-
implicit val materializer = GearpumpMaterializer()
47+
implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
48+
case true =>
49+
GearpumpMaterializer()
50+
case false =>
51+
ActorMaterializer(
52+
ActorMaterializerSettings(system).withAutoFusing(false)
53+
)
54+
}
4255
implicit val ec = system.dispatcher
43-
44-
val sourceA = Source(List(1))
45-
val sourceB = Source(List(2))
56+
57+
val sourceA = Source(List(1, 2, 3, 4, 5))
58+
val sourceB = Source(List(6, 7, 8, 9, 10))
4659
val mergedSource = Source.combine(sourceA, sourceB)(Merge(_))
4760

4861
val sinkA = Sink.foreach[Int](x => println(s"In SinkA : $x"))

experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import scala.concurrent.{Await, Future}
3030
import scala.concurrent.duration._
3131

3232
/**
33-
* Stream example showing Broadcast
33+
* Stream example showing Broadcast, Merge
3434
*/
3535
object Test9 extends AkkaApp with ArgumentsParser {
3636
// scalastyle:off println

experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
3232
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
3333
import akka.stream.impl.fusing.{Map => _, _}
3434
import akka.stream.impl.io.{TLSActor, TlsModule}
35-
import akka.stream.scaladsl.{GraphDSL, Keep, ModuleExtractor, RunnableGraph}
35+
import akka.stream.scaladsl.ModuleExtractor
3636
import akka.stream.{ClosedShape, Graph => AkkaGraph, _}
3737
import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
3838
import org.apache.gearpump.akkastream.module.ReduceModule
3939
import org.apache.gearpump.akkastream.util.MaterializedValueOps
40-
import org.reactivestreams.{Publisher, Subscriber}
40+
import org.reactivestreams.{Processor, Publisher, Subscriber}
4141

4242
import scala.concurrent.ExecutionContextExecutor
4343
import scala.concurrent.duration.FiniteDuration
@@ -63,6 +63,9 @@ case class LocalMaterializerImpl (
6363

6464
override def logger: LoggingAdapter = Logging.getLogger(system, this)
6565

66+
override def makeLogger(logSource: Class[_]): LoggingAdapter =
67+
Logging(system, logSource)
68+
6669
override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration,
6770
task: Runnable): Cancellable =
6871
system.scheduler.schedule(initialDelay, interval, task)(executionContext)
@@ -105,7 +108,7 @@ case class LocalMaterializerImpl (
105108
effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
106109

107110
def newMaterializationContext() =
108-
new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes,
111+
MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes,
109112
stageName(effectiveAttributes))
110113
atomic match {
111114
case sink: SinkModule[_, _] =>
@@ -117,7 +120,7 @@ case class LocalMaterializerImpl (
117120
assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]])
118121
matVal.put(atomic, mat)
119122
case stage: ProcessorModule[_, _, _] =>
120-
val (processor, mat) = stage.createProcessor()
123+
val (processor: Processor[_, _], mat) = stage.createProcessor()
121124
assignPort(stage.inPort, processor)
122125
assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]])
123126
matVal.put(atomic, mat)
@@ -201,7 +204,13 @@ case class LocalMaterializerImpl (
201204
})
202205
graph.edges.foreach(value => {
203206
val (node1, edge, node2) = value
204-
moduleInProgress = moduleInProgress.wire(edge.from, edge.to)
207+
val to = moduleInProgress.upstreams.contains(edge.to)
208+
val from = moduleInProgress.downstreams.contains(edge.from)
209+
!to && !from match {
210+
case true =>
211+
moduleInProgress = moduleInProgress.wire (edge.from, edge.to)
212+
case false =>
213+
}
205214
})
206215

207216
moduleInProgress
@@ -214,26 +223,9 @@ case class LocalMaterializerImpl (
214223
val session = LocalMaterializerSession(topLevelModule, null, null)
215224
import scala.collection.JavaConverters._
216225
val matV = inputMatValues.asJava
217-
val materializedGraph = graph.mapVertex { module =>
226+
graph.vertices.foreach(module => {
218227
session.materializeAtomic(module.asInstanceOf[AtomicModule], module.attributes, matV)
219-
matV.get(module)
220-
}
221-
materializedGraph.edges.foreach { nodeEdgeNode =>
222-
val (node1, edge, node2) = nodeEdgeNode
223-
val from = edge.from
224-
val to = edge.to
225-
node1 match {
226-
case module1: Module =>
227-
node2 match {
228-
case module2: Module =>
229-
val publisher = module1.downstreams(from).asInstanceOf[Publisher[Any]]
230-
val subscriber = module2.upstreams(to).asInstanceOf[Subscriber[Any]]
231-
publisher.subscribe(subscriber)
232-
case _ =>
233-
}
234-
case _ =>
235-
}
236-
}
228+
})
237229
val matValSources = graph.vertices.flatMap(module => {
238230
val rt: Option[MaterializedValueSource[_]] = module match {
239231
case graphStage: GraphStageModule =>
@@ -255,7 +247,7 @@ case class LocalMaterializerImpl (
255247
private def publishToMaterializedValueSource(modules: List[MaterializedValueSource[_]],
256248
matValues: scala.collection.mutable.Map[Module, Any]): Unit = {
257249
modules.foreach { source =>
258-
Option(source.computation).map { attr =>
250+
Option(source.computation).foreach { attr =>
259251
val valueToPublish = MaterializedValueOps(attr).resolve(matValues)
260252
source.setValue(valueToPublish)
261253
}

experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import akka.stream.impl.Timers._
2424
import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource, TickSource}
2525
import akka.stream.impl.fusing.{Map => FMap, _}
2626
import akka.stream.impl.io.IncomingConnectionStage
27-
import akka.stream.impl.{HeadOptionStage, Stages, Throttle}
27+
import akka.stream.impl.{ProcessorModule => _, Unzip => _, _}
2828
import akka.stream.scaladsl._
2929
import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue
3030
import akka.stream.stage.GraphStage
@@ -326,7 +326,12 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
326326
conf.withValue(
327327
Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip")
328328
case zip: Zip[_, _] =>
329-
zipWithOp(zip.zipper, conf)
329+
// zipWithOp(zip.zipper, conf)
330+
ProcessorOp(classOf[ZipTask[_, _]],
331+
parallelism,
332+
conf.withValue(
333+
ZipTask.ZIP_FUNCTION, ZipTask.ZipFunction(zip.zipper)
334+
), "zip")
330335
case zipWith2: ZipWith2[_, _, _] =>
331336
ProcessorOp(classOf[Zip2Task[_, _, _]],
332337
parallelism,
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.gearpump.akkastream.task
20+
21+
import org.apache.gearpump.Message
22+
import org.apache.gearpump.akkastream.task.ZipTask.ZipFunction
23+
import org.apache.gearpump.cluster.UserConfig
24+
import org.apache.gearpump.streaming.task.TaskContext
25+
26+
class ZipTask[A1, A2](context: TaskContext, userConf : UserConfig)
27+
extends GraphTask(context, userConf) {
28+
29+
val zip = userConf.
30+
getValue[ZipFunction[A1, A2]](ZipTask.ZIP_FUNCTION)(context.system).get.zip
31+
var a1: Option[A1] = None
32+
var a2: Option[A2] = None
33+
34+
override def onNext(msg : Message) : Unit = {
35+
val message = msg.msg
36+
val time = msg.timestamp
37+
a1 match {
38+
case Some(x) =>
39+
a2 = Some(message.asInstanceOf[A2])
40+
a1.foreach(v1 => {
41+
a2.foreach(v2 => {
42+
val (w1, w2) = zip(v1, v2)
43+
context.output(Message((w1, w2), time))
44+
45+
})
46+
})
47+
case None =>
48+
a1 = Some(message.asInstanceOf[A1])
49+
}
50+
}
51+
}
52+
53+
object ZipTask {
54+
case class ZipFunction[A1, A2](zip: (A1, A2) => (A1, A2)) extends Serializable
55+
56+
val ZIP_FUNCTION = "org.apache.gearpump.akkastream.task.zip.function"
57+
}

0 commit comments

Comments
 (0)