
Commit 1445a02

introduce new factory methods to support logging keys' data when deleting; deprecate old methods
1 parent 3e77660 commit 1445a02

10 files changed: +51 -19 lines

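Every change below follows the same migration pattern. As a minimal sketch of that pattern (not part of the diff itself, assuming the library's usual package layout and an implicit LogPrefix[KafkaKey] instance, as the updated tests in this commit rely on):

import cats.effect.IO
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.kafka.flow.KafkaKey
import com.evolutiongaming.kafka.flow.key.KeysOf

// before (deprecated in 5.0.6): KeysOf.memory[IO, KafkaKey]
// after: the LogPrefix-aware variant, so deleted keys' data can be logged
def inMemoryKeysOf(implicit log: Log[IO]): IO[KeysOf[IO, KafkaKey]] =
  KeysOf.memory1[IO, KafkaKey]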

core/src/main/scala/com/evolutiongaming/kafka/flow/key/KeyDatabase.scala

Lines changed: 4 additions & 1 deletion
@@ -20,9 +20,12 @@ trait KeyDatabase[F[_], K] {
 
   def all(applicationId: String, groupId: String, topicPartition: TopicPartition): Stream[F, K]
 
-  def keysOf(implicit F: Monad[F], logOf: LogOf[F], logPrefix: LogPrefix[K]): F[KeysOf[F, K]] =
+  @deprecated("Use `toKeysOf` instead", "5.0.6")
+  def keysOf(implicit F: Monad[F], logOf: LogOf[F]): F[KeysOf[F, K]] =
     logOf(KeyDatabase.getClass) map { implicit log => KeysOf(this) }
 
+  def toKeysOf(implicit F: Monad[F], logOf: LogOf[F], logPrefix: LogPrefix[K]): F[KeysOf[F, K]] =
+    logOf(KeyDatabase.getClass) map { implicit log => KeysOf.of(this) }
 }
 
 object KeyDatabase {
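For code that already holds a KeyDatabase, the switch is from the deprecated keysOf to toKeysOf, which additionally asks for a LogPrefix of the key type. A hedged caller sketch under the same assumptions as above:

import cats.effect.IO
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.KafkaKey
import com.evolutiongaming.kafka.flow.key.{KeyDatabase, KeysOf}

// was: db.keysOf (deprecated in 5.0.6); toKeysOf also needs an implicit LogPrefix[KafkaKey]
def keysOfFrom(db: KeyDatabase[IO, KafkaKey])(implicit logOf: LogOf[IO]): IO[KeysOf[IO, KafkaKey]] =
  db.toKeysOf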

core/src/main/scala/com/evolutiongaming/kafka/flow/key/Keys.scala

Lines changed: 18 additions & 1 deletion
@@ -24,7 +24,24 @@ trait KeyWriter[F[_]] {
 object Keys {
 
   /** Creates a buffer for a given writer */
-  private[key] def apply[F[_]: Monad: Log, K: LogPrefix](
+  @deprecated("Use `of` instead", "5.0.6")
+  private[key] def apply[F[_]: Monad: Log, K](
+    key: K,
+    database: KeyDatabase[F, K]
+  ): Keys[F] = new Keys[F] {
+
+    def flush: F[Unit] = database.persist(key)
+
+    def delete(persist: Boolean): F[Unit] =
+      if (persist) {
+        database.delete(key) *> Log[F].info("deleted key")
+      } else {
+        ().pure[F]
+      }
+
+  }
+
+  private[key] def of[F[_]: Monad: Log, K: LogPrefix](
     key: K,
     database: KeyDatabase[F, K]
   ): Keys[F] = new Keys[F] {
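Judging from the deprecated implementation shown above, the handle's shape is unchanged: flush persists the key, delete(persist = true) removes it and writes a log line, delete(persist = false) does nothing; the new Keys.of is presumably where the key's data gets added to that log line via LogPrefix. A small usage sketch (same assumed imports as above):

// deletes the key from the underlying KeyDatabase and logs the deletion
def removeKey(keysOf: KeysOf[IO, KafkaKey], key: KafkaKey): IO[Unit] =
  keysOf(key).delete(persist = true)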

core/src/main/scala/com/evolutiongaming/kafka/flow/key/KeysOf.scala

Lines changed: 17 additions & 5 deletions
@@ -17,18 +17,30 @@ trait KeysOf[F[_], K] {
 }
 object KeysOf {
 
-  def memory[F[_]: Sync: Log, K: LogPrefix]: F[KeysOf[F, K]] =
-    KeyDatabase.memory[F, K] map { database =>
-      KeysOf(database)
-    }
+  @deprecated("Use another `memory1` instead", "5.0.6")
+  def memory[F[_]: Sync: Log, K]: F[KeysOf[F, K]] =
+    KeyDatabase.memory[F, K].map(database => KeysOf.apply(database))
+
+  def memory1[F[_]: Sync: Log, K: LogPrefix]: F[KeysOf[F, K]] =
+    KeyDatabase.memory[F, K].map(database => KeysOf.of(database))
 
   /** Creates `KeysOf` with a passed logger */
-  def apply[F[_]: Monad: Log, K: LogPrefix](
+  @deprecated("Use `of` instead", "5.0.6")
+  def apply[F[_]: Monad: Log, K](
     database: KeyDatabase[F, K]
   ): KeysOf[F, K] = new KeysOf[F, K] {
     def apply(key: K) = Keys(key, database)
     def all(applicationId: String, groupId: String, topicPartition: TopicPartition) =
       database.all(applicationId, groupId, topicPartition)
   }
 
+  /** Creates `KeysOf` with a passed logger */
+  def of[F[_]: Monad: Log, K: LogPrefix](
+    database: KeyDatabase[F, K]
+  ): KeysOf[F, K] = new KeysOf[F, K] {
+    def apply(key: K) = Keys.of(key, database)
+    def all(applicationId: String, groupId: String, topicPartition: TopicPartition) =
+      database.all(applicationId, groupId, topicPartition)
+  }
+
 }
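Building a KeysOf over a custom KeyDatabase now goes through KeysOf.of instead of the deprecated KeysOf.apply; the in-memory memory1 is simply that, wired to KeyDatabase.memory. A hedged sketch, again assuming an implicit LogPrefix[KafkaKey] and a Log[IO] in scope:

// was: KeysOf(db) / KeysOf.apply(db), deprecated in 5.0.6
def keysOfFor(db: KeyDatabase[IO, KafkaKey])(implicit log: Log[IO]): KeysOf[IO, KafkaKey] =
  KeysOf.of[IO, KafkaKey](db)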

core/src/main/scala/com/evolutiongaming/kafka/flow/persistence/PersistenceModule.scala

Lines changed: 3 additions & 3 deletions
@@ -23,7 +23,7 @@ trait PersistenceModule[F[_], S] {
     implicit F: Sync[F],
     logOf: LogOf[F]
   ): Resource[F, PersistenceOf[F, KafkaKey, KafkaSnapshot[S], ConsumerRecord[String, ByteVector]]] = for {
-    keysOf <- Resource.eval(keys.keysOf)
+    keysOf <- Resource.eval(keys.toKeysOf)
     journalsOf <- Resource.eval(journals.journalsOf)
     snapshotsOf <- Resource.eval(snapshots.snapshotsOf)
     persistenceOf <- PersistenceOf.restoreEvents(keysOf, journalsOf, snapshotsOf)
@@ -34,7 +34,7 @@ trait PersistenceModule[F[_], S] {
     implicit F: Sync[F],
     logOf: LogOf[F]
   ): F[SnapshotPersistenceOf[F, KafkaKey, KafkaSnapshot[S], ConsumerRecord[String, ByteVector]]] = for {
-    keysOf <- keys.keysOf
+    keysOf <- keys.toKeysOf
     journalsOf <- journals.journalsOf
     snapshotsOf <- snapshots.snapshotsOf
   } yield PersistenceOf.restoreSnapshots(keysOf, journalsOf, snapshotsOf)
@@ -44,7 +44,7 @@ trait PersistenceModule[F[_], S] {
     implicit F: Sync[F],
     logOf: LogOf[F]
   ): F[SnapshotPersistenceOf[F, KafkaKey, KafkaSnapshot[S], ConsumerRecord[String, ByteVector]]] = for {
-    keysOf <- keys.keysOf
+    keysOf <- keys.toKeysOf
     snapshotsOf <- snapshots.snapshotsOf
   } yield PersistenceOf.snapshotsOnly(keysOf, snapshotsOf)
 

core/src/test/scala/com/evolutiongaming/kafka/flow/AdditionalPersistSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -299,7 +299,7 @@ object AdditionalPersistSpec {
 
     def partitionFlow: Resource[IO, PartitionFlow[IO]] =
       for {
-        keysOf <- Resource.eval(KeysOf.memory[IO, KafkaKey])
+        keysOf <- Resource.eval(KeysOf.memory1[IO, KafkaKey])
         timersOf <- Resource.eval(TimersOf.memory[IO, KafkaKey])
         partitionFlow <- PartitionFlow.resource(
           topicPartition = TopicPartition.empty,

core/src/test/scala/com/evolutiongaming/kafka/flow/PartitionFlowSpec.scala

Lines changed: 2 additions & 2 deletions
@@ -398,7 +398,7 @@ class PartitionFlowSpec extends FunSuite {
     logOf.apply(classOf[PartitionFlowSpec]).toResource.flatMap { implicit log =>
       val committedOffset = Ref.unsafe[IO, Offset](Offset.min)
       val keyStorage = Ref.unsafe[IO, Set[KafkaKey]](initialData.keySet)
-      val keysOf = KeysOf.apply[IO, KafkaKey](KeyDatabase.memory[IO, KafkaKey](keyStorage.stateInstance))
+      val keysOf = KeysOf.of[IO, KafkaKey](KeyDatabase.memory[IO, KafkaKey](keyStorage.stateInstance))
       val snapshotsStorage = Ref.unsafe[IO, Map[KafkaKey, String]](initialData)
       val persistenceOf =
         PersistenceOf
@@ -466,7 +466,7 @@ object PartitionFlowSpec {
 
   type State = (Offset, Int)
 
-  val keysOf = KeysOf.memory[IO, String].unsafeRunSync()(IORuntime.global)
+  val keysOf = KeysOf.memory1[IO, String].unsafeRunSync()(IORuntime.global)
   val journalsOf = JournalsOf.memory[IO, String, ConsumerRecord[String, ByteVector]].unsafeRunSync()(IORuntime.global)
   val snapshotsOf = SnapshotsOf.memory[IO, String, State].unsafeRunSync()(IORuntime.global)
   val (persistenceOf, _) =

core/src/test/scala/com/evolutiongaming/kafka/flow/key/KeysSpec.scala

Lines changed: 3 additions & 3 deletions
@@ -14,7 +14,7 @@ class KeysSpec extends FunSuite {
 
     // Given("empty database")
     val database = KeyDatabase.memory(f.database)
-    val keys = Keys("key1", database)
+    val keys = Keys.of("key1", database)
 
     // When("Keys is flushed")
     val program = keys.flush
@@ -32,7 +32,7 @@ class KeysSpec extends FunSuite {
 
     // Given("database with contents")
     val database = KeyDatabase.memory(f.database)
-    val snapshots = Keys("key1", database)
+    val snapshots = Keys.of("key1", database)
     val context = Set("key1")
 
     // When("delete is requested")
@@ -50,7 +50,7 @@ class KeysSpec extends FunSuite {
 
     // Given("database with contents")
    val database = KeyDatabase.memory(f.database)
-    val snapshots = Keys("key1", database)
+    val snapshots = Keys.of("key1", database)
     val context = Set("key1")
 
     // When("delete is requested")

core/src/test/scala/com/evolutiongaming/kafka/flow/registry/EntityRegistryTest.scala

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ class EntityRegistryTest extends FunSuite {
     )
 
     val resource = for {
-      keysOf <- KeysOf.memory[IO, KafkaKey].toResource
+      keysOf <- KeysOf.memory1[IO, KafkaKey].toResource
      timersOf <- TimersOf.memory[IO, KafkaKey].toResource
      registry <- EntityRegistry.memory[IO, KafkaKey, Int].toResource
      partitionFlowOf = PartitionFlowOf.apply(

persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/FlowSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ class FlowSpec extends CassandraSpec {
         )
       )
       timersOf <- Resource.eval(TimersOf.memory[IO, KafkaKey])
-      keysOf <- Resource.eval(storage.keys.keysOf)
+      keysOf <- Resource.eval(storage.keys.toKeysOf)
      persistenceOf <- storage.restoreEvents
      keyStateOf = KeyStateOf.eagerRecovery[IO, KafkaSnapshot[String]](
        applicationId = "FlowSpec",

persistence-kafka-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/StatefulProcessingWithKafkaSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ class StatefulProcessingWithKafkaSpec extends ForAllKafkaSuite {
     val snapshotPersistenceOf: SnapshotPersistenceOf[IO, KafkaKey, State, ConsumerRecord[String, ByteVector]] =
       PersistenceOf.snapshotsOnly(keysOf, snapshotsOf)
 
-    override def keysOf: KeysOf[IO, KafkaKey] = KeysOf.memory[IO, KafkaKey].unsafeRunSync()
+    override def keysOf: KeysOf[IO, KafkaKey] = KeysOf.memory1[IO, KafkaKey].unsafeRunSync()
     override def persistenceOf: SnapshotPersistenceOf[IO, KafkaKey, State, ConsumerRecord[String, ByteVector]] =
       snapshotPersistenceOf
   }
