Skip to content

Commit 3801cf2

Browse files
committed
Merge branch 'main' into topic/net-metrics
2 parents 55b55db + 1633841 commit 3801cf2

File tree

10 files changed

+345
-7
lines changed

10 files changed

+345
-7
lines changed

core/shared/src/main/scala-2.12/fs2/CollectorPlatform.scala

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ import scala.collection.generic.{
2525
GenericTraversableTemplate,
2626
MapFactory,
2727
SetFactory,
28+
SortedMapFactory,
29+
SortedSetFactory,
2830
TraversableFactory
2931
}
30-
import scala.collection.{MapLike, SetLike, Traversable}
32+
import scala.collection.{MapLike, SetLike, SortedMapLike, SortedSetLike, Traversable}
3133

3234
import fs2.internal._
3335

@@ -52,11 +54,31 @@ private[fs2] trait CollectorPlatform { self: Collector.type =>
5254
): Collector.Aux[(K, V), C[K, V]] =
5355
make(Builder.fromMapFactory(f))
5456

57+
implicit def supportsSortedMapFactory[
58+
K: Ordering,
59+
V,
60+
C[a, b] <: collection.SortedMap[a, b] with SortedMapLike[
61+
a,
62+
b,
63+
C[a, b]
64+
]
65+
](
66+
f: SortedMapFactory[C]
67+
): Collector.Aux[(K, V), C[K, V]] =
68+
make(Builder.fromSortedMapFactory(f))
69+
5570
implicit def supportsSetFactory[A, C[x] <: Set[x] with SetLike[x, C[x]]](
5671
f: SetFactory[C]
5772
): Collector.Aux[A, C[A]] =
5873
make(Builder.fromSetFactory(f))
5974

75+
implicit def supportsSortedSetFactory[A: Ordering, C[x] <: collection.SortedSet[
76+
x
77+
] with SortedSetLike[x, C[x]]](
78+
f: SortedSetFactory[C]
79+
): Collector.Aux[A, C[A]] =
80+
make(Builder.fromSortedSetFactory(f))
81+
6082
private[fs2] trait BuilderPlatform { self: Collector.Builder.type =>
6183
def fromFactory[A, C[_], B](f: Factory[A, C[B]]): Builder[A, C[B]] =
6284
fromBuilder(f())
@@ -71,9 +93,25 @@ private[fs2] trait CollectorPlatform { self: Collector.type =>
7193
): Builder[(K, V), C[K, V]] =
7294
fromBuilder(f.newBuilder)
7395

96+
def fromSortedMapFactory[
97+
K: Ordering,
98+
V,
99+
C[a, b] <: collection.SortedMap[a, b] with SortedMapLike[a, b, C[a, b]]
100+
](
101+
f: SortedMapFactory[C]
102+
): Builder[(K, V), C[K, V]] =
103+
fromBuilder(f.newBuilder)
104+
74105
def fromSetFactory[A, C[x] <: collection.Set[x] with SetLike[x, C[x]]](
75106
f: SetFactory[C]
76107
): Builder[A, C[A]] =
77108
fromBuilder(f.newBuilder)
109+
110+
def fromSortedSetFactory[A: Ordering, C[x] <: collection.SortedSet[x] with SortedSetLike[x, C[
111+
x
112+
]]](
113+
f: SortedSetFactory[C]
114+
): Builder[A, C[A]] =
115+
fromBuilder(f.newBuilder)
78116
}
79117
}

core/shared/src/main/scala-2.13/fs2/CollectorPlatform.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,13 @@
2222
package fs2
2323

2424
import scala.collection.immutable.ArraySeq
25-
import scala.collection.{Factory, IterableFactory, MapFactory}
25+
import scala.collection.{
26+
Factory,
27+
EvidenceIterableFactory,
28+
IterableFactory,
29+
MapFactory,
30+
SortedMapFactory
31+
}
2632
import scala.reflect.ClassTag
2733

2834
private[fs2] trait CollectorPlatform { self: Collector.type =>
@@ -38,6 +44,16 @@ private[fs2] trait CollectorPlatform { self: Collector.type =>
3844
implicit def supportsMapFactory[K, V, C[_, _]](f: MapFactory[C]): Collector.Aux[(K, V), C[K, V]] =
3945
make(Builder.fromMapFactory(f))
4046

47+
implicit def supportsSortedMapFactory[K: Ordering, V, C[_, _]](
48+
f: SortedMapFactory[C]
49+
): Collector.Aux[(K, V), C[K, V]] =
50+
make(Builder.fromSortedMapFactory(f))
51+
52+
implicit def supportsEvidenceIterableFactory[A, C[_], E[_]](f: EvidenceIterableFactory[C, E])(
53+
implicit ev: E[A]
54+
): Collector.Aux[A, C[A]] =
55+
make(Builder.fromEvidenceIterableFactory(f))
56+
4157
/** Use `ArraySeq.untagged` to build a `Collector` where a `ClassTag` is not available.
4258
*/
4359
implicit def supportsTaggedArraySeq[A: ClassTag](
@@ -64,6 +80,16 @@ private[fs2] trait CollectorPlatform { self: Collector.type =>
6480
def fromMapFactory[K, V, C[_, _]](f: MapFactory[C]): Builder[(K, V), C[K, V]] =
6581
fromBuilder(f.newBuilder)
6682

83+
def fromSortedMapFactory[K: Ordering, V, C[_, _]](
84+
f: SortedMapFactory[C]
85+
): Builder[(K, V), C[K, V]] =
86+
fromBuilder(f.newBuilder)
87+
88+
def fromEvidenceIterableFactory[A, C[_], E[_]](f: EvidenceIterableFactory[C, E])(implicit
89+
ev: E[A]
90+
): Builder[A, C[A]] =
91+
fromBuilder(f.newBuilder)
92+
6793
def taggedArraySeq[A: ClassTag]: Builder[A, ArraySeq[A]] =
6894
array[A].mapResult(ArraySeq.unsafeWrapArray)
6995
}

core/shared/src/main/scala-3/fs2/CollectorPlatform.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,13 @@
2222
package fs2
2323

2424
import scala.collection.immutable.ArraySeq
25-
import scala.collection.{Factory, IterableFactory, MapFactory}
25+
import scala.collection.{
26+
Factory,
27+
EvidenceIterableFactory,
28+
IterableFactory,
29+
MapFactory,
30+
SortedMapFactory
31+
}
2632
import scala.reflect.ClassTag
2733

2834
private[fs2] trait CollectorPlatform { self: Collector.type =>
@@ -38,6 +44,16 @@ private[fs2] trait CollectorPlatform { self: Collector.type =>
3844
implicit def supportsMapFactory[K, V, C[_, _]](f: MapFactory[C]): Collector.Aux[(K, V), C[K, V]] =
3945
make(Builder.fromMapFactory(f))
4046

47+
implicit def supportsSortedMapFactory[K: Ordering, V, C[_, _]](
48+
f: SortedMapFactory[C]
49+
): Collector.Aux[(K, V), C[K, V]] =
50+
make(Builder.fromSortedMapFactory(f))
51+
52+
implicit def supportsEvidenceIterableFactory[A, C[_], E[_]](f: EvidenceIterableFactory[C, E])(
53+
implicit ev: E[A]
54+
): Collector.Aux[A, C[A]] =
55+
make(Builder.fromEvidenceIterableFactory(f))
56+
4157
/** Use `ArraySeq.untagged` to build a `Collector` where a `ClassTag` is not available.
4258
*/
4359
implicit def supportsTaggedArraySeq[A: ClassTag](
@@ -72,6 +88,16 @@ private[fs2] trait CollectorPlatform { self: Collector.type =>
7288
def fromMapFactory[K, V, C[_, _]](f: MapFactory[C]): Builder[(K, V), C[K, V]] =
7389
fromBuilder(f.newBuilder)
7490

91+
def fromSortedMapFactory[K: Ordering, V, C[_, _]](
92+
f: SortedMapFactory[C]
93+
): Builder[(K, V), C[K, V]] =
94+
fromBuilder(f.newBuilder)
95+
96+
def fromEvidenceIterableFactory[A, C[_], E[_]](f: EvidenceIterableFactory[C, E])(implicit
97+
ev: E[A]
98+
): Builder[A, C[A]] =
99+
fromBuilder(f.newBuilder)
100+
75101
def taggedArraySeq[A: ClassTag]: Builder[A, ArraySeq[A]] =
76102
array[A].mapResult(ArraySeq.unsafeWrapArray)
77103
}

core/shared/src/test/scala/fs2/CompilationTest.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,8 @@ object ThisModuleShouldCompile {
120120
Stream(1, 2, 3).compile.to(Set)
121121
Stream(1, 2, 3).to(List)
122122
Stream(1, 2, 3).covary[Fallible].to(List)
123+
Stream(1, 2, 3).to(Set)
124+
Stream(1, 2, 3).to(collection.immutable.SortedSet)
125+
Stream(1 -> 1, 2 -> 2, 3 -> 3).to(Map)
126+
Stream(1 -> 1, 2 -> 2, 3 -> 3).to(collection.immutable.SortedMap)
123127
}

io/jvm/src/main/scala/fs2/io/net/NetworkPlatform.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N
9494
case None => orElse
9595
}
9696

97+
private def selectingDatagram[A](
98+
ifSelecting: SelectingIpDatagramSocketsProvider[F] => Resource[F, A],
99+
orElse: => Resource[F, A]
100+
): Resource[F, A] =
101+
Resource.eval(tryGetSelector).flatMap {
102+
case Some(selector) => ifSelecting(new SelectingIpDatagramSocketsProvider(selector))
103+
case None => orElse
104+
}
105+
97106
override def connect(
98107
address: GenSocketAddress,
99108
options: List[SocketOption]
@@ -118,7 +127,15 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N
118127
address: GenSocketAddress,
119128
options: List[SocketOption]
120129
): Resource[F, DatagramSocket[F]] =
121-
fallback.bindDatagramSocket(address, options)
130+
matchAddress(
131+
address,
132+
sa =>
133+
selectingDatagram(
134+
_.bindDatagramSocket(sa, options),
135+
fallback.bindDatagramSocket(sa, options)
136+
),
137+
ua => fallback.bindDatagramSocket(ua, options)
138+
)
122139

123140
// Implementations of deprecated operations
124141

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package io
24+
package net
25+
26+
import cats.effect.{LiftIO, Selector}
27+
import cats.effect.kernel.Async
28+
import cats.effect.std.Mutex
29+
import cats.syntax.all._
30+
import com.comcast.ip4s.{IpAddress, SocketAddress, GenSocketAddress, NetworkInterface}
31+
32+
import java.net.InetSocketAddress
33+
import java.nio.ByteBuffer
34+
import java.nio.channels.{DatagramChannel, SelectionKey}
35+
import java.net.{NetworkInterface => JNetworkInterface}
36+
import com.comcast.ip4s.MulticastJoin
37+
import CollectionCompat.*
38+
39+
private final class SelectingDatagramSocket[F[_]: LiftIO] private (
40+
selector: Selector,
41+
ch: DatagramChannel,
42+
readMutex: Mutex[F],
43+
writeMutex: Mutex[F],
44+
override val address: SocketAddress[IpAddress]
45+
)(implicit F: Async[F])
46+
extends DatagramSocket[F] {
47+
48+
private[this] val bufferSize = 1 << 16
49+
50+
def localAddress: F[SocketAddress[IpAddress]] =
51+
F.pure(address)
52+
53+
def read: F[Datagram] =
54+
readMutex.lock.surround {
55+
val buf = ByteBuffer.allocate(bufferSize)
56+
57+
def go: F[Datagram] =
58+
F.delay(ch.receive(buf)).flatMap {
59+
case null =>
60+
selector.select(ch, SelectionKey.OP_READ).to *> go
61+
case src =>
62+
F.delay {
63+
buf.flip()
64+
val bytes = new Array[Byte](buf.remaining())
65+
buf.get(bytes)
66+
buf.clear()
67+
Datagram(
68+
SocketAddress.fromInetSocketAddress(src.asInstanceOf[InetSocketAddress]),
69+
Chunk.array(bytes)
70+
)
71+
}
72+
}
73+
go
74+
}
75+
76+
def readGen: F[GenDatagram] =
77+
read.map(_.toGenDatagram)
78+
79+
def reads: Stream[F, Datagram] =
80+
Stream.repeatEval(read)
81+
82+
private def write0(bytes: Chunk[Byte], addr: Option[InetSocketAddress]): F[Unit] =
83+
writeMutex.lock.surround {
84+
val buf = bytes.toByteBuffer
85+
86+
def go: F[Unit] =
87+
F.delay {
88+
addr match {
89+
case Some(a) => ch.send(buf, a)
90+
case None => ch.write(buf)
91+
}
92+
}.flatMap { _ =>
93+
if (buf.hasRemaining) selector.select(ch, SelectionKey.OP_WRITE).to[F] *> go
94+
else F.unit
95+
}
96+
97+
go
98+
}
99+
100+
def write(bytes: Chunk[Byte], address: GenSocketAddress): F[Unit] =
101+
write0(bytes, Some(address.asIpUnsafe.toInetSocketAddress))
102+
103+
def write(bytes: Chunk[Byte]): F[Unit] =
104+
write0(bytes, None)
105+
106+
def writes: Pipe[F, Datagram, Nothing] =
107+
_.evalMap(write).drain
108+
109+
def connect(addr: GenSocketAddress): F[Unit] =
110+
F.delay(ch.connect(addr.asIpUnsafe.toInetSocketAddress)).void
111+
112+
def disconnect: F[Unit] =
113+
F.delay(ch.disconnect()).void
114+
115+
def getOption[A](key: java.net.SocketOption[A]): F[Option[A]] =
116+
F.delay(Option(ch.getOption(key)))
117+
118+
def setOption[A](key: java.net.SocketOption[A], value: A): F[Unit] =
119+
F.delay(ch.setOption(key, value)).void
120+
121+
override def join(
122+
join: MulticastJoin[IpAddress],
123+
interface: NetworkInterface
124+
): F[GroupMembership] =
125+
F.delay {
126+
val jinterface = JNetworkInterface.getByName(interface.name)
127+
val membership = join.fold(
128+
j => ch.join(j.group.address.toInetAddress, jinterface),
129+
j => ch.join(j.group.address.toInetAddress, jinterface, j.source.toInetAddress)
130+
)
131+
new GroupMembership {
132+
def drop = F.delay(membership.drop)
133+
def block(source: IpAddress) =
134+
F.delay { membership.block(source.toInetAddress); () }
135+
def unblock(source: IpAddress) =
136+
F.delay { membership.unblock(source.toInetAddress); () }
137+
override def toString = "GroupMembership"
138+
}
139+
}
140+
141+
override def join(
142+
j: MulticastJoin[IpAddress],
143+
interface: JNetworkInterface
144+
): F[GroupMembership] =
145+
join(j, NetworkInterface.fromJava(interface))
146+
147+
override def supportedOptions: F[Set[SocketOption.Key[?]]] =
148+
F.delay {
149+
ch.supportedOptions.asScala.toSet
150+
}
151+
152+
}
153+
154+
private object SelectingDatagramSocket {
155+
def apply[F[_]: LiftIO](
156+
selector: Selector,
157+
ch: DatagramChannel,
158+
local: SocketAddress[IpAddress]
159+
)(implicit F: Async[F]): F[DatagramSocket[F]] =
160+
(Mutex[F], Mutex[F]).flatMapN { (readM, writeM) =>
161+
F.delay {
162+
new SelectingDatagramSocket[F](selector, ch, readM, writeM, local)
163+
}
164+
}
165+
}

0 commit comments

Comments
 (0)