Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package org.apache.pekko.stream.impl

import scala.concurrent.Promise

import org.apache.pekko
import pekko.NotUsed
import pekko.stream._
Expand All @@ -23,8 +25,6 @@ import pekko.stream.scaladsl.{ Keep, Source }
import pekko.util.OptionVal
import pekko.testkit.PekkoSpec

import scala.concurrent.Future

class TraversalBuilderSpec extends PekkoSpec {

"CompositeTraversalBuilder" must {
Expand Down Expand Up @@ -508,7 +508,8 @@ class TraversalBuilderSpec extends PekkoSpec {
}

"find Source.future via TraversalBuilder with getValuePresentedSource" in {
val future = Future.successful("a")
val promise = Promise[String]()
val future = promise.future
TraversalBuilder.getValuePresentedSource(Source.future(future)).get.asInstanceOf[FutureSource[
String]].future should ===(
future)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,49 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
}
}

"Source.futureSource" must {
"Source.future" must {
"work as empty source when the future source completes with null" in {
val source = Source.future(Future.successful(null.asInstanceOf[String]))
val probe = source.runWith(TestSink[String]())

probe.request(1)
probe.expectComplete()
}

"work with a successful future" in {
val source = Source.future(Future.successful(42))
val probe = source.runWith(TestSink[Int]())

probe.request(1)
probe.expectNext(42)
probe.expectComplete()
}

"work with a failed future" in {
val ex = new RuntimeException("boom")
val source = Source.future(Future.failed(ex))
val probe = source.runWith(TestSink[Int]())

probe.request(1)
probe.expectError().getMessage should ===("boom")
}

"work with a delayed future" in {
val promise = scala.concurrent.Promise[Int]()
val source = Source.future(promise.future)
val probe = source.runWith(TestSink[Int]())

probe.request(1)
probe.expectNoMessage(500.millis)

promise.success(42)

probe.expectNext(42)
probe.expectComplete()
}
}

"Source.futureSource" must {
"not cancel substream twice" in {
val result = Source
.futureSource(pekko.pattern.after(2.seconds)(Future.successful(Source(1 to 2))))
Expand All @@ -558,6 +599,46 @@ class SourceSpec extends StreamSpec with DefaultTimeout {

Await.result(result, 4.seconds) shouldBe Done
}

"fail when the future completes with null" in {
val source = Source.futureSource(Future.successful(null.asInstanceOf[Source[Int, NotUsed]]))
val probe = source.runWith(TestSink[Int]())

probe.request(1)
probe.expectError().getMessage should include("futureSource completed with null")
}

"work with a successful future" in {
val source = Source.futureSource(Future.successful(Source(1 to 3)))
val probe = source.runWith(TestSink[Int]())

probe.request(3)
probe.expectNext(1, 2, 3)
probe.expectComplete()
}

"work with a failed future source" in {
val ex = new RuntimeException("boom")
val source = Source.futureSource(Future.failed(ex))
val probe = source.runWith(TestSink[Int]())

probe.request(1)
probe.expectError().getMessage should ===("boom")
}

"work with a delayed future source" in {
val promise = scala.concurrent.Promise[Source[Int, NotUsed]]()
val source = Source.futureSource(promise.future)
val probe = source.runWith(TestSink[Int]())

probe.request(3)
probe.expectNoMessage(500.millis)

promise.success(Source(1 to 3))

probe.expectNext(1, 2, 3)
probe.expectComplete()
}
}

"Source of sources" must {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,12 @@ object Source {
* Emits a single value when the given `Future` is successfully completed and then completes the stream.
* The stream fails if the `Future` is completed with a failure.
*/
def future[T](futureElement: Future[T]): Source[T, NotUsed] =
fromGraph(new FutureSource[T](futureElement))
def future[T](futureElement: Future[T]): Source[T, NotUsed] = futureElement.value match {
case None => fromGraph(new FutureSource[T](futureElement))
case Some(scala.util.Success(null)) => empty[T]
case Some(scala.util.Success(elem)) => single(elem)
case Some(scala.util.Failure(ex)) => failed[T](ex)
}

/**
* Never emits any elements, never completes and never fails.
Expand All @@ -662,8 +666,14 @@ object Source {
* Turn a `Future[Source]` into a source that will emit the values of the source when the future completes successfully.
* If the `Future` is completed with a failure the stream is failed.
*/
def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] =
fromGraph(new FutureFlattenSource(futureSource))
def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = futureSource.value match {
case None => fromGraph(new FutureFlattenSource(futureSource))
case Some(scala.util.Success(null)) =>
val exception = new NullPointerException("futureSource completed with null")
Source.failed(exception).mapMaterializedValue(_ => Future.failed[M](exception))
case Some(scala.util.Success(source)) => source.mapMaterializedValue(Future.successful)
case Some(scala.util.Failure(ex)) => Source.failed[T](ex).mapMaterializedValue(_ => Future.failed[M](ex))
}

/**
* Defers invoking the `create` function to create a single element until there is downstream demand.
Expand Down