Skip to content

Commit 83e0263

Browse files
Introduce partition mapping to kafka-backed persistence module (#718)
1 parent 760d9f0 commit 83e0263

File tree

4 files changed

+87
-13
lines changed

4 files changed

+87
-13
lines changed

persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaPersistenceModule.scala

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,21 +82,33 @@ object KafkaPersistenceModule {
8282
producer: Producer[F],
8383
consumerConfig: ConsumerConfig,
8484
snapshotTopicPartition: TopicPartition,
85-
metrics: FlowMetrics[F]
85+
metrics: FlowMetrics[F],
86+
partitionMapper: KafkaPersistencePartitionMapper = KafkaPersistencePartitionMapper.identity,
8687
)(
8788
implicit fromBytesKey: FromBytes[F, String],
8889
fromBytesState: FromBytes[F, S],
8990
toBytesState: ToBytes[F, S]
9091
): Resource[F, KafkaPersistenceModule[F, S]] = {
9192
implicit val fromTry: FromTry[F] = FromTry.lift
9293

93-
def readPartitionData(implicit log: Log[F]): F[BytesByKey] =
94-
KafkaPartitionPersistence.readSnapshots[F](
95-
consumerOf = consumerOf,
96-
consumerConfig = consumerConfig,
97-
snapshotTopic = snapshotTopicPartition.topic,
98-
partition = snapshotTopicPartition.partition
99-
)
94+
def readPartitionData(implicit log: Log[F]): F[BytesByKey] = {
95+
val targetPartition = partitionMapper.getStatePartition(snapshotTopicPartition.partition)
96+
KafkaPartitionPersistence
97+
.readSnapshots[F](
98+
consumerOf = consumerOf,
99+
consumerConfig = consumerConfig,
100+
snapshotTopic = snapshotTopicPartition.topic,
101+
partition = targetPartition
102+
)
103+
.map { snapshots =>
104+
snapshots
105+
.view
106+
.filterKeys { key =>
107+
partitionMapper.isStateKeyOwned(key, snapshotTopicPartition.partition)
108+
}
109+
.toMap
110+
}
111+
}
100112

101113
def makeKeysOf(cache: Cache[F, String, ByteVector]): F[KeysOf[F, KafkaKey]] = {
102114
LogOf[F].apply(classOf[KeysOf[F, KafkaKey]]).map { implicit log =>
@@ -127,7 +139,7 @@ object KafkaPersistenceModule {
127139

128140
val snapshotDatabase = SnapshotDatabase(
129141
read = read,
130-
write = KafkaSnapshotWriteDatabase.of[F, S](snapshotTopicPartition, producer)
142+
write = KafkaSnapshotWriteDatabase.of[F, S](snapshotTopicPartition, producer, partitionMapper)
131143
).withMetricsK(metrics.snapshotDatabaseMetrics)
132144

133145
PersistenceOf.snapshotsOnly[F, KafkaKey, S, ConsumerRecord[String, ByteVector]](

persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaPersistenceModuleOf.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ object KafkaPersistenceModuleOf {
3636
producer: Producer[F],
3737
consumerConfig: ConsumerConfig,
3838
snapshotTopic: Topic,
39-
metrics: FlowMetrics[F]
39+
metrics: FlowMetrics[F],
40+
partitionMapper: KafkaPersistencePartitionMapper = KafkaPersistencePartitionMapper.identity,
4041
)(
4142
implicit fromBytesKey: FromBytes[F, String],
4243
fromBytesState: FromBytes[F, S],
@@ -47,7 +48,8 @@ object KafkaPersistenceModuleOf {
4748
producer = producer,
4849
consumerConfig = consumerConfig,
4950
snapshotTopicPartition = TopicPartition(snapshotTopic, partition),
50-
metrics = metrics
51+
metrics = metrics,
52+
partitionMapper = partitionMapper,
5153
)
5254
}
5355

@@ -60,6 +62,7 @@ object KafkaPersistenceModuleOf {
6062
implicit fromBytesKey: FromBytes[F, String],
6163
fromBytesState: FromBytes[F, S],
6264
toBytesState: ToBytes[F, S]
63-
): KafkaPersistenceModuleOf[F, S] = caching(consumerOf, producer, consumerConfig, snapshotTopic, FlowMetrics.empty[F])
65+
): KafkaPersistenceModuleOf[F, S] =
66+
caching(consumerOf, producer, consumerConfig, snapshotTopic, FlowMetrics.empty[F])
6467

6568
}
persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaPersistencePartitionMapper.scala

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
package com.evolutiongaming.kafka.flow.kafkapersistence

import com.evolutiongaming.skafka.Partition
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner

import java.nio.charset.StandardCharsets

/** Maps partitions of source Kafka topics into persistence topics.
  *
  * Please be careful when using this with [[com.evolutiongaming.kafka.flow.RemapKey]]. Only the identity mapper is
  * guaranteed to work properly with an arbitrary `RemapKey`, for other combinations you have to manually ensure that
  * the [[isStateKeyOwned]] implementation is correct and will not allow duplicate KeyFlows.
  *
  * If the aggregate key depends on the record's contents, then only the identity mapper can be used.
  */
trait KafkaPersistencePartitionMapper {

  /** Called after rebalance or initial partition assignment.
    * @param sourcePartition
    *   partition of the input stream, i.e. the kafka-journal topic.
    * @return
    *   partition of the persistence topic that has snapshots for aggregates built by events from the `sourcePartition`.
    */
  def getStatePartition(sourcePartition: Partition): Partition

  /** Checks if the aggregate in the state partition should be initialized as a
    * [[com.evolutiongaming.kafka.flow.KeyFlow]].
    *
    * If the aggregate is initialized, it will have timers and ticks started. This is not desirable if the aggregate is
    * actually sourced from a different partition, which will also be started concurrently.
    * @param stateKey
    *   the aggregate's key.
    * @param sourcePartition
    *   partition of the input stream.
    * @return
    *   `true` if the aggregate is built from events in `sourcePartition`.
    */
  def isStateKeyOwned(stateKey: String, sourcePartition: Partition): Boolean
}

object KafkaPersistencePartitionMapper {

  /** One-to-one mapping: state partition N holds snapshots for source partition N, every key is owned. */
  def identity: KafkaPersistencePartitionMapper = Identity

  /** Modulo mapping for a persistence topic with fewer partitions than the source topic.
    *
    * Source partition `p` is mapped to state partition `p % statePartitions`; ownership of a key is decided by
    * re-deriving the key's source partition with Kafka's default partitioning over `sourcePartitions`.
    *
    * @param sourcePartitions
    *   number of partitions of the source (input) topic; must be positive.
    * @param statePartitions
    *   number of partitions of the persistence (snapshot) topic; must be positive.
    * @throws IllegalArgumentException
    *   if either partition count is not positive (fail fast instead of a later `ArithmeticException`).
    */
  def modulo(sourcePartitions: Int, statePartitions: Int): KafkaPersistencePartitionMapper = {
    require(sourcePartitions > 0, s"sourcePartitions must be positive, got $sourcePartitions")
    require(statePartitions > 0, s"statePartitions must be positive, got $statePartitions")
    new Modulo(sourcePartitions, statePartitions)
  }

  private object Identity extends KafkaPersistencePartitionMapper {
    override def getStatePartition(sourcePartition: Partition): Partition = sourcePartition

    override def isStateKeyOwned(stateKey: String, sourcePartition: Partition): Boolean = true
  }

  private class Modulo(sourcePartitions: Int, statePartitions: Int) extends KafkaPersistencePartitionMapper {
    override def getStatePartition(sourcePartition: Partition): Partition =
      Partition.unsafe(sourcePartition.value % statePartitions)

    // Encode the key as UTF-8 explicitly: `String.getBytes()` without a charset uses the JVM's
    // platform default charset, which may differ between machines and from Kafka's own key
    // serialization (StringSerializer encodes keys as UTF-8). A mismatch would make ownership
    // checks disagree with the partition the key was actually produced to for non-ASCII keys.
    override def isStateKeyOwned(stateKey: String, sourcePartition: Partition): Boolean =
      BuiltInPartitioner.partitionForKey(
        stateKey.getBytes(StandardCharsets.UTF_8),
        sourcePartitions
      ) == sourcePartition.value
  }
}

persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaSnapshotWriteDatabase.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,17 @@ object KafkaSnapshotWriteDatabase {
1212
def of[F[_]: FromTry: Monad, S: ToBytes[F, *]](
1313
snapshotTopicPartition: TopicPartition,
1414
producer: Producer[F],
15+
partitionMapper: KafkaPersistencePartitionMapper = KafkaPersistencePartitionMapper.identity,
1516
): SnapshotWriteDatabase[F, KafkaKey, S] = new SnapshotWriteDatabase[F, KafkaKey, S] {
1617
override def persist(key: KafkaKey, snapshot: S): F[Unit] = produce(key, snapshot.some)
1718

1819
override def delete(key: KafkaKey): F[Unit] = produce(key, none)
1920

2021
private def produce(key: KafkaKey, snapshot: Option[S]): F[Unit] = {
22+
val targetPartition = partitionMapper.getStatePartition(key.topicPartition.partition)
2123
val record = new ProducerRecord(
2224
topic = snapshotTopicPartition.topic,
23-
partition = snapshotTopicPartition.partition.some,
25+
partition = targetPartition.some,
2426
key = key.key.some,
2527
value = snapshot
2628
)

0 commit comments

Comments
 (0)