44
55package doobie .postgres
66
7+ import util .Properties .versionString
78import cats .effect .IO
89import com .zaxxer .hikari .HikariDataSource
910import doobie .*
1011import doobie .implicits .*
1112import doobie .util .transactor .Strategy
13+ import doobie .util .fragment .Fragment
1214import fs2 .Stream
1315
1416import java .sql .SQLTransientConnectionException
@@ -24,6 +26,8 @@ class StreamPrefetchSuite extends munit.FunSuite {
2426 private var dataSource : HikariDataSource = null
2527 private var xa : Transactor [IO ] = null
2628 private val count = 100
29+ private val testTable =
30+ Fragment .const(s " stream_cancel_test_ ${versionString.replace('.' , '_' ).replace(" version " , " " )}" )
2731
2832 private def createDataSource () = {
2933
@@ -46,10 +50,10 @@ class StreamPrefetchSuite extends munit.FunSuite {
4650 ExecutionContext .fromExecutor(Executors .newFixedThreadPool(32 ))
4751 )
4852 val insert = for {
49- _ <- Stream .eval(sql " CREATE TABLE if not exists stream_cancel_test (i text) " .update.run.transact(xa))
50- _ <- Stream .eval(sql " truncate table stream_cancel_test " .update.run.transact(xa))
53+ _ <- Stream .eval(sql " CREATE TABLE if not exists ${testTable} (i text) " .update.run.transact(xa))
54+ _ <- Stream .eval(sql " truncate table ${testTable} " .update.run.transact(xa))
5155 _ <- Stream .eval(
52- sql " INSERT INTO stream_cancel_test select 1 from generate_series(1, $count) " .update.run.transact(xa))
56+ sql " INSERT INTO ${testTable} select 1 from generate_series(1, $count) " .update.run.transact(xa))
5357 } yield ()
5458
5559 insert.compile.drain.unsafeRunSync()
@@ -66,15 +70,15 @@ class StreamPrefetchSuite extends munit.FunSuite {
6670 ExecutionContext .fromExecutor(Executors .newFixedThreadPool(32 ))
6771 )
6872
69- val streamLargerBuffer = fr " select * from stream_cancel_test " .query[Int ].streamWithChunkSize(200 ).transact(xa)
73+ val streamLargerBuffer = fr " select * from ${testTable} " .query[Int ].streamWithChunkSize(200 ).transact(xa)
7074 .evalMap(_ => fr " select 1 " .query[Int ].unique.transact(xa))
7175 .compile.count
7276
7377 assertEquals(streamLargerBuffer.unsafeRunSync(), count.toLong)
7478 }
7579
7680 test(" Connection is not returned after consuming only 1 chunk, if chunk size is smaller than result count" ) {
77- val streamSmallerBuffer = fr " select * from stream_cancel_test " .query[Int ].streamWithChunkSize(10 ).transact(xa)
81+ val streamSmallerBuffer = fr " select * from ${testTable} " .query[Int ].streamWithChunkSize(10 ).transact(xa)
7882 .evalMap(_ => fr " select 1 " .query[Int ].unique.transact(xa))
7983 .compile.count
8084
@@ -92,7 +96,7 @@ class StreamPrefetchSuite extends munit.FunSuite {
9296 val earlyClose = new AtomicBoolean (false )
9397
9498 val streamSmallerBufferValid =
95- fr " select * from stream_cancel_test " .query[Int ].streamWithChunkSize(10 ).transact(xaCopy)
99+ fr " select * from ${testTable} " .query[Int ].streamWithChunkSize(10 ).transact(xaCopy)
96100 .evalMap { _ => IO { if (hasClosed.get()) earlyClose.set(true ) } >> IO .sleep(10 .milliseconds) }
97101 .compile.count
98102
0 commit comments