From 0b8a879421036f80c8820d2f304bbc3751f4ebbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mareks=20Ramp=C4=81ns?= <8796159+mr-git@users.noreply.github.com> Date: Wed, 6 Nov 2024 15:17:06 +0200 Subject: [PATCH 1/3] make version non-optional --- .../kafka/journal/Version.scala | 3 ++ .../cassandra/JournalStatements.scala | 6 +-- .../cassandra/EventualCassandraTest.scala | 2 +- .../cassandra/ReplicatedCassandraTest.scala | 2 +- .../kafka/journal/Action.scala | 10 ++--- .../kafka/journal/ActionHeader.scala | 44 +++++-------------- .../kafka/journal/EventRecord.scala | 2 +- .../kafka/journal/Produce.scala | 8 ++-- .../kafka/journal/Delete-None.json | 3 +- .../kafka/journal/Mark-origin.json | 3 +- .../kafka/journal/Purge-origin.json | 3 +- .../kafka/journal/ActionHeaderJsonSpec.scala | 28 +++++++++--- .../journal/ActionToProducerRecordSpec.scala | 2 +- .../kafka/journal/HeadCacheSpec.scala | 4 +- .../kafka/journal/HeadInfoSpec.scala | 8 ++-- .../kafka/journal/PartitionCacheSpec.scala | 6 +-- .../journal/StreamActionRecordsSpec.scala | 4 +- .../eventual/EventualJournalSpec.scala | 2 +- notes.md | 2 +- .../kafka/journal/JournalAdapterSpec.scala | 2 +- .../kafka/journal/replicator/Batch.scala | 2 +- .../journal/replicator/ReplicateRecords.scala | 21 ++++----- .../kafka/journal/replicator/BatchSpec.scala | 12 ++--- .../replicator/TopicReplicatorSpec.scala | 10 ++--- .../kafka/journal/JournalIntSpec.scala | 6 +-- .../replicator/ReplicatorIntSpec.scala | 2 +- 26 files changed, 96 insertions(+), 101 deletions(-) diff --git a/core/src/main/scala/com/evolutiongaming/kafka/journal/Version.scala b/core/src/main/scala/com/evolutiongaming/kafka/journal/Version.scala index e2f4208a3..67e0022d2 100644 --- a/core/src/main/scala/com/evolutiongaming/kafka/journal/Version.scala +++ b/core/src/main/scala/com/evolutiongaming/kafka/journal/Version.scala @@ -14,6 +14,9 @@ object Version { val current: Version = Version(Option(Version.getClass.getPackage.getImplementationVersion).getOrElse("unknown")) + /** The last release before [[Version]] was introduced, should be used only as fallback during data recovery */ + val obsolete: Version = Version("0.0.152") + implicit val eqVersion: Eq[Version] = Eq.fromUniversalEquals implicit val showVersion: Show[Version] = Show.fromToString diff --git a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/JournalStatements.scala b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/JournalStatements.scala index a92f98a90..ffb811de6 100644 --- a/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/JournalStatements.scala +++ b/eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/JournalStatements.scala @@ -122,7 +122,7 @@ private[journal] object JournalStatements { .encode(record.event.partitionOffset) .encode("timestamp", record.event.timestamp) .encodeSome(record.event.origin) - .encodeSome(record.event.version) + .encode(record.event.version) .encode("tags", record.event.event.tags) .encodeSome("meta_record_id", record.metaRecordId) .encodeSome("payload_type", payloadType) @@ -189,7 +189,7 @@ private[journal] object JournalStatements { } yield { new SelectRecords[F] { - def apply(key: Key, segment: SegmentNr, range: SeqRange) = { + def apply(key: Key, segment: SegmentNr, range: SeqRange): Stream[F, JournalRecord] = { def readPayload(row: Row): Option[EventualPayloadAndType] = { val payloadType = row.decode[Option[PayloadType]]("payload_type") @@ -226,7 +226,7 @@ private[journal] object JournalStatements { event = event, timestamp = row.decode[Instant]("timestamp"), origin = row.decode[Option[Origin]], - version = row.decode[Option[Version]], + version = row.decode[Option[Version]].getOrElse(Version.obsolete), partitionOffset = partitionOffset, metadata = metadata, headers = headers, diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala index d7ea344bf..d0dd0b1e4 100644 --- a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala @@ -42,7 +42,7 @@ class EventualCassandraTest extends AnyFunSuite with Matchers { timestamp = timestamp0, partitionOffset = partitionOffset, origin = origin.some, - version = version.some, + version = version, metadata = RecordMetadata(HeaderMetadata(Json.obj(("key", "value")).some), PayloadMetadata.empty), headers = Headers(("key", "value")), ) diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala index 13bbb5d08..217083b69 100644 --- a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala @@ -47,7 +47,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers { timestamp = timestamp0, partitionOffset = partitionOffset, origin = origin.some, - version = version.some, + version = version, metadata = RecordMetadata(HeaderMetadata(Json.obj(("key", "value")).some), PayloadMetadata.empty), headers = Headers(("key", "value")), ) diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/Action.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/Action.scala index 6234acb5d..dc9d96fcb 100644 --- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/Action.scala +++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/Action.scala @@ -17,7 +17,7 @@ sealed abstract class Action extends Product { def origin: Option[Origin] = header.origin - def version: Option[Version] = header.version + def version: Version = header.version } object Action { @@ -79,7 +79,7 @@ object Action { key: Key, timestamp: Instant, origin: Option[Origin], - version: Option[Version], + version: Version, events: Events[A], metadata: HeaderMetadata, headers: Headers, @@ -120,7 +120,7 @@ object Action { timestamp: Instant, to: DeleteTo, origin: Option[Origin], - version: Option[Version], + version: Version, ): Delete = { val header = ActionHeader.Delete(to, origin, version) Delete(key, timestamp, header) @@ -135,7 +135,7 @@ object Action { object Purge { - def apply(key: Key, timestamp: Instant, origin: Option[Origin], version: Option[Version]): Purge = { + def apply(key: Key, timestamp: Instant, origin: Option[Origin], version: Version): Purge = { val header = ActionHeader.Purge(origin, version) Purge(key, timestamp, header) } @@ -157,7 +157,7 @@ object Action { timestamp: Instant, id: String, origin: Option[Origin], - version: Option[Version], + version: Version, ): Mark = { val header = ActionHeader.Mark(id, origin, version) Mark(key, timestamp, header) diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/ActionHeader.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/ActionHeader.scala index 6da698a8c..5c20f366f 100644 --- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/ActionHeader.scala +++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/ActionHeader.scala @@ -9,7 +9,7 @@ sealed abstract class ActionHeader extends Product { def origin: Option[Origin] - def version: Option[Version] + def version: Version } object ActionHeader { @@ -18,34 +18,10 @@ object ActionHeader { implicit val formatOptActionHeader: OFormat[Option[ActionHeader]] = { - val appendFormat = { - val format = Json.format[Append] - val reads = format orElse new Reads[Append] { - def reads(json: JsValue) = { - - def metadata = { - (json \ "metadata").validate[JsObject] match { - case JsSuccess(a, _) => a.validate[HeaderMetadata] - case _: JsError => HeaderMetadata.empty.pure[JsResult] - } - } - - for { - range <- (json \ "range").validate[SeqRange] - origin <- (json \ "origin").validateOpt[Origin] - version <- (json \ "version").validateOpt[Version] - payloadType <- (json \ "payloadType").validate[PayloadType.BinaryOrJson] - metadata <- metadata - } yield { - Append(range, origin, version, payloadType, metadata) - } - } - } - OFormat(reads, format) - } - val deleteFormat = Json.format[Delete] - val purgeFormat = Json.format[Purge] - val readFormat = Json.format[Mark] + val appendFormat = Json.using[Json.WithDefaultValues].format[Append] + val deleteFormat = Json.using[Json.WithDefaultValues].format[Delete] + val purgeFormat = Json.using[Json.WithDefaultValues].format[Purge] + val readFormat = Json.using[Json.WithDefaultValues].format[Mark] new OFormat[Option[ActionHeader]] { @@ -96,25 +72,25 @@ object ActionHeader { final case class Append( range: SeqRange, origin: Option[Origin], - version: Option[Version], + version: Version = Version.obsolete, payloadType: PayloadType.BinaryOrJson, - metadata: HeaderMetadata, + metadata: HeaderMetadata = HeaderMetadata.empty, ) extends AppendOrDelete final case class Delete( to: DeleteTo, origin: Option[Origin], - version: Option[Version], + version: Version = Version.obsolete, ) extends AppendOrDelete final case class Purge( origin: Option[Origin], - version: Option[Version], + version: Version = Version.obsolete, ) extends AppendOrDelete final case class Mark( id: String, origin: Option[Origin], - version: Option[Version], + version: Version = Version.obsolete, ) extends ActionHeader } diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/EventRecord.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/EventRecord.scala index 67a5356a8..d17f62d90 100644 --- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/EventRecord.scala +++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/EventRecord.scala @@ -14,7 +14,7 @@ final case class EventRecord[A]( timestamp: Instant, partitionOffset: PartitionOffset, origin: Option[Origin], - version: Option[Version], + version: Version, metadata: RecordMetadata, headers: Headers, ) { diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/Produce.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/Produce.scala index fa95e8122..025af9ef2 100644 --- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/Produce.scala +++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/Produce.scala @@ -90,7 +90,7 @@ private[journal] object Produce { ActionHeader.Append( range = range, origin = origin, - version = version.some, + version = version, payloadType = payloadAndType.payloadType, metadata = metadata, ), @@ -109,7 +109,7 @@ private[journal] object Produce { def delete(key: Key, to: DeleteTo): F[PartitionOffset] = { for { timestamp <- Clock[F].instant - action = Action.Delete(key, timestamp, to, origin, version.some) + action = Action.Delete(key, timestamp, to, origin, version) result <- send(action) } yield result } @@ -117,7 +117,7 @@ private[journal] object Produce { def purge(key: Key): F[PartitionOffset] = { for { timestamp <- Clock[F].instant - action = Action.Purge(key, timestamp, origin, version.some) + action = Action.Purge(key, timestamp, origin, version) result <- send(action) } yield result } @@ -126,7 +126,7 @@ private[journal] object Produce { for { timestamp <- Clock[F].instant id = randomId.value - action = Action.Mark(key, timestamp, id, origin, version.some) + action = Action.Mark(key, timestamp, id, origin, version) result <- send(action) } yield result } diff --git a/journal/src/test/resources/com/evolutiongaming/kafka/journal/Delete-None.json b/journal/src/test/resources/com/evolutiongaming/kafka/journal/Delete-None.json index 820dc4759..34b486f04 100644 --- a/journal/src/test/resources/com/evolutiongaming/kafka/journal/Delete-None.json +++ b/journal/src/test/resources/com/evolutiongaming/kafka/journal/Delete-None.json @@ -1,6 +1,5 @@ { "delete": { - "to": 3, - "version": "0.0.1" + "to": 3 } } \ No newline at end of file diff --git a/journal/src/test/resources/com/evolutiongaming/kafka/journal/Mark-origin.json b/journal/src/test/resources/com/evolutiongaming/kafka/journal/Mark-origin.json index ea70ac4aa..edd8a63e6 100644 --- a/journal/src/test/resources/com/evolutiongaming/kafka/journal/Mark-origin.json +++ b/journal/src/test/resources/com/evolutiongaming/kafka/journal/Mark-origin.json @@ -1,6 +1,7 @@ { "mark": { "id": "id", - "origin": "origin" + "origin": "origin", + "version": "0.0.1" } } \ No newline at end of file diff --git a/journal/src/test/resources/com/evolutiongaming/kafka/journal/Purge-origin.json b/journal/src/test/resources/com/evolutiongaming/kafka/journal/Purge-origin.json index 7f6b7d29e..2d69d6b29 100644 --- a/journal/src/test/resources/com/evolutiongaming/kafka/journal/Purge-origin.json +++ b/journal/src/test/resources/com/evolutiongaming/kafka/journal/Purge-origin.json @@ -1,5 +1,6 @@ { "purge": { - "origin": "origin" + "origin": "origin", + "version": "0.0.1" } } \ No newline at end of file diff --git a/journal/src/test/scala/com/evolutiongaming/kafka/journal/ActionHeaderJsonSpec.scala b/journal/src/test/scala/com/evolutiongaming/kafka/journal/ActionHeaderJsonSpec.scala index 7ab1c864b..11dbbc3d7 100644 --- a/journal/src/test/scala/com/evolutiongaming/kafka/journal/ActionHeaderJsonSpec.scala +++ b/journal/src/test/scala/com/evolutiongaming/kafka/journal/ActionHeaderJsonSpec.scala @@ -30,24 +30,42 @@ class ActionHeaderJsonSpec extends AnyFunSuite with Matchers { test(s"Append format, origin: $origin, payloadType: $payloadType, metadata: $metadataStr") { val range = SeqRange.unsafe(1, 5) val header = - ActionHeader.Append(range = range, origin = origin, version = none, payloadType = payloadType, metadata = metadata) + ActionHeader.Append( + range = range, + origin = origin, + version = Version.obsolete, + payloadType = payloadType, + metadata = metadata, + ) verify(header, s"Append-$originStr-$payloadType-$metadataStr") } } test(s"Delete format, origin: $origin") { - val seqNr = SeqNr.unsafe(3) - val header = ActionHeader.Delete(seqNr.toDeleteTo, origin, Version("0.0.1").some) + val seqNr = SeqNr.unsafe(3) + val version = origin match { + case Some(_) => Version("0.0.1") + case None => Version.obsolete + } + val header = ActionHeader.Delete(seqNr.toDeleteTo, origin, version) verify(header, s"Delete-$originStr") } test(s"Purge format, origin: $origin") { - val header = ActionHeader.Purge(origin, none) + val version = origin match { + case Some(_) => Version("0.0.1") + case None => Version.obsolete + } + val header = ActionHeader.Purge(origin, version) verify(header, s"Purge-$originStr") } test(s"Mark format, origin: $origin") { - val header = ActionHeader.Mark("id", origin, none) + val version = origin match { + case Some(_) => Version("0.0.1") + case None => Version.obsolete + } + val header = ActionHeader.Mark("id", origin, version) verify(header, s"Mark-$originStr") } } diff --git a/journal/src/test/scala/com/evolutiongaming/kafka/journal/ActionToProducerRecordSpec.scala b/journal/src/test/scala/com/evolutiongaming/kafka/journal/ActionToProducerRecordSpec.scala index 66a52ef7d..9d7f72b21 100644 --- a/journal/src/test/scala/com/evolutiongaming/kafka/journal/ActionToProducerRecordSpec.scala +++ b/journal/src/test/scala/com/evolutiongaming/kafka/journal/ActionToProducerRecordSpec.scala @@ -29,7 +29,7 @@ class ActionToProducerRecordSpec extends AnyFunSuite with Matchers { private val origins = List(Origin("origin").some, none[Origin]) - private val versions = List(Version.current.some, none[Version]) + private val versions = List(Version.current) private val seqNrs = List(SeqNr.min, SeqNr.max) diff --git a/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadCacheSpec.scala b/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadCacheSpec.scala index fed2e8f65..2c11029e6 100644 --- a/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadCacheSpec.scala +++ b/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadCacheSpec.scala @@ -109,7 +109,7 @@ class HeadCacheSpec extends AsyncWordSpec with Matchers { result <- Concurrent[IO].start { headCache.get(key = key, partition = partition, offset = marker) } _ <- stateRef.update { _.copy(topics = Map((topic, List(partition)))) } _ <- stateRef.update { state => - val action = Action.Mark(key, timestamp, ActionHeader.Mark("mark", none, Version.current.some)) + val action = Action.Mark(key, timestamp, ActionHeader.Mark("mark", none, Version.current)) val record = consumerRecordOf(action, topicPartition, marker) val records = ConsumerRecordsOf(List(record)) state.enqueue(records.pure[Try]) @@ -260,7 +260,7 @@ object HeadCacheSpec { key = key, timestamp = timestamp, origin = none, - version = Version.current.some, + version = Version.current, events = Events(Nel.of(Event(seqNr)), PayloadMetadata.empty), metadata = recordMetadata, headers = headers, diff --git a/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadInfoSpec.scala b/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadInfoSpec.scala index 60c9d6794..81b883059 100644 --- a/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadInfoSpec.scala +++ b/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadInfoSpec.scala @@ -66,7 +66,7 @@ class HeadInfoSpec extends AnyFunSuite with Matchers { ActionHeader.Append( range = SeqRange.unsafe(from, to), origin = None, - version = Version.current.some, + version = Version.current, payloadType = PayloadType.Json, metadata = HeaderMetadata.empty, ) @@ -74,12 +74,12 @@ class HeadInfoSpec extends AnyFunSuite with Matchers { private def delete(seqNr: Int) = { val deleteTo = SeqNr.unsafe(seqNr).toDeleteTo - ActionHeader.Delete(deleteTo, none, Version.current.some) + ActionHeader.Delete(deleteTo, none, Version.current) } - private def mark = ActionHeader.Mark("id", none, Version.current.some) + private def mark = ActionHeader.Mark("id", none, Version.current) - private def purge = ActionHeader.Purge(none, Version.current.some) + private def purge = ActionHeader.Purge(none, Version.current) private def deleteInfo(seqNr: Int) = { val deleteTo = SeqNr.unsafe(seqNr).toDeleteTo diff --git a/journal/src/test/scala/com/evolutiongaming/kafka/journal/PartitionCacheSpec.scala b/journal/src/test/scala/com/evolutiongaming/kafka/journal/PartitionCacheSpec.scala index 35af2d1b8..9f7ef9cae 100644 --- a/journal/src/test/scala/com/evolutiongaming/kafka/journal/PartitionCacheSpec.scala +++ b/journal/src/test/scala/com/evolutiongaming/kafka/journal/PartitionCacheSpec.scala @@ -120,7 +120,7 @@ class PartitionCacheSpec extends AsyncFunSuite with Matchers { test("get HeadInfo.delete") { partitionCacheOf() .use { cache => - val actionHeader = ActionHeader.Delete(to = DeleteTo(seqNr0), origin = none, version = none) + val actionHeader = ActionHeader.Delete(to = DeleteTo(seqNr0), origin = none, version = Version.obsolete) for { a <- cache.add(Record(id0, offset0, actionHeader)) _ <- IO { a shouldEqual none } @@ -638,7 +638,7 @@ object PartitionCacheSpec { val seqNr0: SeqNr = SeqNr.min val seqNr1: SeqNr = seqNr0.next[Try].get - val actionHeader: ActionHeader = ActionHeader.Mark("mark", none, none) + val actionHeader: ActionHeader = ActionHeader.Mark("mark", none, Version.obsolete) def actionHeaderOf(seqNr: SeqNr): ActionHeader.Append = { ActionHeader.Append( @@ -646,7 +646,7 @@ object PartitionCacheSpec { origin = none, payloadType = PayloadType.Binary, metadata = HeaderMetadata.empty, - version = none, + version = Version.obsolete, ) } diff --git a/journal/src/test/scala/com/evolutiongaming/kafka/journal/StreamActionRecordsSpec.scala b/journal/src/test/scala/com/evolutiongaming/kafka/journal/StreamActionRecordsSpec.scala index 015f7b051..499ad0cb6 100644 --- a/journal/src/test/scala/com/evolutiongaming/kafka/journal/StreamActionRecordsSpec.scala +++ b/journal/src/test/scala/com/evolutiongaming/kafka/journal/StreamActionRecordsSpec.scala @@ -73,7 +73,7 @@ object StreamActionRecordsSpec { val (marker, markRecord) = { val offset = pointers.lastOption.fold(1L) { _.offset + 1 } - val mark = Action.Mark(key, timestamp, ActionHeader.Mark("mark", none, Version.current.some)) + val mark = Action.Mark(key, timestamp, ActionHeader.Mark("mark", none, Version.current)) val partitionOffset = PartitionOffset(offset = Offset.unsafe(offset)) val record = ActionRecord(mark, partitionOffset) val marker = Marker(mark.id, partitionOffset) @@ -87,7 +87,7 @@ object StreamActionRecordsSpec { val header = ActionHeader.Append( range = range, origin = none, - version = Version.current.some, + version = Version.current, payloadType = PayloadType.Json, metadata = HeaderMetadata.empty, ) diff --git a/journal/src/test/scala/com/evolutiongaming/kafka/journal/eventual/EventualJournalSpec.scala b/journal/src/test/scala/com/evolutiongaming/kafka/journal/eventual/EventualJournalSpec.scala index fd5f53bed..1bd8c2d28 100644 --- a/journal/src/test/scala/com/evolutiongaming/kafka/journal/eventual/EventualJournalSpec.scala +++ b/journal/src/test/scala/com/evolutiongaming/kafka/journal/eventual/EventualJournalSpec.scala @@ -81,7 +81,7 @@ trait EventualJournalSpec extends AnyWordSpec with Matchers { metadata = RecordMetadata(HeaderMetadata(Json.obj(("key", "value")).some), PayloadMetadata.empty), headers = headers, origin = none, - version = none, + version = Version.obsolete, ) } diff --git a/notes.md b/notes.md index bfbf75d2d..cf799cfbc 100644 --- a/notes.md +++ b/notes.md @@ -46,4 +46,4 @@ Actions: * Measure three important operations of Kafka consumer: init, seek, poll. So we can optimise journal even better * More corner cases to come in order to support re-partition [^_^] -* Decide on when to clean/cut head caches \ No newline at end of file +* Decide on when to clean/cut head caches diff --git a/persistence/src/test/scala/akka/persistence/kafka/journal/JournalAdapterSpec.scala b/persistence/src/test/scala/akka/persistence/kafka/journal/JournalAdapterSpec.scala index fcbf71159..d96001849 100644 --- a/persistence/src/test/scala/akka/persistence/kafka/journal/JournalAdapterSpec.scala +++ b/persistence/src/test/scala/akka/persistence/kafka/journal/JournalAdapterSpec.scala @@ -103,7 +103,7 @@ object JournalAdapterSpec { private val headers = Headers(("key", "value")) private val origin = Origin("origin") private val version = Version.current - private val eventRecord = EventRecord(event, timestamp, partitionOffset, origin.some, version.some, recordMetadata, headers) + private val eventRecord = EventRecord(event, timestamp, partitionOffset, origin.some, version, recordMetadata, headers) sealed abstract class Action diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala index b50393b1e..16e3b59d0 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala @@ -161,7 +161,7 @@ private[journal] object Batch { offset: Offset, to: DeleteTo, origin: Option[Origin], - version: Option[Version], + version: Version, ) extends Batch final case class Purge( diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala index bbb90262c..4cae79d99 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicateRecords.scala @@ -68,20 +68,19 @@ private[journal] object ReplicateRecords { val origin = records.head.action.origin val originStr = origin.foldMap { origin => s", origin: $origin" } val version = records.last.action.version - val versionStr = version.fold("none") { _.toString } val expireAfterStr = expireAfter.foldMap { expireAfter => s", expireAfter: $expireAfter" } - s"append in ${latency.toMillis}ms, id: $id, partition: $partition, offset: $offset, $seqNrs$originStr, version: $versionStr$expireAfterStr" + s"append in ${latency.toMillis}ms, id: $id, partition: $partition, offset: $offset, $seqNrs$originStr, version: $version$expireAfterStr" } def measure(events: Nel[EventRecord[EventualPayloadAndType]], expireAfter: Option[ExpireAfter]): F[Unit] = { for { measurements <- measurements(records.size) - version = events.last.version.map(_.value).getOrElse("none") + version = events.last.version expiration = expireAfter.map(_.value.toString).getOrElse("none") result <- metrics.append( events = events.length, bytes = bytes, - clientVersion = version, + clientVersion = version.value, expiration = expiration, measurements = measurements, ) @@ -111,12 +110,11 @@ private[journal] object ReplicateRecords { } yield result } - def delete(offset: Offset, deleteTo: DeleteTo, origin: Option[Origin], version: Option[Version]): F[Int] = { + def delete(offset: Offset, deleteTo: DeleteTo, origin: Option[Origin], version: Version): F[Int] = { def msg(latency: FiniteDuration): String = { - val originStr = origin.foldMap { origin => s", origin: $origin" } - val versionStr = version.fold("none") { _.toString } - s"delete in ${latency.toMillis}ms, id: $id, offset: $partition:$offset, deleteTo: $deleteTo$originStr, version: $versionStr" + val originStr = origin.foldMap { origin => s", origin: $origin" } + s"delete in ${latency.toMillis}ms, id: $id, offset: $partition:$offset, deleteTo: $deleteTo$originStr, version: $version" } def measure(): F[Unit] = { @@ -134,12 +132,11 @@ private[journal] object ReplicateRecords { } yield result } - def purge(offset: Offset, origin: Option[Origin], version: Option[Version]): F[Int] = { + def purge(offset: Offset, origin: Option[Origin], version: Version): F[Int] = { def msg(latency: FiniteDuration): String = { - val originStr = origin.foldMap { origin => s", origin: $origin" } - val versionStr = version.fold("none") { _.toString } - s"purge in ${latency.toMillis}ms, id: $id, offset: $partition:$offset$originStr, version: $versionStr" + val originStr = origin.foldMap { origin => s", origin: $origin" } + s"purge in ${latency.toMillis}ms, id: $id, offset: $partition:$offset$originStr, version: $version" } def measure(): F[Unit] = { diff --git a/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchSpec.scala b/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchSpec.scala index 9d0943d18..84041bf0b 100644 --- a/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchSpec.scala +++ b/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/BatchSpec.scala @@ -312,11 +312,11 @@ class BatchSpec extends AnyFunSuite with Matchers { } def deletes(offset: Int, to: Int, origin: String = ""): Batch.Delete = { - Batch.Delete(Offset.unsafe(offset), SeqNr.unsafe(to).toDeleteTo, originOf(origin), version = none) + Batch.Delete(Offset.unsafe(offset), SeqNr.unsafe(to).toDeleteTo, originOf(origin), version = Version.obsolete) } def purges(offset: Int, origin: String = ""): Batch.Purge = { - Batch.Purge(Offset.unsafe(offset), originOf(origin), version = none) + Batch.Purge(Offset.unsafe(offset), originOf(origin), version = Version.obsolete) } def append(offset: Int, seqNr: Int, seqNrs: Int*): A.Append = { @@ -349,7 +349,7 @@ class BatchSpec extends AnyFunSuite with Matchers { range = SeqRange(seqNrOf(seqNrs.head), seqNrOf(seqNrs.last)), payloadType = PayloadType.Binary, origin = none, - version = none, + version = Version.obsolete, metadata = HeaderMetadata.empty, ), payload = ByteVector.empty, @@ -358,15 +358,15 @@ class BatchSpec extends AnyFunSuite with Matchers { } def deleteOf(seqNr: Int, origin: String): Action.Delete = { - Action.Delete(keyOf, timestamp, seqNrOf(seqNr).toDeleteTo, originOf(origin), version = none) + Action.Delete(keyOf, timestamp, seqNrOf(seqNr).toDeleteTo, originOf(origin), version = Version.obsolete) } def actionOf(a: A): Action = { a match { case a: A.Append => appendOf(Nel(a.seqNr, a.seqNrs)) case a: A.Delete => deleteOf(seqNr = a.seqNr, origin = a.origin) - case a: A.Purge => Action.Purge(keyOf, timestamp, origin = originOf(a.origin), version = none) - case _: A.Mark => Action.Mark(keyOf, timestamp, ActionHeader.Mark("id", none, version = none)) + case a: A.Purge => Action.Purge(keyOf, timestamp, origin = originOf(a.origin), version = Version.obsolete) + case _: A.Mark => Action.Mark(keyOf, timestamp, ActionHeader.Mark("id", none, version = Version.obsolete)) } } diff --git a/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicatorSpec.scala b/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicatorSpec.scala index 6ad4a4e1c..acdc927cf 100644 --- a/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicatorSpec.scala +++ b/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicatorSpec.scala @@ -725,7 +725,7 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { timestamp, partitionOffset, origin.some, - version.some, + version, recordMetadata.withExpireAfter(expireAfter), headers, ) @@ -739,7 +739,7 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { key = key, timestamp = timestamp, origin = origin.some, - version = version.some, + version = version, events = Events( events = seqNrs.map { seqNr => Event(SeqNr.unsafe(seqNr), Set(seqNr.toString)) }, recordMetadata.payload.copy(expireAfter = expireAfter), @@ -751,15 +751,15 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { } private def markOf(key: Key) = { - Action.Mark(key, timestamp, "id", origin.some, version.some) + Action.Mark(key, timestamp, "id", origin.some, version) } private def deleteOf(key: Key, to: Int) = { - Action.Delete(key, timestamp, SeqNr.unsafe(to).toDeleteTo, origin.some, version.some) + Action.Delete(key, timestamp, SeqNr.unsafe(to).toDeleteTo, origin.some, version) } private def purgeOf(key: Key) = { - Action.Purge(key, timestamp, origin.some, version.some) + Action.Purge(key, timestamp, origin.some, version) } } diff --git a/tests/src/test/scala/com/evolutiongaming/kafka/journal/JournalIntSpec.scala b/tests/src/test/scala/com/evolutiongaming/kafka/journal/JournalIntSpec.scala index f1caa03a8..c571a01eb 100644 --- a/tests/src/test/scala/com/evolutiongaming/kafka/journal/JournalIntSpec.scala +++ b/tests/src/test/scala/com/evolutiongaming/kafka/journal/JournalIntSpec.scala @@ -97,7 +97,7 @@ abstract class JournalIntSpec[A] extends AsyncWordSpec with JournalSuite { _ = pointer shouldEqual None anEvent = event(seqNr) offset <- journal.append(Nel.of(anEvent), recordMetadata, headers) - record = EventRecord(anEvent, timestamp, offset, origin.some, version.some, recordMetadata, headers) + record = EventRecord(anEvent, timestamp, offset, origin.some, version, recordMetadata, headers) partition = offset.partition events <- journal.read _ = events shouldEqual List(record) @@ -115,7 +115,7 @@ abstract class JournalIntSpec[A] extends AsyncWordSpec with JournalSuite { _ = events shouldEqual List.empty metadata = recordMetadata.withExpireAfter(1.day.toExpireAfter.some) offset <- journal.append(Nel.of(anEvent), metadata, headers) - record = EventRecord(anEvent, timestamp, offset, origin.some, version.some, metadata, headers) + record = EventRecord(anEvent, timestamp, offset, origin.some, version, metadata, headers) events <- journal.read _ = events shouldEqual List(record) pointer <- journal.delete(DeleteTo.max) @@ -212,7 +212,7 @@ abstract class JournalIntSpec[A] extends AsyncWordSpec with JournalSuite { _ <- append offset <- append records = events.map { event => - EventRecord(event, timestamp, offset, origin.some, version.some, recordMetadata, headers) + EventRecord(event, timestamp, offset, origin.some, version, recordMetadata, headers) } partition = offset.partition events <- journal.read diff --git a/tests/src/test/scala/com/evolutiongaming/kafka/journal/replicator/ReplicatorIntSpec.scala b/tests/src/test/scala/com/evolutiongaming/kafka/journal/replicator/ReplicatorIntSpec.scala index cd40cf782..218f6df1b 100644 --- a/tests/src/test/scala/com/evolutiongaming/kafka/journal/replicator/ReplicatorIntSpec.scala +++ b/tests/src/test/scala/com/evolutiongaming/kafka/journal/replicator/ReplicatorIntSpec.scala @@ -165,7 +165,7 @@ class ReplicatorIntSpec extends AsyncWordSpec with BeforeAndAfterAll with Matche } yield for { event <- events } yield { - EventRecord(event, timestamp, partitionOffset, origin.some, version.some, recordMetadata1, headers) + EventRecord(event, timestamp, partitionOffset, origin.some, version, recordMetadata1, headers) } } From 59e028eddc0b485cc3ab03b5e00cc07a5eff48fd Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Thu, 14 Nov 2024 13:45:40 +0100 Subject: [PATCH 2/3] change version type --- .../evolutiongaming/kafka/journal/replicator/Batch.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala index 16e3b59d0..7bd2b3270 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala @@ -78,7 +78,7 @@ private[journal] object Batch { case ActionRecord(_: Action.Mark, _) => state case ActionRecord(purge: Action.Purge, partitionOffset: PartitionOffset) => - def purgeBatch = Purge(partitionOffset.offset, purge.origin, purge.version) + def purgeBatch = Purge(partitionOffset.offset, purge.origin, purge.version1) state.next match { case Some(_: Purge) => state @@ -88,7 +88,7 @@ private[journal] object Batch { case ActionRecord(delete: Action.Delete, partitionOffset: PartitionOffset) => def deleteBatch(delete: Action.Delete) = - Delete(partitionOffset.offset, delete.to, delete.origin, delete.version) + Delete(partitionOffset.offset, delete.to, delete.origin, delete.version1) state.next match { @@ -167,6 +167,6 @@ private[journal] object Batch { final case class Purge( offset: Offset, origin: Option[Origin], // used only for logging - version: Option[Version], // used only for logging + version: Version, // used only for logging ) extends Batch } From 1de7acaf5ed5938e27d764e9739097c2e3b64b6e Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Thu, 14 Nov 2024 13:50:42 +0100 Subject: [PATCH 3/3] fix compile errs --- .../com/evolutiongaming/kafka/journal/replicator/Batch.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala index 7bd2b3270..fea1c854a 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala @@ -78,7 +78,7 @@ private[journal] object Batch { case ActionRecord(_: Action.Mark, _) => state case ActionRecord(purge: Action.Purge, partitionOffset: PartitionOffset) => - def purgeBatch = Purge(partitionOffset.offset, purge.origin, purge.version1) + def purgeBatch = Purge(partitionOffset.offset, purge.origin, purge.version) state.next match { case Some(_: Purge) => state @@ -88,7 +88,7 @@ private[journal] object Batch { case ActionRecord(delete: Action.Delete, partitionOffset: PartitionOffset) => def deleteBatch(delete: Action.Delete) = - Delete(partitionOffset.offset, delete.to, delete.origin, delete.version1) + Delete(partitionOffset.offset, delete.to, delete.origin, delete.version) state.next match {