Fix fencepost issues with stream algorithm plus fix tests #76
Changes from 1 commit
```diff
@@ -1,5 +1,6 @@
 package io.aiven.guardian.kafka.backup

+import akka.NotUsed
 import akka.stream.scaladsl._
 import akka.util.ByteString
 import io.aiven.guardian.kafka.Errors
```
```diff
@@ -17,25 +18,6 @@ import java.time._
 import java.time.format.DateTimeFormatter
 import java.time.temporal._

-/** A marker used to indicate in which position the current backup stream is
-  */
-sealed abstract class BackupStreamPosition
-
-object BackupStreamPosition {
-
-  /** The backup stream has just started right now
-    */
-  case object Start extends BackupStreamPosition
-
-  /** The backup stream is in the middle of a time period
-    */
-  case object Middle extends BackupStreamPosition
-
-  /** The backup stream position has just hit a boundary for when a new period starts
-    */
-  case object Boundary extends BackupStreamPosition
-}
-
 /** An interface for a template on how to backup a Kafka Stream into some data storage
   * @tparam T
   *   The underlying `kafkaClientInterface` type
```
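The removed three-state marker was driven by a `sliding(2)` pass that emitted only the first element of each window, which appears to be where the fencepost error crept in: the last record of the stream was never emitted. A minimal sketch of that accounting with plain collections (the simplified record pairs are illustrative stand-ins, not the project's types):

```scala
// Sketch of the old window-to-first-element mapping, using plain Lists.
// Each pair stands in for a (ReducedConsumerRecord, dividedPeriods) element.
object FencepostSketch extends App {
  val records = List(("r1", 0L), ("r2", 0L), ("r3", 1L))

  // Old approach: each 2-element window is mapped to its *first* element,
  // so a stream of n records yields only n - 1 outputs: "r3" is dropped.
  val oldOutput = records.sliding(2).collect { case Seq((before, _), (_, _)) =>
    before
  }.toList

  println(oldOutput) // List(r1, r2) -- the last record never makes it out
}
```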
```diff
@@ -44,6 +26,29 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
   implicit val kafkaClientInterface: T
   implicit val backupConfig: Backup

+  /** An element from the original record
+    */
+  private[backup] sealed trait RecordElement
+  private[backup] case class Element(reducedConsumerRecord: ReducedConsumerRecord,
+                                     context: kafkaClientInterface.CursorContext
+  ) extends RecordElement
+  private[backup] case object End extends RecordElement
+
+  /** An element after the record has been transformed to a ByteString
+    */
+  private[backup] sealed trait ByteStringElement {
+    val data: ByteString
+    val context: kafkaClientInterface.CursorContext
+  }
+
+  private[backup] case class Start(override val data: ByteString,
+                                   override val context: kafkaClientInterface.CursorContext,
+                                   key: String
+  ) extends ByteStringElement
+  private[backup] case class Tail(override val data: ByteString,
+                                  override val context: kafkaClientInterface.CursorContext
+  ) extends ByteStringElement
+
   /** Override this type to define the result of backing up data to a datasource
     */
   type BackupResult
```
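The new design replaces the positional marker with two small ADTs: `RecordElement` tags the logical stream (`Element` for a record, `End` for a period boundary), and `ByteStringElement` tags the serialized stream (`Start` opens a JSON array and carries the object key, `Tail` continues it). A standalone sketch of what a tagged stream looks like (the simplified types here are assumptions for illustration):

```scala
// Standalone mirror of the RecordElement idea with simplified types.
sealed trait RecordElem
final case class Elem(record: String, offset: Long) extends RecordElem
case object PeriodEnd extends RecordElem

object RecordElemDemo extends App {
  // Records r1/r2 fall in period 0 and r3 in period 1, so a PeriodEnd
  // marker is injected between r2 and r3.
  val tagged: List[RecordElem] =
    List(Elem("r1", 0), Elem("r2", 1), PeriodEnd, Elem("r3", 2))

  // Downstream, splitting after PeriodEnd turns this into one substream
  // (and hence one backup object) per time period.
  println(tagged)
}
```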
```diff
@@ -66,23 +71,60 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
     */
   def empty: () => Future[BackupResult]

-  @nowarn("msg=not.*?exhaustive")
   private[backup] def calculateBackupStreamPositions(
       sourceWithPeriods: SourceWithContext[(ReducedConsumerRecord, Long),
                                            kafkaClientInterface.CursorContext,
                                            kafkaClientInterface.Control
       ]
-  ): SourceWithContext[(ReducedConsumerRecord, BackupStreamPosition),
-                       kafkaClientInterface.CursorContext,
-                       kafkaClientInterface.Control
-  ] = sourceWithPeriods
-    .sliding(2)
-    .map { case Seq((beforeReducedConsumerRecord, beforeDivisions), (_, afterDivisions)) =>
-      val backupStreamPosition = splitAtBoundaryCondition(beforeDivisions, afterDivisions)
-
-      (beforeReducedConsumerRecord, backupStreamPosition)
-    }
-    .mapContext { case Seq(cursorContext, _) => cursorContext }
+  ): Source[RecordElement, kafkaClientInterface.Control] =
+    sourceWithPeriods.asSource
+      .prefixAndTail(2)
+      // This algorithm only works with Source's that have 2 or more elements
+      .flatMapConcat {
+        case (Seq(
+                ((firstReducedConsumerRecord, firstDivision), firstContext),
+                ((secondReducedConsumerRecord, secondDivision), secondContext)
+              ),
+              rest
+            ) =>
+          val all = Source
+            .combine(
+              Source(
+                List(
+                  ((firstReducedConsumerRecord, firstDivision), firstContext),
+                  ((secondReducedConsumerRecord, secondDivision), secondContext)
+                )
+              ),
+              rest
+            )(Concat(_))
+
+          val withDivisions =
+            all
+              .sliding(2)
+              .map {
+                case Seq(((_, beforeDivisions), _), ((afterReducedConsumerRecord, afterDivisions), afterContext)) =>
+                  if (isAtBoundary(beforeDivisions, afterDivisions))
+                    List(
+                      End,
+                      Element(afterReducedConsumerRecord, afterContext)
+                    )
+                  else
+                    List(Element(afterReducedConsumerRecord, afterContext))
+                case rest =>
+                  throw Errors.UnhandledStreamCase(rest)
+              }
+              .mapConcat(identity)
+
+          Source.combine(
+            Source.single(Element(firstReducedConsumerRecord, firstContext)),
+            withDivisions
+          )(Concat(_))
+        // This case only occurs if a Source has a single element so we just directly return it
+        case (Seq(((singleReducedConsumerRecord, _), singleContext)), _) =>
+          Source.single(Element(singleReducedConsumerRecord, singleContext))
+        case (rest, _) =>
+          throw Errors.UnhandledStreamCase(rest)
+      }

   private[backup] def sourceWithPeriods(
       source: Source[(OffsetDateTime, (ReducedConsumerRecord, kafkaClientInterface.CursorContext)),
```
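The rewritten pass keeps every record: `prefixAndTail(2)` peels off the first two elements so the head can be re-emitted explicitly, and each subsequent `sliding(2)` window emits its *second* element (preceded by an `End` marker at boundaries). A collection-based sketch of the fixed accounting (simplified record pairs assumed, as before):

```scala
// Sketch of the fixed accounting with plain Lists: emit the head once,
// then emit the second element of every sliding window, inserting a
// boundary marker whenever the period number increases.
object FixedFencepostSketch extends App {
  val records = List(("r1", 0L), ("r2", 0L), ("r3", 1L))

  val head = records.head._1
  val tail = records.sliding(2).toList.flatMap {
    case Seq((_, beforeDiv), (after, afterDiv)) =>
      if (afterDiv > beforeDiv) List("END", after) else List(after)
    case _ => Nil // single-element stream: nothing beyond the head
  }

  println(head :: tail) // List(r1, r2, END, r3) -- all three records survive
}
```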
```diff
@@ -107,12 +149,101 @@ trait BackupClientInterface[T <: KafkaClientInterface] {

         Source.combine(
           Source.single((firstTimestamp, (firstReducedConsumerRecord, firstCursorContext))),
-          rest.map { case (reducedConsumerRecord, context) => (firstTimestamp, (reducedConsumerRecord, context)) }
+          rest.map { case (reducedConsumerRecord, context) =>
+            (firstTimestamp, (reducedConsumerRecord, context))
+          }
         )(Concat(_))
       case None => throw Errors.ExpectedStartOfSource
     }
   }

+  /** Transforms a sequence of [[RecordElement]]'s into a ByteString so that it can be persisted into the data storage
+    *
+    * @param sourceElements
+    *   A sequence of [[RecordElement]]'s as a result of `sliding(2)`
+    * @return
+    *   a [[ByteString]] ready to be persisted along with the original context from the [[RecordElement]]
+    */
+  private[backup] def transformReducedConsumerRecords(sourceElements: Seq[RecordElement]) = {
+    val stringWithContext = sourceElements match {
+      case Seq(Element(reducedConsumerRecord, context)) =>
+        // Happens in Sentinel case that is explicitly called at start of stream OR when a stream is interrupted by the user
+        // in which case stream needs to be terminated with `null]` in order to be valid
+        List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)},", Some(context)))
+      case Seq(Element(firstReducedConsumerRecord, firstContext),
+               Element(secondReducedConsumerRecord, secondContext)
+          ) =>
+        List(
+          (s"${reducedConsumerRecordAsString(firstReducedConsumerRecord)},", Some(firstContext)),
+          (s"${reducedConsumerRecordAsString(secondReducedConsumerRecord)},", Some(secondContext))
+        )
+      case Seq(Element(reducedConsumerRecord, context), End) =>
+        List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]", Some(context)))
+      case Seq(End) =>
+        List(("]", None))
+      case rest => throw Errors.UnhandledStreamCase(rest)
+    }
+    stringWithContext.map { case (string, context) => (ByteString(string), context) }
+  }
```
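Each window therefore contributes comma-terminated JSON objects, with `End` turning the preceding object's trailing comma into a closing `]`. A pure-Scala sketch of the fragment emission (string records and the toy `Rec` type stand in for serialized `ReducedConsumerRecord`s):

```scala
// Sketch: what each sliding(2, step = 2) window contributes to the JSON
// array, mirroring the four cases of transformReducedConsumerRecords.
sealed trait Rec
final case class R(json: String) extends Rec
case object EndMarker extends Rec

object FragmentSketch extends App {
  def fragments(window: Seq[Rec]): List[String] = window match {
    case Seq(R(a))            => List(s"$a,")         // lone record keeps the array open
    case Seq(R(a), R(b))      => List(s"$a,", s"$b,") // two records, both still open
    case Seq(R(a), EndMarker) => List(s"$a]")         // boundary closes the array
    case Seq(EndMarker)       => List("]")            // bare boundary just closes
    case other                => sys.error(s"unhandled window: $other")
  }

  val stream = List[Rec](R("""{"k":1}"""), R("""{"k":2}"""), EndMarker)
  println(stream.sliding(2, 2).toList.flatMap(fragments).mkString)
  // {"k":1},{"k":2},] -- the dangling ",]" is what transformTailingElement repairs
}
```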
```diff
+  /** Applies the transformation to the first element of a Stream so that it starts off as a JSON array.
+    *
+    * @param element
+    *   Starting [[Element]]
+    * @param key
+    *   The current key being processed
+    * @param terminate
+    *   Whether to immediately terminate the JSON array for single element in Stream case
+    * @return
+    *   A [[List]] containing a single [[Start]] ready to be processed in the [[Sink]]
+    */
+  private[backup] def transformFirstElement(element: Element, key: String, terminate: Boolean) =
+    transformReducedConsumerRecords(List(element)).map {
+      case (byteString, Some(context)) =>
+        if (terminate)
+          Start(ByteString("[") ++ byteString.dropRight(1) ++ ByteString("]"), context, key)
+        else
+          Start(ByteString("[") ++ byteString, context, key)
+      case _ =>
+        throw Errors.UnhandledStreamCase(List(element))
+    }
```
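`transformFirstElement` prefixes the opening `[`; with `terminate = true` it also swaps the trailing comma for `]`, so a one-record time slice still yields a valid JSON array. A quick illustration of the two shapes using `akka.util.ByteString` directly (the serialized record literal is an assumption for demonstration):

```scala
import akka.util.ByteString

// Illustration of the two Start shapes produced for a first record whose
// serialized form is {"k":1}, (note the trailing comma from the transform).
object FirstElementSketch extends App {
  val serialized = ByteString("""{"k":1},""")

  val open       = ByteString("[") ++ serialized                                 // more elements to come
  val terminated = ByteString("[") ++ serialized.dropRight(1) ++ ByteString("]") // single-element array

  println(open.utf8String)       // [{"k":1},
  println(terminated.utf8String) // [{"k":1}]
}
```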
```diff
+  /** Fixes the case where there is an odd number of elements in the stream
+    * @param head
+    *   of stream as a result of `prefixAndTail`
+    * @param restSource
+    *   of the stream as a result of `prefixAndTail`
+    * @return
+    *   A [[List]] of ([[ByteString]], [[kafkaClientInterface.CursorContext]]) with the tail elements fixed up.
+    */
+  private[backup] def transformTailingElement(
+      head: Seq[(ByteString, Option[kafkaClientInterface.CursorContext])],
+      restSource: Source[(ByteString, Option[kafkaClientInterface.CursorContext]), NotUsed]
+  ) = {
+    val restTransformed = restSource
+      .sliding(2, step = 2)
+      .map {
+        case Seq((before, Some(context)), (after, None)) =>
+          List((before.dropRight(1) ++ after, context))
+        case Seq((before, Some(beforeContext)), (after, Some(afterContext))) =>
+          List((before, beforeContext), (after, afterContext))
+        case Seq((single, Some(context))) =>
+          List((single, context))
+        case rest =>
+          throw Errors.UnhandledStreamCase(rest)
+      }
+
+    head match {
+      case Seq((byteString, Some(cursorContext))) =>
+        Source.combine(
+          Source.single(List((byteString, cursorContext))),
+          restTransformed
+        )(Concat(_))
+      case rest =>
+        throw Errors.UnhandledStreamCase(rest)
+    }
+  }

   /** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a
     * data source.
     * @return
```
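The pairing logic is easiest to see on plain data: a window whose second fragment carries no context (the bare `]`) gets merged into the preceding fragment, dropping its trailing comma. A sketch with `(String, Option[Int])` pairs standing in for `(ByteString, Option[CursorContext])`:

```scala
// Sketch of transformTailingElement's window handling, with String fragments
// and Option[Int] as a stand-in cursor context.
object TailingSketch extends App {
  def fixUp(window: Seq[(String, Option[Int])]): List[(String, Int)] = window match {
    case Seq((before, Some(ctx)), (after, None)) =>
      List((before.dropRight(1) + after, ctx)) // merge "...}," + "]" into "...}]"
    case Seq((before, Some(b)), (after, Some(a))) =>
      List((before, b), (after, a))            // two ordinary fragments pass through
    case Seq((single, Some(ctx))) =>
      List((single, ctx))                      // odd tail: lone fragment passes through
    case other => sys.error(s"unhandled: $other")
  }

  val tail = List(("""{"k":2},""", Some(2)), ("]", None))
  println(tail.sliding(2, 2).toList.flatMap(fixUp)) // List(({"k":2}],2))
}
```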
```diff
@@ -127,56 +258,71 @@ trait BackupClientInterface[T <: KafkaClientInterface] {

     val withBackupStreamPositions = calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord))

-    val split = withBackupStreamPositions.asSource.splitAfter { case ((_, backupStreamPosition), _) =>
-      backupStreamPosition == BackupStreamPosition.Boundary
-    }
+    val split = withBackupStreamPositions
+      .splitAfter { case sourceElement =>
+        sourceElement match {
+          case End => true
+          case _   => false
+        }
+      }

     val substreams = split
-      .prefixAndTail(1)
-      .flatMapConcat { case (head, restOfReducedConsumerRecords) =>
-        head.headOption match {
-          case Some(((firstReducedConsumerRecord, _), firstContext)) =>
-            // We need to retrieve the first element of the stream in order to calculate the key/filename
-            val key = calculateKey(firstReducedConsumerRecord.toOffsetDateTime)
-
-            // Now that we have retrieved the first element of the stream, lets recombine it so we create the
-            // original stream
-            val combined = Source.combine(
-              Source.single(
-                (
-                  (firstReducedConsumerRecord, BackupStreamPosition.Start),
-                  firstContext
-                )
-              ),
-              restOfReducedConsumerRecords
-            )(Concat(_))
-
-            // Go through every element in the stream and convert the `ReducedCustomerRecord` to an actual bytestream
-            val transformed = combined.map { case ((record, position), context) =>
-              val transform = transformReducedConsumerRecords(record, position)
-              (transform, context)
-            }
-
-            transformed.map(data => (data, key))
-          case None => Source.empty
-        }
+      .prefixAndTail(2)
+      .flatMapConcat {
+        case (Seq(only: Element, End), _) =>
+          // This case only occurs when you have a single element in a timeslice.
+          // We have to terminate immediately to create a JSON array with a single element
+          val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
+          Source(transformFirstElement(only, key, terminate = true))
+        case (Seq(first: Element, second: Element), restOfReducedConsumerRecords) =>
+          val key         = calculateKey(first.reducedConsumerRecord.toOffsetDateTime)
+          val firstSource = transformFirstElement(first, key, terminate = false)
+
+          val rest = Source.combine(
+            Source.single(second),
+            restOfReducedConsumerRecords
+          )(Concat(_))
+
+          val restTransformed = rest
+            .sliding(2, step = 2)
+            .map(transformReducedConsumerRecords)
+            .mapConcat(identity)
+            .prefixAndTail(1)
+            .flatMapConcat((transformTailingElement _).tupled)
+            .mapConcat(identity)
+            .map { case (byteString, context) => Tail(byteString, context) }
+
+          Source.combine(
+            Source(firstSource),
+            restTransformed
+          )(Concat(_))
+        case (Seq(only: Element), _) =>
+          // This case can also occur when user terminates the stream
+          val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
+          Source(transformFirstElement(only, key, terminate = false))
+        case (rest, _) =>
+          throw Errors.UnhandledStreamCase(rest)
       }
```
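With `End` as an explicit marker, `splitAfter` can cut substreams exactly at period boundaries, and each substream independently derives its key from its first record. A minimal runnable sketch of `splitAfter` semantics on a toy integer source (names are illustrative):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

// Minimal demonstration of splitAfter: each substream ends right after the
// element matching the predicate, mirroring how End markers close a period.
object SplitAfterSketch extends App {
  implicit val system: ActorSystem = ActorSystem("split-after-sketch")

  Source(1 to 7)
    .splitAfter(_ % 3 == 0)          // cut after 3 and after 6
    .fold(Vector.empty[Int])(_ :+ _) // collect each substream
    .mergeSubstreams
    .runWith(Sink.foreach[Vector[Int]](v => println(v)))
    // prints Vector(1, 2, 3), Vector(4, 5, 6), Vector(7)
    .onComplete(_ => system.terminate())(system.dispatcher)
}
```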
```diff
     // Note that .alsoTo triggers after .to, see https://stackoverflow.com/questions/47895991/multiple-sinks-in-the-same-stream#comment93028098_47896071
     @nowarn("msg=method lazyInit in object Sink is deprecated")
     val subFlowSink = substreams
-      .alsoTo(kafkaClientInterface.commitCursor.contramap[((ByteString, kafkaClientInterface.CursorContext), String)] {
-        case ((_, context), _) => context
+      .alsoTo(kafkaClientInterface.commitCursor.contramap[ByteStringElement] { byteStringElement =>
+        byteStringElement.context
       })
       .to(
         // See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660
         Sink.lazyInit(
-          { case (_, key) =>
-            Future.successful(
-              backupToStorageSink(key).contramap[((ByteString, kafkaClientInterface.CursorContext), String)] {
-                case ((byteString, _), _) => byteString
-              }
-            )
+          {
+            case start: Start =>
+              Future.successful(
+                backupToStorageSink(start.key).contramap[ByteStringElement] { byteStringElement =>
+                  byteStringElement.data
+                }
+              )
+            case _ => throw Errors.ExpectedStartOfSource
           },
           empty
         )
```
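Both sinks now consume the same `ByteStringElement` stream and use `contramap` to project out what they need: the cursor context for commits, the bytes for storage, and the key from the leading `Start` to initialize the lazy sink. A small runnable sketch of driving two sinks from one element type (the `Chunk` type and sink names are assumptions for illustration):

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

// Sketch of serving two concerns from one element type via Sink.contramap,
// as the PR does with commitCursor (context) and backupToStorageSink (data).
object ContramapSketch extends App {
  implicit val system: ActorSystem = ActorSystem("contramap-sketch")

  final case class Chunk(data: String, cursor: Long)

  val storage: Sink[String, Future[Done]] = Sink.foreach[String](s => println(s"store: $s"))
  val commits: Sink[Long, Future[Done]]   = Sink.foreach[Long](c => println(s"commit: $c"))

  Source(List(Chunk("[{...},", 1L), Chunk("{...}]", 2L)))
    .alsoTo(commits.contramap[Chunk](_.cursor)) // commit the cursor side
    .runWith(storage.contramap[Chunk](_.data))  // persist the data side
    .onComplete(_ => system.terminate())(system.dispatcher)
}
```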
```diff
@@ -202,44 +348,22 @@ object BackupClientInterface {
   def calculateKey(offsetDateTime: OffsetDateTime): String =
     s"${BackupClientInterface.formatOffsetDateTime(offsetDateTime)}.json"

-  /** Calculates the current position in 2 element sliding of a Stream.
+  /** Calculates whether we have rolled over a time period given number of divided periods.
     * @param dividedPeriodsBefore
     *   The number of divided periods in the first element of the slide. -1 is used as a sentinel value to indicate the
     *   start of the stream
     * @param dividedPeriodsAfter
     *   The number of divided periods in the second element of the slide
     * @return
-    *   The position of the Stream
+    *   `true` if we have hit a time boundary otherwise `false`
     */
-  def splitAtBoundaryCondition(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): BackupStreamPosition =
+  def isAtBoundary(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): Boolean =
     (dividedPeriodsBefore, dividedPeriodsAfter) match {
       case (before, after) if after > before =>
-        BackupStreamPosition.Boundary
+        true
       case _ =>
-        BackupStreamPosition.Middle
+        false
     }

-  /** Transforms a `ReducedConsumer` record into a ByteString so that it can be persisted into the data storage
-    * @param reducedConsumerRecord
-    *   The ReducedConsumerRecord to persist
-    * @param backupStreamPosition
-    *   The position of the record relative in the stream (so it knows if its at the start, middle or end)
-    * @return
-    *   a `ByteString` ready to be persisted
-    */
-  def transformReducedConsumerRecords(reducedConsumerRecord: ReducedConsumerRecord,
-                                      backupStreamPosition: BackupStreamPosition
-  ): ByteString = {
-    val string = backupStreamPosition match {
-      case BackupStreamPosition.Start =>
-        s"[${reducedConsumerRecordAsString(reducedConsumerRecord)},"
-      case BackupStreamPosition.Middle =>
-        s"${reducedConsumerRecordAsString(reducedConsumerRecord)},"
-      case BackupStreamPosition.Boundary =>
-        s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]"
-    }
-    ByteString(string)
-  }
-
   protected def calculateNumberOfPeriodsFromTimestamp(initialTime: OffsetDateTime,
                                                       period: FiniteDuration,
```
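`isAtBoundary` reduces the old three-state classification to the one question the stream still needs: did the divided-period count increase between two adjacent records? A quick worked check (the `-1` sentinel means any real first period registers as a boundary):

```scala
// Worked examples of the boundary test: a boundary is any increase in the
// divided-period count between adjacent records.
object IsAtBoundarySketch extends App {
  def isAtBoundary(before: Long, after: Long): Boolean = after > before

  println(isAtBoundary(-1L, 0L)) // true  -- sentinel start, first real period
  println(isAtBoundary(0L, 0L))  // false -- same period, middle of a slice
  println(isAtBoundary(0L, 1L))  // true  -- rolled over into the next period
}
```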