Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ Streams the elements to the given future sink once it successfully completes.
Streams the elements through the given future flow once it successfully completes.
If the future fails the stream is failed.

`completionStageSink` uses the same lazy materialization semantics as
@ref:[lazyCompletionStageSink](lazyCompletionStageSink.md): the nested sink is not materialized until the first
upstream element arrives. If the stream completes before the first element, the materialized value fails with
`org.apache.pekko.stream.NeverMaterializedException`.

If you want this to work for empty streams as well, use
@ref:[eagerCompletionStageSink](eagerCompletionStageSink.md).

## Reactive Streams semantics

@@@div { .callout }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Sink.eagerCompletionStageSink

Materializes the inner sink when the future completes, even if no elements have arrived yet.

@ref[Sink operators](../index.md#sink-operators)


## Description

Turn a `CompletionStage<Sink>` into a Sink that will consume the values of the source when the future completes
successfully. If the `CompletionStage` is completed with a failure the stream is failed.

Unlike @ref:[completionStageSink](completionStageSink.md) and @ref:[lazyCompletionStageSink](lazyCompletionStageSink.md), this operator materializes the inner sink as soon as the future
completes, even if no elements have arrived yet. This means empty streams complete normally rather than failing
with `NeverMaterializedException`. At most one element that arrives before the future completes is buffered.

The materialized future value is completed with the materialized value of the inner sink once it has been
materialized, or failed if the `CompletionStage` itself fails or if materialization of the inner sink fails.
Upstream failures or downstream cancellations that occur before the inner sink is materialized are propagated
through the inner sink rather than failing the materialized value directly.

See also @ref:[completionStageSink](completionStageSink.md), @ref:[lazyCompletionStageSink](lazyCompletionStageSink.md).

## Reactive Streams semantics

@@@div { .callout }

**cancels** if the future fails or if the created sink cancels

**backpressures** when initialized and when created sink backpressures

@@@

37 changes: 37 additions & 0 deletions docs/src/main/paradox/stream/operators/Sink/eagerFutureSink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Sink.eagerFutureSink

Materializes the inner sink when the future completes, even if no elements have arrived yet.

@ref[Sink operators](../index.md#sink-operators)

## Signature

@apidoc[Sink.eagerFutureSink](Sink$) { scala="#eagerFutureSink[T,M](future:scala.concurrent.Future[org.apache.pekko.stream.scaladsl.Sink[T,M]]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]" }


## Description

Turn a `Future[Sink]` into a Sink that will consume the values of the source when the future completes
successfully. If the `Future` is completed with a failure the stream is failed.

Unlike @ref:[futureSink](futureSink.md) and @ref:[lazyFutureSink](lazyFutureSink.md), this operator materializes the inner sink as soon as the future
completes, even if no elements have arrived yet. This means empty streams complete normally rather than failing
with `NeverMaterializedException`. At most one element that arrives before the future completes is buffered.

The materialized future value is completed with the materialized value of the inner sink once it has been
materialized, or failed if the future itself fails or if materialization of the inner sink fails. Upstream
failures or downstream cancellations that occur before the inner sink is materialized are propagated through
the inner sink rather than failing the materialized value directly.

See also @ref:[futureSink](futureSink.md), @ref:[lazyFutureSink](lazyFutureSink.md).

## Reactive Streams semantics

@@@div { .callout }

**cancels** if the future fails or if the created sink cancels

**backpressures** when initialized and when created sink backpressures

@@@

6 changes: 6 additions & 0 deletions docs/src/main/paradox/stream/operators/Sink/futureSink.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Streams the elements to the given future sink once it successfully completes.
Streams the elements through the given future flow once it successfully completes.
If the future fails the stream is failed.

`futureSink` uses the same lazy materialization semantics as @ref:[lazyFutureSink](lazyFutureSink.md): the nested sink
is not materialized until the first upstream element arrives. If the stream completes before the first element, the
materialized value fails with `org.apache.pekko.stream.NeverMaterializedException`.

If you want this to work for empty streams as well, use @ref:[eagerFutureSink](eagerFutureSink.md).

## Reactive Streams semantics

@@@div { .callout }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ and failed with a `org.apache.pekko.stream.NeverMaterializedException` if the st

Can be combined with @ref:[prefixAndTail](../Source-or-Flow/prefixAndTail.md) to base the sink on the first element.

See also @ref:[lazySink](lazySink.md).
If you need empty streams to complete normally, use
@ref:[eagerCompletionStageSink](eagerCompletionStageSink.md).

See also @ref:[lazySink](lazySink.md), @ref:[completionStageSink](completionStageSink.md).

## Reactive Streams semantics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ and failed with a `org.apache.pekko.stream.NeverMaterializedException` if the st

Can be combined with @ref:[prefixAndTail](../Source-or-Flow/prefixAndTail.md) to base the sink on the first element.

See also @ref:[lazySink](lazySink.md).
If you need empty streams to complete normally, use @ref:[eagerFutureSink](eagerFutureSink.md).

See also @ref:[lazySink](lazySink.md), @ref:[futureSink](futureSink.md).

## Reactive Streams semantics

Expand Down
4 changes: 4 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|Sink|<a name="count"></a>@ref[count](Sink/count.md)|Counts all incoming elements until upstream terminates.|
|Sink|<a name="eagercompletionstagesink"></a>@ref[eagerCompletionStageSink](Sink/eagerCompletionStageSink.md)|Materializes the inner sink when the future completes, even if no elements have arrived yet.|
|Sink|<a name="eagerfuturesink"></a>@ref[eagerFutureSink](Sink/eagerFutureSink.md)|Materializes the inner sink when the future completes, even if no elements have arrived yet.|
|Sink|<a name="exists"></a>@ref[exists](Sink/exists.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
Expand Down Expand Up @@ -469,6 +471,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [dropRepeated](Source-or-Flow/dropRepeated.md)
* [dropWhile](Source-or-Flow/dropWhile.md)
* [dropWithin](Source-or-Flow/dropWithin.md)
* [eagerCompletionStageSink](Sink/eagerCompletionStageSink.md)
* [eagerFutureSink](Sink/eagerFutureSink.md)
* [empty](Source/empty.md)
* [exists](Sink/exists.md)
* [expand](Source-or-Flow/expand.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers {
"lazyFutureFlow", // lazyCompletionStageFlow
"futureFlow", // completionStageFlow
"futureSink", // completionStageSink
"eagerFutureSink", // eagerCompletionStageSink
"lazyFutureSink", // lazyCompletionStageSink
"createGraph" // renamed/overload of create for getting type inference working in Scala 3
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.scaladsl

import scala.concurrent.{ Future, Promise }

import org.apache.pekko
import pekko.stream.{ AbruptStageTerminationException, Materializer }
import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.Utils._

class EagerFutureSinkSpec extends StreamSpec("""
pekko.stream.materializer.initial-input-buffer-size = 1
pekko.stream.materializer.max-input-buffer-size = 1
""") {

val ex = TE("")

"Sink.eagerFutureSink" must {

"work with an already-completed future" in {
val result = Source(List(1, 2, 3))
.toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
.run()
.flatten

result.futureValue shouldBe Seq(1, 2, 3)
}

"work when the future completes after elements arrive" in {
val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
val result = Source(List(1, 2, 3))
.toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
.run()
.flatten

sinkPromise.success(Sink.seq[Int])
result.futureValue shouldBe Seq(1, 2, 3)
}

"handle an empty stream with an already-completed future" in {
val result = Source
.empty[Int]
.toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
.run()
.flatten

result.futureValue shouldBe Seq.empty
}

"handle an empty stream with a pending future" in {
val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
val result = Source
.empty[Int]
.toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
.run()
.flatten

sinkPromise.success(Sink.seq[Int])
result.futureValue shouldBe Seq.empty
}

"propagate failure when the future fails" in {
val result = Source(List(1, 2, 3))
.toMat(Sink.eagerFutureSink(Future.failed[Sink[Int, Future[Seq[Int]]]](ex)))(Keep.right)
.run()
.flatten

result.failed.futureValue shouldBe ex
}

"propagate upstream failure" in {
val result = Source
.failed[Int](ex)
.toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
.run()
.flatten

result.failed.futureValue shouldBe ex
}

"propagate upstream failure when the future is still pending" in {
val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
val result = Source
.failed[Int](ex)
.toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
.run()
.flatten

sinkPromise.success(Sink.seq[Int])
result.failed.futureValue shouldBe ex
}

"propagate upstream failure when element was buffered and future resolves later" in {
val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
val result = Source(List(1))
.concat(Source.failed[Int](ex))
.toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
.run()
.flatten

sinkPromise.success(Sink.seq[Int])
result.failed.futureValue shouldBe ex
}

"work with Sink.fold on a non-empty stream" in {
val result = Source(List(1, 2, 3))
.toMat(Sink.eagerFutureSink(Future.successful(Sink.fold[Int, Int](0)(_ + _))))(Keep.right)
.run()
.flatten

result.futureValue shouldBe 6
}

"work with Sink.fold on an empty stream" in {
val result = Source
.empty[Int]
.toMat(Sink.eagerFutureSink(Future.successful(Sink.fold[Int, Int](0)(_ + _))))(Keep.right)
.run()
.flatten

result.futureValue shouldBe 0
}

"not throw NeverMaterializedException on empty stream (unlike futureSink)" in {
val result = Source
.empty[Int]
.toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
.run()
.flatten

result.futureValue shouldBe Seq.empty
}

"materialize inner sink immediately when the future is already completed (even with no elements yet)" in {
val innerMatPromise = Promise[Unit]()
val sink = Sink.foreach[Int](_ => ()).mapMaterializedValue(_ => innerMatPromise.success(()))
val sinkFuture = Future.successful(sink)

Source.maybe[Int]
.toMat(Sink.eagerFutureSink(sinkFuture))(Keep.right)
.run()

innerMatPromise.future.futureValue shouldBe (())
}

"cancel upstream when inner sink cancels" in {
val result = Source(List(1, 2, 3, 4, 5))
.toMat(Sink.eagerFutureSink(Future.successful(Sink.head[Int])))(Keep.right)
.run()
.flatten

result.futureValue shouldBe 1
}

"propagate failure when the future fails late" in {
val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
val result = Source(List(1, 2, 3))
.toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
.run()

sinkPromise.failure(ex)
result.failed.futureValue shouldBe ex
}

"fail the materialized value on abrupt termination before future completion" in {
val mat = Materializer(system)
val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
val result = Source.maybe[Int]
.toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
.run()(mat)

mat.shutdown()

result.failed.futureValue shouldBe an[AbruptStageTerminationException]
}
}
}
Loading
Loading