diff --git a/contrib/src/main/scala/akka/stream/contrib/EitherFlow.scala b/contrib/src/main/scala/akka/stream/contrib/EitherFlow.scala new file mode 100644 index 00000000..091814d5 --- /dev/null +++ b/contrib/src/main/scala/akka/stream/contrib/EitherFlow.scala @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.stream.contrib + +import akka.NotUsed +import akka.stream.FlowShape +import akka.stream.scaladsl.{ Broadcast, Flow, GraphDSL, Merge } + +object EitherFlow { + /** + * Creates a Flow[Either[R, L], Result, NotUsed] that can choose between two flows based on the materialized value + * + * @param leftFlow: flow used for left values + * @param rightFlow: flow used for right values + * @return merge flow from either leftFlow or rightFlow + */ + def either[L, R, Result]( + leftFlow: Flow[L, Result, NotUsed], + rightFlow: Flow[R, Result, NotUsed]): Flow[Either[L, R], Result, NotUsed] = + Flow.fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + + val eitherIn = b.add(Broadcast[Either[L, R]](2)) + val result = b.add(Merge[Result](2)) + + eitherIn.collect { case Left(v) => v } ~> leftFlow ~> result + eitherIn.collect { case Right(v) => v } ~> rightFlow ~> result + + FlowShape(eitherIn.in, result.out) + }) +} diff --git a/contrib/src/test/scala/akka/stream/contrib/EitherViaSpec.scala b/contrib/src/test/scala/akka/stream/contrib/EitherViaSpec.scala new file mode 100644 index 00000000..74fbfdb1 --- /dev/null +++ b/contrib/src/test/scala/akka/stream/contrib/EitherViaSpec.scala @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.stream.contrib + +import akka.NotUsed +import akka.stream.scaladsl.{ Flow, Keep, Source } +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } + +class EitherViaSpec extends BaseStreamSpec { + override protected def autoFusing: Boolean = true + + val rightFlow: Flow[Int, String, NotUsed] = Flow[Int].map(_.toString) + val leftFlow: Flow[Throwable, String, NotUsed] = Flow[Throwable].map(_.getMessage) + + def asEither(i: Int): Either[Exception, Int] = if (i % 2 == 0) Right(i) else Left(new RuntimeException(s"ups, $i is odd")) + + "EitherFlow" should { + "work with empty source" in { + Source.empty[Int] + .map(asEither) + .via(EitherFlow.either(leftFlow, rightFlow)) + .runWith(TestSink.probe) + .request(1) + .expectComplete() + } + + "direct to the right flow based on materialized value of Either" in { + val (source, sink) = TestSource + .probe[Int] + .map(asEither) + .via(EitherFlow.either(leftFlow, rightFlow)) + .toMat(TestSink.probe)(Keep.both) + .run() + + sink.request(99) + + source.sendNext(1) + source.sendNext(2) + source.sendNext(3) + + sink.expectNext("ups, 1 is odd", "2", "ups, 3 is odd") + + source.sendComplete() + sink.expectComplete() + } + } +} \ No newline at end of file