Skip to content

Commit a9538e2

Browse files
authored
Prepare CassandraJournals statements only once at the startup (#739)
1 parent 1066ce4 commit a9538e2

File tree

6 files changed

+111
-202
lines changed

6 files changed

+111
-202
lines changed

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/CassandraSessionStub.scala

Lines changed: 1 addition & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import com.datastax.driver.core.{Host, PreparedStatement, RegularStatement, Resu
77
import com.evolutiongaming.scassandra.CassandraSession
88

99
object CassandraSessionStub {
10-
10+
// Doesn't inject failures on statements preparation since we don't prepare them on each call.
1111
def injectFailures[F[_]](
1212
session: CassandraSession[F],
1313
failAfter: Ref[F, Int]
@@ -24,49 +24,6 @@ object CassandraSessionStub {
2424
override def init: F[Unit] = F.unit
2525
override def execute(query: String): F[ResultSet] = failed.ifM(fail(query), session.execute(query))
2626

27-
override def execute(query: String, values: Any*): F[ResultSet] =
28-
failed.ifM(fail(query), session.execute(query, values: _*))
29-
30-
override def execute(query: String, values: Map[String, AnyRef]): F[ResultSet] =
31-
failed.ifM(fail(query), session.execute(query, values))
32-
33-
override def execute(statement: Statement): F[ResultSet] =
34-
failed.ifM(fail(statement.toString), session.execute(statement))
35-
36-
override def prepare(query: String): F[PreparedStatement] =
37-
failed.ifM(fail(query), session.prepare(query))
38-
39-
override def prepare(statement: RegularStatement): F[PreparedStatement] =
40-
failed.ifM(fail(statement.toString), session.prepare(statement))
41-
42-
override def state: CassandraSession.State[F] = new CassandraSession.State[F] {
43-
override def connectedHosts: F[Iterable[Host]] = F.pure(Iterable.empty)
44-
override def openConnections(host: Host): F[Int] = F.pure(0)
45-
override def trashedConnections(host: Host): F[Int] = F.pure(0)
46-
override def inFlightQueries(host: Host): F[Int] = F.pure(0)
47-
}
48-
49-
}
50-
51-
// Doesn't inject failures on statements preparation since we want to avoid preparing them on each call.
52-
// This is a temporary method to be used in specs that test the classes that have already been migrated to
53-
// preparing statements in advance
54-
def injectFailuresOnExecute[F[_]](
55-
session: CassandraSession[F],
56-
failAfter: Ref[F, Int]
57-
)(implicit F: MonadThrow[F]): CassandraSession[F] = new CassandraSession[F] {
58-
def fail[T](query: String): F[T] = F.raiseError {
59-
new RuntimeException(s"CassandraSessionStub: failing after proper calls exhausted: $query")
60-
}
61-
62-
val failed = failAfter modify { failAfter =>
63-
(failAfter - 1, failAfter <= 0)
64-
}
65-
66-
override def loggedKeyspace: F[Option[String]] = F.pure(None)
67-
override def init: F[Unit] = F.unit
68-
override def execute(query: String): F[ResultSet] = failed.ifM(fail(query), session.execute(query))
69-
7027
override def execute(query: String, values: Any*): F[ResultSet] =
7128
failed.ifM(fail(query), session.execute(query, values: _*))
7229

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/FlowSpec.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import scala.concurrent.duration.*
2020
class FlowSpec extends CassandraSpec {
2121

2222
test("flow fails when Cassandra insert fails") {
23-
val flow = for {
23+
val flow: Resource[IO, IO[Unit]] = for {
2424
failAfter <- Resource.eval(Ref.of[IO, Int](10000))
2525
session = CassandraSessionStub.injectFailures(cassandra().session, failAfter)
2626
storage <- Resource.eval(
@@ -78,11 +78,7 @@ class FlowSpec extends CassandraSpec {
7878
}
7979
} yield join
8080

81-
val test: IO[Unit] = flow use { join =>
82-
join.attempt map { result =>
83-
assert(clue(result.isLeft))
84-
}
85-
}
81+
val test: IO[Unit] = flow.use(join => join.attempt.map(result => assert(clue(result.isLeft))))
8682

8783
test.unsafeRunSync()
8884
}

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/journal/JournalSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class JournalSpec extends CassandraSpec {
4747
failAfter <- Ref.of[IO, Int](100)
4848
session = CassandraSessionStub.injectFailures(cassandra().session, failAfter)
4949
journals <- CassandraJournals.withSchema(session, cassandra().sync)
50-
_ <- failAfter.set(1)
50+
_ <- failAfter.set(0) // fail immediately on the first read attempt
5151
records <- journals.get(key).toList.attempt
5252
} yield assert(clue(records.isLeft))
5353

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/key/KeySpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class KeySpec extends CassandraSpec {
5252
test("failures") {
5353
val test: IO[Unit] = for {
5454
failAfter <- Ref.of[IO, Int](100)
55-
session = CassandraSessionStub.injectFailuresOnExecute(cassandra().session, failAfter)
55+
session = CassandraSessionStub.injectFailures(cassandra().session, failAfter)
5656
keys <- CassandraKeys.withSchema(
5757
session,
5858
cassandra().sync,

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/snapshot/SnapshotSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class SnapshotSpec extends CassandraSpec {
3636
val key = KafkaKey("SnapshotSpec", "integration-tests-1", TopicPartition.empty, "queries")
3737
val test: IO[Unit] = for {
3838
failAfter <- Ref.of[IO, Int](100)
39-
session = CassandraSessionStub.injectFailuresOnExecute(cassandra().session, failAfter)
39+
session = CassandraSessionStub.injectFailures(cassandra().session, failAfter)
4040
snapshots <- CassandraSnapshots.withSchema[IO, String](session, cassandra().sync)
4141
_ <- failAfter.set(0) // fail immediately on the first read attempt
4242
snapshots <- snapshots.get(key).attempt

0 commit comments

Comments
 (0)