44
55package doobie .postgres
66
7+ import util .Properties .versionString
78import cats .effect .IO
89import com .zaxxer .hikari .HikariDataSource
910import doobie .*
1011import doobie .implicits .*
1112import doobie .util .transactor .Strategy
13+ import doobie .util .fragment .Fragment
1214import fs2 .Stream
1315
1416import java .sql .SQLTransientConnectionException
@@ -24,6 +26,8 @@ class StreamPrefetchSuite extends munit.FunSuite {
2426 private var dataSource : HikariDataSource = null
2527 private var xa : Transactor [IO ] = null
2628 private val count = 100
29+ private val testTable =
30+ Fragment .const(s " stream_cancel_test_ ${versionString.replace('.' , '_' ).replace(" version " , " " )}" )
2731
2832 private def createDataSource () = {
2933
@@ -46,10 +50,10 @@ class StreamPrefetchSuite extends munit.FunSuite {
4650 ExecutionContext .fromExecutor(Executors .newFixedThreadPool(32 ))
4751 )
4852 val insert = for {
49- _ <- Stream .eval(sql " CREATE TABLE if not exists stream_cancel_test (i text) " .update.run.transact(xa))
50- _ <- Stream .eval(sql " truncate table stream_cancel_test " .update.run.transact(xa))
53+ _ <- Stream .eval(sql " CREATE TABLE if not exists ${testTable} (i text) " .update.run.transact(xa))
54+ _ <- Stream .eval(sql " truncate table ${testTable} " .update.run.transact(xa))
5155 _ <- Stream .eval(
52- sql " INSERT INTO stream_cancel_test select 1 from generate_series(1, $count) " .update.run.transact(xa))
56+ sql " INSERT INTO ${testTable} select 1 from generate_series(1, $count) " .update.run.transact(xa))
5357 } yield ()
5458
5559 insert.compile.drain.unsafeRunSync()
@@ -60,21 +64,22 @@ class StreamPrefetchSuite extends munit.FunSuite {
6064 super .afterAll()
6165 }
6266
63- test(" Connection returned before stream is drained, if chunk size is larger than result count" ) {
67+ test(" Connection returned before stream is drained, if chunk size is larger than result count"
68+ .pending(" skipped as it's a bit flakey at the moment (fails in CI but never locally)" )) {
6469 val xa = Transactor .fromDataSource[IO ](
6570 dataSource,
6671 ExecutionContext .fromExecutor(Executors .newFixedThreadPool(32 ))
6772 )
6873
69- val streamLargerBuffer = fr " select * from stream_cancel_test " .query[Int ].streamWithChunkSize(200 ).transact(xa)
74+ val streamLargerBuffer = fr " select * from ${testTable} " .query[Int ].streamWithChunkSize(200 ).transact(xa)
7075 .evalMap(_ => fr " select 1 " .query[Int ].unique.transact(xa))
7176 .compile.count
7277
7378 assertEquals(streamLargerBuffer.unsafeRunSync(), count.toLong)
7479 }
7580
7681 test(" Connection is not returned after consuming only 1 chunk, if chunk size is smaller than result count" ) {
77- val streamSmallerBuffer = fr " select * from stream_cancel_test " .query[Int ].streamWithChunkSize(10 ).transact(xa)
82+ val streamSmallerBuffer = fr " select * from ${testTable} " .query[Int ].streamWithChunkSize(10 ).transact(xa)
7883 .evalMap(_ => fr " select 1 " .query[Int ].unique.transact(xa))
7984 .compile.count
8085
@@ -92,7 +97,7 @@ class StreamPrefetchSuite extends munit.FunSuite {
9297 val earlyClose = new AtomicBoolean (false )
9398
9499 val streamSmallerBufferValid =
95- fr " select * from stream_cancel_test " .query[Int ].streamWithChunkSize(10 ).transact(xaCopy)
100+ fr " select * from ${testTable} " .query[Int ].streamWithChunkSize(10 ).transact(xaCopy)
96101 .evalMap { _ => IO { if (hasClosed.get()) earlyClose.set(true ) } >> IO .sleep(10 .milliseconds) }
97102 .compile.count
98103
0 commit comments