Skip to content

Commit 6a3acfc

Browse files
Add Queue.imap, Topic.imap and Signal.map
1 parent bdd2d11 commit 6a3acfc

File tree

3 files changed

+62
-16
lines changed

3 files changed

+62
-16
lines changed

core/src/main/scala/fs2/async/immutable/Signal.scala

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@ package fs2.async.immutable
22

33
import fs2.{pipe, Async, Stream}
44
import fs2.util.Functor
5-
65
import fs2.Async
76
import fs2.async.immutable
87

9-
108
/** A holder of a single value of type `A` that can be read in the effect `F`. */
11-
trait Signal[F[_],A] {
9+
trait Signal[F[_], A] { self =>
1210

1311
/**
1412
* Returns the discrete version stream of this signal, updated only when `value`
@@ -43,6 +41,18 @@ trait Signal[F[_],A] {
4341
* Asynchronously get the current value of this `Signal`
4442
*/
4543
def get: F[A]
44+
45+
/**
46+
* Returns an alternate view of this `Signal` where its elements are of type [[B]],
47+
* given a function from `A` to `B`.
48+
*/
49+
def map[B](f: A => B)(implicit F: Functor[F]): Signal[F, B] =
50+
new Signal[F, B] {
51+
def discrete: Stream[F, B] = self.discrete.map(f)
52+
def continuous: Stream[F, B] = self.continuous.map(f)
53+
def changes: Stream[F, Unit] = self.changes
54+
def get: F[B] = F.map(self.get)(f)
55+
}
4656
}
4757

4858

@@ -75,4 +85,9 @@ object Signal {
7585
Stream.eval(fs2.async.signalOf[F,A](initial)) flatMap { sig =>
7686
Stream(sig).merge(source.flatMap(a => Stream.eval_(sig.set(a))))
7787
}
88+
89+
implicit def fs2SignalFunctor[F[_]](implicit F: Functor[F]): Functor[({type l[A]=Signal[F, A]})#l] =
90+
new Functor[({type l[A]=Signal[F, A]})#l] {
91+
def map[A, B](fa: Signal[F, A])(f: A => B): Signal[F, B] = fa.map(f)
92+
}
7893
}

core/src/main/scala/fs2/async/mutable/Queue.scala

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package fs2.async.mutable
22

33
import fs2._
4-
4+
import fs2.util.Functor
55
import fs2.async.immutable
66

77
/**
@@ -10,7 +10,7 @@ import fs2.async.immutable
1010
* a queue may have a bound on its size, in which case enqueuing may
1111
* block until there is an offsetting dequeue.
1212
*/
13-
trait Queue[F[_],A] {
13+
trait Queue[F[_],A] { self =>
1414

1515
/**
1616
* Enqueues one element in this `Queue`.
@@ -40,17 +40,17 @@ trait Queue[F[_],A] {
4040
def dequeue1: F[A]
4141

4242
/** Like `dequeue1` but provides a way to cancel the dequeue. */
43-
def cancellableDequeue1: F[(F[A],F[Unit])]
43+
def cancellableDequeue1: F[(F[A], F[Unit])]
4444

4545
/** Repeatedly call `dequeue1` forever. */
46-
def dequeue: Stream[F,A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat
46+
def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat
4747

4848
/**
4949
* The time-varying size of this `Queue`. This signal refreshes
5050
* only when size changes. Offsetting enqueues and de-queues may
5151
* not result in refreshes.
5252
*/
53-
def size: immutable.Signal[F,Int]
53+
def size: immutable.Signal[F, Int]
5454

5555
/** The size bound on the queue. `None` if the queue is unbounded. */
5656
def upperBound: Option[Int]
@@ -59,13 +59,30 @@ trait Queue[F[_],A] {
5959
* Returns the available number of entries in the queue.
6060
* Always `Int.MaxValue` when the queue is unbounded.
6161
*/
62-
def available: immutable.Signal[F,Int]
62+
def available: immutable.Signal[F, Int]
6363

6464
/**
6565
* Returns `true` when the queue has reached its upper size bound.
6666
* Always `false` when the queue is unbounded.
6767
*/
68-
def full: immutable.Signal[F,Boolean]
68+
def full: immutable.Signal[F, Boolean]
69+
70+
/**
71+
* Returns an alternate view of this `Queue` where its elements are of type [[B]],
72+
* given back and forth function from `A` to `B`.
73+
*/
74+
def imap[B](f: A => B)(g: B => A)(implicit F: Functor[F]): Queue[F, B] =
75+
new Queue[F, B] {
76+
def available: immutable.Signal[F, Int] = self.available
77+
def full: immutable.Signal[F, Boolean] = self.full
78+
def size: immutable.Signal[F, Int] = self.size
79+
def upperBound: Option[Int] = self.upperBound
80+
def enqueue1(a: B): F[Unit] = self.enqueue1(g(a))
81+
def offer1(a: B): F[Boolean] = self.offer1(g(a))
82+
def dequeue1: F[B] = F.map(self.dequeue1)(f)
83+
def cancellableDequeue1: F[(F[B],F[Unit])] =
84+
F.map(self.cancellableDequeue1)(bu => F.map(bu._1)(f) -> bu._2)
85+
}
6986
}
7087

7188
object Queue {

core/src/main/scala/fs2/async/mutable/Topic.scala

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@ import fs2.Stream._
1414
* Additionally the subscriber has possibility to terminate whenever size of enqueued elements is over certain size
1515
* by using `subscribeSize`.
1616
*/
17-
trait Topic[F[_],A] {
17+
trait Topic[F[_], A] { self =>
1818

1919
/**
2020
* Published any elements from source of `A` to this topic.
2121
* If any of the subscribers reach its `maxQueued` limit, then this will hold to publish next element
2222
* before that subscriber consumes it's elements or terminates.
2323
*/
24-
def publish:Sink[F,A]
24+
def publish: Sink[F, A]
2525

2626
/**
2727
* Publish one `A` to topic.
@@ -31,7 +31,7 @@ trait Topic[F[_],A] {
3131
* some of its elements to get room for this new. published `A`
3232
*
3333
*/
34-
def publish1(a:A):F[Unit]
34+
def publish1(a: A): F[Unit]
3535

3636
/**
3737
* Subscribes to receive any published `A` to this topic.
@@ -42,7 +42,7 @@ trait Topic[F[_],A] {
4242
* then publishers will hold into publishing to the queue.
4343
*
4444
*/
45-
def subscribe(maxQueued:Int):Stream[F,A]
45+
def subscribe(maxQueued: Int): Stream[F, A]
4646

4747
/**
4848
* Subscribes to receive published `A` to this topic.
@@ -58,12 +58,26 @@ trait Topic[F[_],A] {
5858
* then publishers will hold into publishing to the queue.
5959
*
6060
*/
61-
def subscribeSize(maxQueued:Int):Stream[F,(A, Int)]
61+
def subscribeSize(maxQueued: Int): Stream[F, (A, Int)]
6262

6363
/**
6464
* Signal of current active subscribers
6565
*/
66-
def subscribers:fs2.async.immutable.Signal[F,Int]
66+
def subscribers: fs2.async.immutable.Signal[F, Int]
67+
68+
/**
69+
* Returns an alternate view of this `Topic` where its elements are of type [[B]],
70+
* given back and forth function from `A` to `B`.
71+
*/
72+
def imap[B](f: A => B)(g: B => A): Topic[F, B] =
73+
new Topic[F, B] {
74+
def publish: Sink[F, B] = sfb => self.publish(sfb.map(g))
75+
def publish1(b: B): F[Unit] = self.publish1(g(b))
76+
def subscribe(maxQueued: Int): Stream[F, B] = self.subscribe(maxQueued).map(f)
77+
def subscribers: fs2.async.immutable.Signal[F, Int] = self.subscribers
78+
def subscribeSize(maxQueued: Int): Stream[F, (B, Int)] =
79+
self.subscribeSize(maxQueued).map { case (a, i) => f(a) -> i }
80+
}
6781
}
6882

6983
object Topic {

0 commit comments

Comments
 (0)