Skip to content

Commit f735cca

Browse files
authored
chore: Remove CollectionUtil (#2582)
1 parent 01f2265 commit f735cca

File tree

8 files changed

+57
-57
lines changed

8 files changed

+57
-57
lines changed

actor/src/main/scala/org/apache/pekko/util/Collections.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,30 @@ import scala.collection.immutable
2020
* INTERNAL API
2121
*/
2222
private[pekko] object Collections {
23+
// Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once,
24+
// and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not.
25+
// Prior art: https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458
26+
private object NotApplied extends (Any => Any) {
27+
final override def apply(v1: Any): Any = this
28+
}
29+
30+
implicit class IterableOps[T](val iterable: java.lang.Iterable[T]) extends AnyVal {
31+
def collectToImmutableSeq[R](pf: PartialFunction[T, R]): immutable.Seq[R] = {
32+
val builder = immutable.Seq.newBuilder[R]
33+
iterable.forEach((t: T) => {
34+
// 1. `applyOrElse` is faster than (`pf.isDefinedAt` and then `pf.apply`)
35+
// 2. using reference comparing here instead of pattern matching can generate less and quicker bytecode,
36+
// eg: just a simple `IF_ACMPNE`, and you can find the same trick in `CollectWhile` operator.
37+
// If you interest, you can check the associated PR for this change and the
38+
// current implementation of `scala.collection.IterableOnceOps.collectFirst`.
39+
pf.applyOrElse(t, NotApplied) match {
40+
case _: NotApplied.type => // do nothing
41+
case r: R @unchecked => builder += r
42+
}
43+
})
44+
builder.result()
45+
}
46+
}
2347

2448
case object EmptyImmutableSeq extends immutable.Seq[Nothing] {
2549
override final def iterator = Iterator.empty

stream/src/main/scala-2.13/org/apache/pekko/stream/javadsl/CollectionUtil.scala

Lines changed: 0 additions & 39 deletions
This file was deleted.

stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3495,7 +3495,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
34953495
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
34963496
segmentSize: Int,
34973497
eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = {
3498-
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
3498+
import pekko.util.Collections._
3499+
val seq = if (those ne null) those.collectToImmutableSeq {
34993500
case source: Source[Out @unchecked, _] => source.asScala
35003501
case other => other
35013502
}
@@ -3577,7 +3578,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
35773578
def mergeAll(
35783579
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
35793580
eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = {
3580-
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
3581+
import pekko.util.Collections._
3582+
val seq = if (those ne null) those.collectToImmutableSeq {
35813583
case source: Source[Out @unchecked, _] => source.asScala
35823584
case other => other
35833585
}

stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,8 @@ object Sink {
465465
sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
466466
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
467467
: Sink[T, java.util.List[M]] = {
468-
val seq = if (sinks ne null) CollectionUtil.toSeq(sinks).collect {
468+
import pekko.util.Collections._
469+
val seq = if (sinks ne null) sinks.collectToImmutableSeq {
469470
case sink: Sink[U @unchecked, M @unchecked] => sink.asScala
470471
case other => other
471472
}

stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,8 @@ object Source {
549549
rest: java.util.List[Source[T, _ <: Any]],
550550
fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]])
551551
: Source[U, NotUsed] = {
552-
val seq = if (rest ne null) CollectionUtil.toSeq(rest).map(_.asScala) else immutable.Seq()
552+
import scala.jdk.CollectionConverters._
553+
val seq = if (rest ne null) rest.asScala.map(_.asScala).toSeq else immutable.Seq()
553554
new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num => fanInStrategy.apply(num)))
554555
}
555556

@@ -574,7 +575,8 @@ object Source {
574575
sources: java.util.List[_ <: Graph[SourceShape[T], M]],
575576
fanInStrategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], NotUsed]])
576577
: Source[U, java.util.List[M]] = {
577-
val seq = if (sources ne null) CollectionUtil.toSeq(sources).collect {
578+
import pekko.util.Collections._
579+
val seq = if (sources ne null) sources.collectToImmutableSeq {
578580
case source: Source[T @unchecked, M @unchecked] => source.asScala
579581
case other => other
580582
}
@@ -586,7 +588,8 @@ object Source {
586588
* Combine the elements of multiple streams into a stream of lists.
587589
*/
588590
def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = {
589-
val seq = if (sources ne null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq()
591+
import scala.jdk.CollectionConverters._
592+
val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else immutable.Seq()
590593
new Source(scaladsl.Source.zipN(seq).map(_.asJava))
591594
}
592595

@@ -596,7 +599,8 @@ object Source {
596599
def zipWithN[T, O](
597600
zipper: function.Function[java.util.List[T], O],
598601
sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
599-
val seq = if (sources ne null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq()
602+
import scala.jdk.CollectionConverters._
603+
val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else immutable.Seq()
600604
new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq))
601605
}
602606

@@ -844,9 +848,10 @@ object Source {
844848
def mergePrioritizedN[T](
845849
sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]],
846850
eagerComplete: Boolean): javadsl.Source[T, NotUsed] = {
851+
import scala.jdk.CollectionConverters._
847852
val seq =
848853
if (sourcesAndPriorities ne null)
849-
CollectionUtil.toSeq(sourcesAndPriorities).map(pair => (pair.first.asScala, pair.second.intValue()))
854+
sourcesAndPriorities.asScala.map(pair => (pair.first.asScala, pair.second.intValue())).toSeq
850855
else
851856
immutable.Seq()
852857
new Source(scaladsl.Source.mergePrioritizedN(seq, eagerComplete))
@@ -1625,7 +1630,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
16251630
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
16261631
segmentSize: Int,
16271632
eagerClose: Boolean): javadsl.Source[Out, Mat] = {
1628-
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
1633+
import pekko.util.Collections._
1634+
val seq = if (those ne null) those.collectToImmutableSeq {
16291635
case source: Source[Out @unchecked, _] => source.asScala
16301636
case other => other
16311637
}
@@ -1705,7 +1711,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
17051711
def mergeAll(
17061712
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
17071713
eagerComplete: Boolean): javadsl.Source[Out, Mat] = {
1708-
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
1714+
import pekko.util.Collections._
1715+
val seq = if (those ne null) those.collectToImmutableSeq {
17091716
case source: Source[Out @unchecked, _] => source.asScala
17101717
case other => other
17111718
}

stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2368,7 +2368,8 @@ final class SubFlow[In, Out, Mat](
23682368
def mergeAll(
23692369
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
23702370
eagerComplete: Boolean): SubFlow[In, Out, Mat] = {
2371-
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
2371+
import pekko.util.Collections._
2372+
val seq = if (those ne null) those.collectToImmutableSeq {
23722373
case source: Source[Out @unchecked, _] => source.asScala
23732374
case other => other
23742375
}
@@ -2426,7 +2427,8 @@ final class SubFlow[In, Out, Mat](
24262427
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
24272428
segmentSize: Int,
24282429
eagerClose: Boolean): SubFlow[In, Out, Mat] = {
2429-
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
2430+
import pekko.util.Collections._
2431+
val seq = if (those ne null) those.collectToImmutableSeq {
24302432
case source: Source[Out @unchecked, _] => source.asScala
24312433
case other => other
24322434
}

stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2334,7 +2334,8 @@ final class SubSource[Out, Mat](
23342334
def mergeAll(
23352335
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
23362336
eagerComplete: Boolean): SubSource[Out, Mat] = {
2337-
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
2337+
import pekko.util.Collections._
2338+
val seq = if (those ne null) those.collectToImmutableSeq {
23382339
case source: Source[Out @unchecked, _] => source.asScala
23392340
case other => other
23402341
}
@@ -2393,7 +2394,8 @@ final class SubSource[Out, Mat](
23932394
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
23942395
segmentSize: Int,
23952396
eagerClose: Boolean): SubSource[Out, Mat] = {
2396-
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
2397+
import pekko.util.Collections._
2398+
val seq = if (those ne null) those.collectToImmutableSeq {
23972399
case source: Source[Out @unchecked, _] => source.asScala
23982400
case other => other
23992401
}

stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import pekko.stream.SystemMaterializer
4242
import pekko.stream.TLSClosing
4343
import pekko.stream.scaladsl
4444
import pekko.util.ByteString
45+
import pekko.japi.Util.immutableSeq
4546

4647
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
4748

@@ -177,7 +178,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
177178
idleTimeout: Optional[java.time.Duration]): Source[IncomingConnection, CompletionStage[ServerBinding]] =
178179
Source.fromGraph(
179180
delegate
180-
.bind(interface, port, backlog, CollectionUtil.toSeq(options), halfClose, optionalDurationToScala(idleTimeout))
181+
.bind(interface, port, backlog, immutableSeq(options), halfClose, optionalDurationToScala(idleTimeout))
181182
.map(new IncomingConnection(_))
182183
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava))
183184

@@ -228,7 +229,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
228229
.outgoingConnection(
229230
remoteAddress,
230231
localAddress.toScala,
231-
CollectionUtil.toSeq(options),
232+
immutableSeq(options),
232233
halfClose,
233234
optionalDurationToScala(connectTimeout),
234235
optionalDurationToScala(idleTimeout))
@@ -291,7 +292,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
291292
remoteAddress,
292293
createSSLEngine = () => createSSLEngine.create(),
293294
localAddress.toScala,
294-
CollectionUtil.toSeq(options),
295+
immutableSeq(options),
295296
optionalDurationToScala(connectTimeout),
296297
optionalDurationToScala(idleTimeout),
297298
session =>
@@ -342,7 +343,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
342343
port,
343344
createSSLEngine = () => createSSLEngine.create(),
344345
backlog,
345-
CollectionUtil.toSeq(options),
346+
immutableSeq(options),
346347
optionalDurationToScala(idleTimeout),
347348
session =>
348349
verifySession.apply(session).toScala match {

0 commit comments

Comments
 (0)