Skip to content

Commit 2d22a96

Browse files
committed
test(Topic): Add failing tests for #3642: Concurrent subscription and closing of a Topic.
1 parent d5a3fe3 commit 2d22a96

File tree

1 file changed

+35
-0
lines changed

1 file changed

+35
-0
lines changed

core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,4 +185,39 @@ class TopicSuite extends Fs2Suite {
185185

186186
TestControl.executeEmbed(program) // will fail if program is deadlocked
187187
}
188+
189+
// https://github.com/typelevel/fs2/issues/3642
190+
test("subscribe and close concurrently".fail) {
191+
val check: IO[Unit] =
192+
for {
193+
t <- Topic[IO, Int]
194+
fiber <- t
195+
.subscribe(maxQueued = 1)
196+
.compile
197+
.toList
198+
.start // let the subscription race with closing
199+
_ <- t.close
200+
_ <- fiber.join.timeout(5.seconds) // checking termination of the subscription stream
201+
} yield ()
202+
203+
check.replicateA_(100000)
204+
}
205+
206+
// https://github.com/typelevel/fs2/issues/3642
207+
test("subscribeAwait and close concurrently".fail) {
208+
val check: IO[Unit] =
209+
for {
210+
t <- Topic[IO, Int]
211+
fiber <- Stream
212+
.resource(t.subscribeAwait(maxQueued = 1))
213+
.flatten
214+
.compile
215+
.toList
216+
.start // let the subscription race with closing
217+
_ <- t.close
218+
_ <- fiber.join.timeout(5.seconds) // checking termination of the subscription stream
219+
} yield ()
220+
221+
check.replicateA_(100000)
222+
}
188223
}

0 commit comments

Comments
 (0)