Skip to content
This repository was archived by the owner on Feb 8, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion experiments/akkastream/src/main/resources/geardefault.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ gearpump.serializers {
"scala.collection.immutable.Map$Map2" = ""
}
akka {
version = "2.4.11"
version = "2.4.12"
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ object GearpumpMaterializer {
case null => throw new IllegalArgumentException("ActorRefFactory context must be defined")
case _ =>
throw new IllegalArgumentException(
s"""
| context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]
""".stripMargin
"\n context must be a ActorSystem or ActorContext, got [%s]\n ".
format(context.getClass.getName).stripMargin
)
}
system
Expand Down Expand Up @@ -136,6 +135,9 @@ class GearpumpMaterializer(override val system: ActorSystem,

override def logger: LoggingAdapter = Logging.getLogger(system, this)

override def makeLogger(logSource: Class[_]): LoggingAdapter =
Logging(system, logSource)

override def isShutdown: Boolean = system.whenTerminated.isCompleted

override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
Expand Down Expand Up @@ -175,10 +177,11 @@ class GearpumpMaterializer(override val system: ActorSystem,
ActorAttributes.SupervisionStrategy(settings.supervisionDecider) ::
Nil)

// val info = Fusing.structuralInfo(runnableGraph, Attributes.none)
val info = Fusing.aggressive(runnableGraph).module.info
val graph = GGraph.empty[Module, Edge]

info.allModules.foreach(module => {
info.subModules.foreach(module => {
if (module.isCopied) {
val original = module.asInstanceOf[CopiedModule].copyOf
graph.addVertex(original)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object Test extends AkkaApp with ArgumentsParser {
// scalastyle:off println
override def main(akkaConf: Config, args: Array[String]): Unit = {
implicit val system = ActorSystem("Test", akkaConf)
implicit val materializer = GearpumpMaterializer(GraphPartitioner.AllRemoteStrategy)
implicit val materializer = GearpumpMaterializer()

val echo = system.actorOf(Props(new Echo()))
val sink = Sink.actorRef(echo, "COMPLETE")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.gearpump.akkastream.example

import akka.NotUsed
import akka.stream.{ClosedShape, ThrottleMode}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape, ThrottleMode}
import org.apache.gearpump.akkastream.GearpumpMaterializer
import org.apache.gearpump.cluster.main.ArgumentsParser
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
import org.apache.gearpump.util.AkkaApp

import scala.concurrent.Await
Expand All @@ -31,22 +31,32 @@ import scala.concurrent.duration._
* Stream example showing Conflate, Throttle
*/
object Test10 extends AkkaApp with ArgumentsParser {

// scalastyle:off println
override val options: Array[(String, CLIOption[Any])] = Array(
"gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
)

override def main(akkaConf: Config, args: Array[String]): Unit = {
import akka.actor.ActorSystem
import akka.stream.scaladsl._

val config = parse(args)
implicit val system = ActorSystem("Test10", akkaConfig)
implicit val materializer = GearpumpMaterializer()
implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
case true =>
GearpumpMaterializer()
case false =>
ActorMaterializer(
ActorMaterializerSettings(system).withAutoFusing(false)
)
}
implicit val ec = system.dispatcher


// Conflate[A] - (2 inputs, 1 output) concatenates two streams
// (first consumes one, then the second one)
def stream(x: String) = Stream.continually(x)

val sourceA = Source(stream("A"))
val sourceB = Source(stream("B"))
val sourceA = Source(List("A", "B", "C", "D"))
val sourceB = Source(List("E", "F", "G", "H"))

val throttler: Flow[String, String, NotUsed] =
Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.gearpump.akkastream.example

import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape}
import org.apache.gearpump.akkastream.GearpumpMaterializer
import org.apache.gearpump.cluster.main.ArgumentsParser
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
import org.apache.gearpump.util.AkkaApp

import scala.concurrent.Await
Expand All @@ -32,14 +32,23 @@ import scala.concurrent.duration._
*/
object Test11 extends AkkaApp with ArgumentsParser {
// scalastyle:off println
override val options: Array[(String, CLIOption[Any])] = Array(
"gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
)

override def main(akkaConf: Config, args: Array[String]): Unit = {
import akka.actor.ActorSystem
import akka.stream.scaladsl._

val config = parse(args)
implicit val system = ActorSystem("Test11", akkaConfig)
implicit val materializer = GearpumpMaterializer()
// implicit val materializer =
// ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
case true =>
GearpumpMaterializer()
case false =>
ActorMaterializer(
ActorMaterializerSettings(system).withAutoFusing(false)
)
}
implicit val ec = system.dispatcher

val g = RunnableGraph.fromGraph(GraphDSL.create() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

package org.apache.gearpump.akkastream.example

import akka.stream.{ClosedShape, UniformFanInShape}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape, UniformFanInShape}
import org.apache.gearpump.akkastream.GearpumpMaterializer
import org.apache.gearpump.cluster.main.ArgumentsParser
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
import org.apache.gearpump.util.AkkaApp

import scala.concurrent.{Await, Future}
Expand All @@ -30,17 +30,26 @@ import scala.concurrent.{Await, Future}
*/
object Test12 extends AkkaApp with ArgumentsParser{
// scalastyle:off println
override val options: Array[(String, CLIOption[Any])] = Array(
"gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
)

override def main(akkaConf: Config, args: Array[String]): Unit = {
import akka.actor.ActorSystem
import akka.stream.scaladsl._

import scala.concurrent.duration._

val config = parse(args)
implicit val system = ActorSystem("Test12", akkaConfig)
// implicit val materializer = ActorMaterializer(
// ActorMaterializerSettings(system).withAutoFusing(false)
// )
implicit val materializer = GearpumpMaterializer()
implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
case true =>
GearpumpMaterializer()
case false =>
ActorMaterializer(
ActorMaterializerSettings(system).withAutoFusing(false)
)
}

implicit val ec = system.dispatcher

val pickMaxOfThree = GraphDSL.create() { implicit b =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gearpump.akkastream.example

import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import org.apache.gearpump.akkastream.GearpumpMaterializer
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
import org.apache.gearpump.util.AkkaApp

import scala.concurrent.Await
import scala.concurrent.duration._

/**
* Stream example showing Interleave
* output should be 1, 2, 4, 5, 3, 6, 7
*/
object Test17 extends AkkaApp with ArgumentsParser {
// scalastyle:off println
override val options: Array[(String, CLIOption[Any])] = Array(
"gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
)

override def main(akkaConf: Config, args: Array[String]): Unit = {
val config = parse(args)
implicit val system = ActorSystem("Test17", akkaConf)
implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
case true =>
GearpumpMaterializer()
case false =>
ActorMaterializer(
ActorMaterializerSettings(system).withAutoFusing(false)
)
}
implicit val ec = system.dispatcher

val sinkActor = system.actorOf(Props(new SinkActor()))
val sink = Sink.actorRef(sinkActor, "COMPLETE")
Source(0 to 3).interleave(Source(4 to 6), 2).interleave(Source(7 to 11), 3).runWith(sink)

Await.result(system.whenTerminated, 60.minutes)
}

class SinkActor extends Actor {
def receive: Receive = {
case any: AnyRef =>
println("Confirm received: " + any)
}
}
// scalastyle:on println
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.gearpump.akkastream.example

import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.ClosedShape
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape}
import akka.stream.scaladsl._
import org.apache.gearpump.akkastream.GearpumpMaterializer
import org.apache.gearpump.cluster.main.ArgumentsParser
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
import org.apache.gearpump.util.AkkaApp

import scala.concurrent.Await
Expand All @@ -33,9 +33,21 @@ import scala.concurrent.duration._
*/
object Test5 extends AkkaApp with ArgumentsParser {
// scalastyle:off println
override val options: Array[(String, CLIOption[Any])] = Array(
"gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
)

override def main(akkaConf: Config, args: Array[String]): Unit = {
val config = parse(args)
implicit val system = ActorSystem("Test5", akkaConf)
implicit val materializer = GearpumpMaterializer()
implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
case true =>
GearpumpMaterializer()
case false =>
ActorMaterializer(
ActorMaterializerSettings(system).withAutoFusing(false)
)
}

val echo = system.actorOf(Props(new Echo()))
val source = Source(List(("male", "24"), ("female", "23")))
Expand All @@ -45,11 +57,9 @@ object Test5 extends AkkaApp with ArgumentsParser {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val unzip = b.add(Unzip[String, String]())
val sink1 = Sink.actorRef(echo, "COMPLETE")
val sink2 = Sink.actorRef(echo, "COMPLETE")
source ~> unzip.in
unzip.out0 ~> sink1
unzip.out1 ~> sink1
unzip.out0 ~> sink
unzip.out1 ~> sink
ClosedShape
}
).run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
package org.apache.gearpump.akkastream.example

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl.{Broadcast, Merge, Sink, Source}
import org.apache.gearpump.akkastream.GearpumpMaterializer
import org.apache.gearpump.cluster.main.ArgumentsParser
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
import org.apache.gearpump.util.AkkaApp

import scala.concurrent.Await
Expand All @@ -36,13 +37,25 @@ import scala.concurrent.duration._

object Test7 extends AkkaApp with ArgumentsParser {
// scalastyle:off println
override val options: Array[(String, CLIOption[Any])] = Array(
"gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
)

override def main(akkaConf: Config, args: Array[String]): Unit = {
val config = parse(args)
implicit val system = ActorSystem("Test7", akkaConf)
implicit val materializer = GearpumpMaterializer()
implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
case true =>
GearpumpMaterializer()
case false =>
ActorMaterializer(
ActorMaterializerSettings(system).withAutoFusing(false)
)
}
implicit val ec = system.dispatcher
val sourceA = Source(List(1))
val sourceB = Source(List(2))

val sourceA = Source(List(1, 2, 3, 4, 5))
val sourceB = Source(List(6, 7, 8, 9, 10))
val mergedSource = Source.combine(sourceA, sourceB)(Merge(_))

val sinkA = Sink.foreach[Int](x => println(s"In SinkA : $x"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

/**
* Stream example showing Broadcast
* Stream example showing Broadcast, Merge
*/
object Test9 extends AkkaApp with ArgumentsParser {
// scalastyle:off println
Expand All @@ -51,8 +51,8 @@ object Test9 extends AkkaApp with ArgumentsParser {
}
implicit val ec = system.dispatcher

val sinkActor = system.actorOf(Props(new SinkActor()))
val source = Source((1 to 5))
val sinkActor = system.actorOf(Props(new SinkActor()))
val sink = Sink.actorRef(sinkActor, "COMPLETE")
val flowA: Flow[Int, Int, NotUsed] = Flow[Int].map {
x => println(s"processing broadcasted element : $x in flowA"); x
Expand Down
Loading