Skip to content

Commit a42ca3f

Browse files
committed
add ability to specify Cassandra table names for Cassandra-based
persistence
1 parent 464fd7b commit a42ca3f

File tree

10 files changed

+505
-245
lines changed

10 files changed

+505
-245
lines changed

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/journal/JournalSchemaSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class JournalSchemaSpec extends CassandraSpec {
1212
test("table is created using scassandra session API") {
1313
val session = cassandra().session
1414
val sync = cassandra().sync
15-
val schema = JournalSchema.of(session, sync)
15+
val schema = JournalSchema.of(session, sync, CassandraJournals.DefaultTableName)
1616

1717
val test = for {
1818
_ <- schema.create
@@ -26,7 +26,7 @@ class JournalSchemaSpec extends CassandraSpec {
2626
val session = cassandra().session
2727
val sync = cassandra().sync
2828

29-
val schema = JournalSchema.of(session, sync)
29+
val schema = JournalSchema.of(session, sync, CassandraJournals.DefaultTableName)
3030

3131
val test = for {
3232
_ <- schema.create

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/key/KeySchemaSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class KeySchemaSpec extends CassandraSpec {
1313
val session = cassandra().session
1414
val sync = cassandra().sync
1515

16-
val keySchema = KeySchema.of(session, sync)
16+
val keySchema = KeySchema.of(session, sync, CassandraKeys.DefaultTableName)
1717

1818
val test = for {
1919
_ <- keySchema.create
@@ -27,7 +27,7 @@ class KeySchemaSpec extends CassandraSpec {
2727
val session = cassandra().session
2828
val sync = cassandra().sync
2929

30-
val keySchema = KeySchema.of(session, sync)
30+
val keySchema = KeySchema.of(session, sync, CassandraKeys.DefaultTableName)
3131

3232
val test = for {
3333
_ <- keySchema.create

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/snapshot/SnapshotSchemaSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class SnapshotSchemaSpec extends CassandraSpec {
1212
test("table is created using scassandra session API") {
1313
val session = cassandra().session
1414
val sync = cassandra().sync
15-
val schema = SnapshotSchema.of(session, sync)
15+
val schema = SnapshotSchema.of(session, sync, CassandraSnapshots.DefaultTableName)
1616

1717
val test = for {
1818
_ <- schema.create
@@ -26,7 +26,7 @@ class SnapshotSchemaSpec extends CassandraSpec {
2626
val session = cassandra().session
2727
val sync = cassandra().sync
2828

29-
val schema = SnapshotSchema.of(session, sync)
29+
val schema = SnapshotSchema.of(session, sync, CassandraSnapshots.DefaultTableName)
3030

3131
val test = for {
3232
_ <- schema.create

persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/cassandra/CassandraPersistence.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ object CassandraPersistence {
5656
session: scassandra.CassandraSession[F],
5757
sync: CassandraSync[F]
5858
): F[Unit] =
59-
CassandraKeys.truncate(session, sync) *>
59+
CassandraKeys.truncate(session, sync, CassandraKeys.DefaultTableName) *>
6060
CassandraJournals.truncate(session, sync) *>
61-
CassandraSnapshots.truncate(session, sync)
61+
CassandraSnapshots.truncate(session, sync, CassandraSnapshots.DefaultTableName)
6262
}

persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/CassandraJournals.scala

Lines changed: 111 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import com.evolutiongaming.kafka.flow.cassandra.CassandraCodecs.*
1111
import com.evolutiongaming.kafka.flow.cassandra.ConsistencyOverrides
1212
import com.evolutiongaming.kafka.flow.cassandra.StatementHelper.StatementOps
1313
import com.evolutiongaming.kafka.flow.journal.conversions.{HeaderToTuple, TupleToHeader}
14-
import com.evolutiongaming.scassandra
14+
import com.evolutiongaming.scassandra.CassandraSession
1515
import com.evolutiongaming.scassandra.StreamingCassandraSession.*
1616
import com.evolutiongaming.scassandra.syntax.*
1717
import com.evolutiongaming.skafka.consumer.{ConsumerRecord, WithSize}
@@ -24,20 +24,24 @@ import java.time.Instant
2424
import CassandraJournals.*
2525

2626
class CassandraJournals[F[_]: Async](
27-
session: scassandra.CassandraSession[F],
28-
consistencyOverrides: ConsistencyOverrides = ConsistencyOverrides.none
27+
session: CassandraSession[F],
28+
consistencyOverrides: ConsistencyOverrides = ConsistencyOverrides.none,
29+
tableName: String,
2930
) extends JournalDatabase[F, KafkaKey, ConsumerRecord[String, ByteVector]] {
3031

32+
def this(session: CassandraSession[F], consistencyOverrides: ConsistencyOverrides) =
33+
this(session, consistencyOverrides, DefaultTableName)
34+
3135
def persist(key: KafkaKey, event: ConsumerRecord[String, ByteVector]): F[Unit] =
3236
for {
33-
boundStatement <- Statements.persist(session, key, event)
37+
boundStatement <- Statements.persist(session, key, event, tableName)
3438
statement = boundStatement.withConsistencyLevel(consistencyOverrides.write)
3539
_ <- session.execute(statement).void
3640
} yield ()
3741

3842
def get(key: KafkaKey): Stream[F, ConsumerRecord[String, ByteVector]] = {
3943
val boundStatement = Statements
40-
.get(session, key)
44+
.get(session, key, tableName)
4145
.map(_.withConsistencyLevel(consistencyOverrides.read))
4246

4347
Stream.lift(boundStatement).flatMap(session.executeStream(_)).mapM { row =>
@@ -47,30 +51,50 @@ class CassandraJournals[F[_]: Async](
4751

4852
def delete(key: KafkaKey): F[Unit] =
4953
for {
50-
boundStatement <- Statements.delete(session, key)
54+
boundStatement <- Statements.delete(session, key, tableName)
5155
statement = boundStatement.withConsistencyLevel(consistencyOverrides.write)
5256
_ <- session.execute(statement).void
5357
} yield ()
5458

5559
}
5660
object CassandraJournals {
61+
62+
val DefaultTableName: String = "records"
63+
64+
def withSchema[F[_]: Async](
65+
session: CassandraSession[F],
66+
sync: CassandraSync[F],
67+
consistencyOverrides: ConsistencyOverrides,
68+
tableName: String,
69+
): F[JournalDatabase[F, KafkaKey, ConsumerRecord[String, ByteVector]]] =
70+
JournalSchema
71+
.of(session, sync, tableName)
72+
.create
73+
.as(new CassandraJournals(session, consistencyOverrides, tableName))
74+
5775
def withSchema[F[_]: Async](
58-
session: scassandra.CassandraSession[F],
76+
session: CassandraSession[F],
5977
sync: CassandraSync[F],
60-
consistencyOverrides: ConsistencyOverrides
78+
consistencyOverrides: ConsistencyOverrides,
6179
): F[JournalDatabase[F, KafkaKey, ConsumerRecord[String, ByteVector]]] =
62-
JournalSchema.of(session, sync).create as new CassandraJournals(session, consistencyOverrides)
80+
withSchema(session, sync, consistencyOverrides, DefaultTableName)
6381

6482
def withSchema[F[_]: Async](
65-
session: scassandra.CassandraSession[F],
83+
session: CassandraSession[F],
6684
sync: CassandraSync[F],
6785
): F[JournalDatabase[F, KafkaKey, ConsumerRecord[String, ByteVector]]] =
68-
withSchema(session, sync, ConsistencyOverrides.none)
86+
withSchema(session, sync, ConsistencyOverrides.none, DefaultTableName)
6987

7088
def truncate[F[_]: Monad](
71-
session: scassandra.CassandraSession[F],
72-
sync: CassandraSync[F]
73-
): F[Unit] = JournalSchema.of(session, sync).truncate
89+
session: CassandraSession[F],
90+
sync: CassandraSync[F],
91+
tableName: String,
92+
): F[Unit] = JournalSchema.of(session, sync, tableName).truncate
93+
94+
def truncate[F[_]: Monad](
95+
session: CassandraSession[F],
96+
sync: CassandraSync[F],
97+
): F[Unit] = truncate(session, sync, DefaultTableName)
7498

7599
// we cannot use DecodeRow here because TupleToHeader is effectful
76100
protected def decode[F[_]: MonadThrow](key: KafkaKey, row: Row): F[ConsumerRecord[String, ByteVector]] = {
@@ -95,26 +119,34 @@ object CassandraJournals {
95119
}
96120

97121
protected object Statements {
98-
def get[F[_]: Monad](session: scassandra.CassandraSession[F], key: KafkaKey): F[BoundStatement] =
122+
@deprecated(
123+
"Use the version with an explicit table name. This exists to preserve binary compatibility until the next major release",
124+
since = "6.1.3"
125+
)
126+
def get[F[_]: Monad](session: CassandraSession[F], key: KafkaKey): F[BoundStatement] =
127+
get(session, key, DefaultTableName)
128+
129+
def get[F[_]: Monad](session: CassandraSession[F], key: KafkaKey, tableName: String): F[BoundStatement] =
99130
session
100131
.prepare(
101-
""" SELECT
102-
| offset,
103-
| created,
104-
| timestamp,
105-
| timestamp_type,
106-
| headers,
107-
| metadata,
108-
| value
109-
| FROM
110-
| records
111-
| WHERE
112-
| application_id = :application_id
113-
| AND group_id = :group_id
114-
| AND topic = :topic
115-
| AND partition = :partition
116-
| AND key = :key
117-
| ORDER BY offset
132+
s"""
133+
|SELECT
134+
| offset,
135+
| created,
136+
| timestamp,
137+
| timestamp_type,
138+
| headers,
139+
| metadata,
140+
| value
141+
|FROM
142+
| $tableName
143+
|WHERE
144+
| application_id = :application_id
145+
| AND group_id = :group_id
146+
| AND topic = :topic
147+
| AND partition = :partition
148+
| AND key = :key
149+
|ORDER BY offset
118150
""".stripMargin
119151
)
120152
.map(
@@ -126,28 +158,40 @@ object CassandraJournals {
126158
.encode("key", key.key)
127159
)
128160

161+
@deprecated(
162+
"Use the version with an explicit table name. This exists to preserve binary compatibility until the next major release",
163+
since = "6.1.3"
164+
)
129165
def persist[F[_]: MonadThrow: Clock](
130-
session: scassandra.CassandraSession[F],
166+
session: CassandraSession[F],
131167
key: KafkaKey,
132-
event: ConsumerRecord[String, ByteVector]
168+
event: ConsumerRecord[String, ByteVector],
169+
): F[BoundStatement] = persist(session, key, event, DefaultTableName)
170+
171+
def persist[F[_]: MonadThrow: Clock](
172+
session: CassandraSession[F],
173+
key: KafkaKey,
174+
event: ConsumerRecord[String, ByteVector],
175+
tableName: String,
133176
): F[BoundStatement] = for {
134177
preparedStatement <- session.prepare(
135-
""" UPDATE
136-
| records
137-
| SET
138-
| created = :created,
139-
| timestamp = :timestamp,
140-
| timestamp_type = :timestamp_type,
141-
| headers = :headers,
142-
| metadata = :metadata,
143-
| value = :value
144-
| WHERE
145-
| application_id = :application_id
146-
| AND group_id = :group_id
147-
| AND topic = :topic
148-
| AND partition = :partition
149-
| AND key = :key
150-
| AND offset = :offset
178+
s"""
179+
|UPDATE
180+
| $tableName
181+
|SET
182+
| created = :created,
183+
| timestamp = :timestamp,
184+
| timestamp_type = :timestamp_type,
185+
| headers = :headers,
186+
| metadata = :metadata,
187+
| value = :value
188+
|WHERE
189+
| application_id = :application_id
190+
| AND group_id = :group_id
191+
| AND topic = :topic
192+
| AND partition = :partition
193+
| AND key = :key
194+
| AND offset = :offset
151195
""".stripMargin
152196
)
153197

@@ -170,17 +214,25 @@ object CassandraJournals {
170214
.encodeSome("value", event.value map (_.value))
171215
}
172216

173-
def delete[F[_]: Monad](session: scassandra.CassandraSession[F], key: KafkaKey): F[BoundStatement] =
217+
@deprecated(
218+
"Use the version with an explicit table name. This exists to preserve binary compatibility until the next major release",
219+
since = "6.1.3"
220+
)
221+
def delete[F[_]: Monad](session: CassandraSession[F], key: KafkaKey): F[BoundStatement] =
222+
delete(session, key, DefaultTableName)
223+
224+
def delete[F[_]: Monad](session: CassandraSession[F], key: KafkaKey, tableName: String): F[BoundStatement] =
174225
session
175226
.prepare(
176-
""" DELETE FROM
177-
| records
178-
| WHERE
179-
| application_id = :application_id
180-
| AND group_id = :group_id
181-
| AND topic = :topic
182-
| AND partition = :partition
183-
| AND key = :key
227+
s"""
228+
|DELETE FROM
229+
| $tableName
230+
|WHERE
231+
| application_id = :application_id
232+
| AND group_id = :group_id
233+
| AND topic = :topic
234+
| AND partition = :partition
235+
| AND key = :key
184236
""".stripMargin
185237
)
186238
.map(

persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/JournalSchema.scala

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package com.evolutiongaming.kafka.flow.journal
33
import cats.Monad
44
import cats.syntax.all.*
55
import com.evolutiongaming.cassandra.sync.CassandraSync
6-
import com.evolutiongaming.scassandra
6+
import com.evolutiongaming.scassandra.CassandraSession
77

88
trait JournalSchema[F[_]] {
99
def create: F[Unit]
@@ -12,35 +12,46 @@ trait JournalSchema[F[_]] {
1212
}
1313

1414
object JournalSchema {
15+
@deprecated(
16+
"Use the version with an explicit table name. This exists to preserve binary compatibility until the next major release",
17+
since = "6.1.3"
18+
)
1519
def of[F[_]: Monad](
16-
session: scassandra.CassandraSession[F],
17-
synchronize: CassandraSync[F]
20+
session: CassandraSession[F],
21+
synchronize: CassandraSync[F],
22+
): JournalSchema[F] = of(session, synchronize, CassandraJournals.DefaultTableName)
23+
24+
def of[F[_]: Monad](
25+
session: CassandraSession[F],
26+
synchronize: CassandraSync[F],
27+
tableName: String,
1828
): JournalSchema[F] = new JournalSchema[F] {
1929
def create: F[Unit] = synchronize("JournalSchema") {
2030
session
2131
.execute(
22-
"""CREATE TABLE IF NOT EXISTS records(
23-
|application_id TEXT,
24-
|group_id TEXT,
25-
|topic TEXT,
26-
|partition INT,
27-
|key TEXT,
28-
|offset BIGINT,
29-
|created TIMESTAMP,
30-
|timestamp TIMESTAMP,
31-
|timestamp_type TEXT,
32-
|headers MAP<TEXT, TEXT>,
33-
|metadata TEXT,
34-
|value BLOB,
35-
|PRIMARY KEY((application_id, group_id, topic, partition, key), offset)
32+
s"""
33+
|CREATE TABLE IF NOT EXISTS $tableName (
34+
| application_id TEXT,
35+
| group_id TEXT,
36+
| topic TEXT,
37+
| partition INT,
38+
| key TEXT,
39+
| offset BIGINT,
40+
| created TIMESTAMP,
41+
| timestamp TIMESTAMP,
42+
| timestamp_type TEXT,
43+
| headers MAP<TEXT, TEXT>,
44+
| metadata TEXT,
45+
| value BLOB,
46+
| PRIMARY KEY((application_id, group_id, topic, partition, key), offset)
3647
|)
3748
|""".stripMargin
3849
)
3950
.void
4051
}
4152

4253
def truncate: F[Unit] = synchronize("JournalSchema") {
43-
session.execute("TRUNCATE records").void
54+
session.execute(s"TRUNCATE $tableName").void
4455
}
4556
}
4657

0 commit comments

Comments
 (0)