Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions core/shared/src/main/scala/fs2/concurrent/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,21 @@ object Topic {
case State.Closed() =>
Topic.closed.pure[F]
case State.Active(subs, _) =>
foreach(subs)(_.send(a).void)
.as(Topic.rightUnit)
subs.foldLeft(F.pure(Topic.rightUnit)) { case (acc, (_, chan)) =>
acc.flatMap {
case Left(Topic.Closed) => Topic.closed.pure[F]
case Right(_) =>
chan.send(a).flatMap {
case Right(_) => Topic.rightUnit.pure[F]
case Left(_) =>
// Channel send failed, check if topic was closed
state.get.map {
case State.Closed() => Topic.closed
case State.Active(_, _) => Topic.rightUnit
}
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor thing: I think constructing the effect in a right-associative way will lead to slightly faster execution for an IO-like F.

}

def subscribeAwait(maxQueued: Int): Resource[F, Stream[F, A]] =
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ class TopicSuite extends Fs2Suite {

// https://github.com/typelevel/fs2/issues/3644
test(
"when publish1 returns success, subscribers must receive the event, even if the publish1 races with close".fail
"when publish1 returns success, subscribers must receive the event, even if the publish1 races with close"
) {
val check: IO[Unit] =
Topic[IO, String]
Expand Down
Loading