Skip to content

Commit 9f1048e

Browse files
author
Vladislav Filatov
committed
Custom schema for CassandraSnapshots
1 parent 2c0a4a4 commit 9f1048e

File tree

1 file changed

+31
-1
lines changed

1 file changed

+31
-1
lines changed

persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/snapshot/CassandraSnapshots.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,39 @@ object CassandraSnapshots {
8282
)(
8383
implicit fromBytes: FromBytes[F, T],
8484
toBytes: ToBytes[F, T]
85+
): F[SnapshotDatabase[F, KafkaKey, KafkaSnapshot[T]]] =
86+
withCustomSchema(SnapshotSchema.of(session, sync, tableName), session, consistencyOverrides, tableName, ttl)
87+
88+
/** Create table with a user defined schema for storing snapshots. If table already exists it will not be recreated.
89+
* Note that the table schema must be compatible with predefined queries for storing and retrieving snapshots data.
90+
*
91+
* @param snapshotSchema
92+
* Custom schema definition
93+
* @param session
94+
* Cassandra session to use for creating table
95+
* @param consistencyOverrides
96+
* overrides for read/write consistency levels for the snapshots table
97+
* @param tableName
98+
* name of the table to create. The default value is "snapshots_v2"
99+
* @param ttl
100+
* optional TTL to set on inserted records
101+
* @param fromBytes
102+
* deserializer function to convert array of bytes to the snapshot type T
103+
* @param toBytes
104+
* serializer function to convert the snapshot type T to array of bytes
105+
*/
106+
def withCustomSchema[F[_]: Async, T](
107+
snapshotSchema: SnapshotSchema[F],
108+
session: CassandraSession[F],
109+
consistencyOverrides: ConsistencyOverrides = ConsistencyOverrides.none,
110+
tableName: String = DefaultTableName,
111+
ttl: Option[FiniteDuration] = None,
112+
)(
113+
implicit fromBytes: FromBytes[F, T],
114+
toBytes: ToBytes[F, T]
85115
): F[SnapshotDatabase[F, KafkaKey, KafkaSnapshot[T]]] =
86116
for {
87-
_ <- SnapshotSchema.of(session, sync, tableName).create
117+
_ <- snapshotSchema.create
88118
getStatement <- Statements.prepareGet(session, tableName)
89119
persistStatement <- Statements.preparePersist(session, tableName, ttl)
90120
deleteStatement <- Statements.prepareDelete(session, tableName)

0 commit comments

Comments
 (0)