Skip to content

Commit a869875

Browse files
pjfanningCopilot
andauthored
add Source.apply that is optimised for Seq (#2562)
* add Source.apply that is optimised for Seq * Update TraversalBuilderSpec.scala * Update stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala Co-authored-by: Copilot <[email protected]> * Update DslFactoriesConsistencySpec.scala --------- Co-authored-by: Copilot <[email protected]>
1 parent 56de5a1 commit a869875

File tree

3 files changed

+43
-2
lines changed

3 files changed

+43
-2
lines changed

stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,12 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers {
161161
_ == "apply",
162162
_ == 1,
163163
_ == List(classOf[pekko.stream.impl.SourceModule[_, _]])),
164+
// no Java equivalent for this Scala only convenience method
165+
Ignore(
166+
_ == pekko.stream.scaladsl.Source.getClass,
167+
_ == "apply",
168+
_ == 1,
169+
_ == List(classOf[scala.collection.immutable.Seq[_]])),
164170
// corresponding matches on java side would need to have Function23
165171
Ignore(_ == pekko.stream.scaladsl.Source.getClass, _ == "apply", _ == 24, _ => true),
166172
Ignore(_ == pekko.stream.scaladsl.Flow.getClass, _ == "apply", _ == 24, _ => true),

stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,20 @@ class TraversalBuilderSpec extends PekkoSpec {
521521
}
522522

523523
"find Source.iterable via TraversalBuilder with getValuePresentedSource" in {
524-
val iterable = List("a")
524+
val iterable = Set("a", "b", "c")
525+
TraversalBuilder.getValuePresentedSource(Source(iterable)).get.asInstanceOf[IterableSource[
526+
String]].elements should ===(
527+
iterable)
528+
val iterableSource = new IterableSource(iterable)
529+
TraversalBuilder.getValuePresentedSource(iterableSource) should be(OptionVal.Some(iterableSource))
530+
531+
TraversalBuilder.getValuePresentedSource(Source(iterable).async) should be(OptionVal.None)
532+
TraversalBuilder.getValuePresentedSource(Source(iterable).mapMaterializedValue(_ => "Mat")) should be(
533+
OptionVal.None)
534+
}
535+
536+
"find Source.iterable via TraversalBuilder with getValuePresentedSource (Seq input)" in {
537+
val iterable = Seq("a", "b", "c")
525538
TraversalBuilder.getValuePresentedSource(Source(iterable)).get.asInstanceOf[IterableSource[
526539
String]].elements should ===(
527540
iterable)

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,14 +395,16 @@ object Source {
395395

396396
/**
397397
* Helper to create [[Source]] from `Iterable`.
398-
* Example usage: `Source(Seq(1,2,3))`
398+
* Example usage: `Source(Set(1,2,3))`
399399
*
400400
* Starts a new `Source` from the given `Iterable`. This is like starting from an
401401
* Iterator, but every Subscriber directly attached to the Publisher of this
402402
* stream will see an individual flow of elements (always starting from the
403403
* beginning) regardless of when they subscribed.
404+
* @see [[apply(immutable.Seq)]]
404405
*/
405406
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
407+
// unknown size is -1
406408
(iterable.knownSize: @switch) match {
407409
case 0 => empty
408410
case 1 => single(iterable.head)
@@ -411,6 +413,26 @@ object Source {
411413
}
412414
}
413415

416+
/**
417+
* Helper to create [[Source]] from `Seq`.
418+
* Example usage: `Source(Seq(1,2,3))`
419+
*
420+
* Starts a new `Source` from the given `Seq`. This is like starting from an
421+
* Iterator, but every Subscriber directly attached to the Publisher of this
422+
* stream will see an individual flow of elements (always starting from the
423+
* beginning) regardless of when they subscribed.
424+
* @see [[apply(immutable.Iterable)]]
425+
* @since 2.0.0
426+
*/
427+
def apply[T](seq: immutable.Seq[T]): Source[T, NotUsed] = {
428+
seq match {
429+
case immutable.Seq() => empty[T]
430+
case immutable.Seq(elem: T @unchecked) => single(elem)
431+
case _ =>
432+
fromGraph(new IterableSource[T](seq)).withAttributes(DefaultAttributes.iterableSource)
433+
}
434+
}
435+
414436
/**
415437
* Creates a `Source` from an array, if the array is empty, the stream is completed immediately,
416438
* otherwise, every element of the array will be emitted sequentially.

0 commit comments

Comments
 (0)