- 
                Notifications
    You must be signed in to change notification settings 
- Fork 5
Add suspend and resume functionality #75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
55b43b2    to
    d14913d      
    Compare
  
    | kafkaClientInterface.batchCursorContext(cursors) | ||
| } | ||
|  | ||
| val kafkaBatchSink: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is how you make sinks do something different depending on an input, in our case if we have an SuccessfulUploadPart we want to commit the cursors and if we have a  FailedUploadPart we just log (note that Alpakka S3's client will automatically retry uploading a failed chunk)
| implicit val ec: ExecutionContext = system.dispatcher | ||
| implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis) | ||
| implicit val defaultPatience: PatienceConfig = PatienceConfig(5 minutes, 100 millis) | ||
| implicit override val generatorDrivenConfig: PropertyCheckConfiguration = | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was causing issues in our tests, normally for property driven tests they run a certain number of times however in our case with a real integration test with Kafka we only want it to run once (and to succeed once)
| def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] | ||
| def backupToStorageSink(key: String, | ||
| currentState: Option[CurrentState] | ||
| ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So now instead of just having a Sink that is a generic ByteString instead we also have to pass the cursor context into the Sink (and its now the sinks responsibility on how to handle each outputting element)
| }, | ||
| empty | ||
| ) | ||
| val subFlowSink = substreams.to( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main change here is that we no longer commit the cursors after a timeslice since it makes no sense to do so.
| * A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors | ||
| */ | ||
| override val getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] = | ||
| override def getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] = | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is being changed from a val to def because you cannot make super calls in subclasses on a field that is defined as a val (it has to be a method). This is relevant for KafkaClientWithKillSwitch which extends KafkaClient
| * A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message | ||
| */ | ||
| override val commitCursor: Sink[Committable, Future[Done]] = Committer.sink(committerSettings) | ||
| override def commitCursor: Sink[CommittableOffsetBatch, Future[Done]] = Committer.sink(committerSettings) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is being changed from a val to def because you cannot make super calls in subclasses on a field that is defined as a val (it has to be a method). This is relevant for KafkaClientWithKillSwitch which extends KafkaClient
| * @return | ||
| * A collection data structure that represents the batched cursors | ||
| */ | ||
| override def batchCursorContext(cursors: immutable.Iterable[CommittableOffset]): CommittableOffsetBatch = | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given a list of cursors, we need to define how to "batch" these together so that they can be committed to Kafka (which is done with commitCursor. For the actual batching we use CommittableOffsetBatch which is Alpakka's own internal mechanism for batching Kafka cursors (internally it uses a Map[GroupPartitionOffset, Long] to continuously maintain a set of cursors for each given group/partition/offset).
Note that  CommittableOffsetBatch is also global
|  | ||
| /** The type that represents the result of batching a `CursorContext` | ||
| */ | ||
| type BatchedCursorContext | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is needed because there is a difference between a single cursor and a batch of cursors.
| * @return | ||
| * A collection data structure that represents the batched cursors | ||
| */ | ||
| override def batchCursorContext(cursors: immutable.Iterable[Long]): Long = cursors.max | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case of a dummy mock, we just get the largest value out of all of the currently submitted cursors
df6e4fc    to
    1526f2e      
    Compare
  
    | So I just updated this PR (note that its not compiling because it requires a locally published Alpakka with akka/alpakka#2770, since this PR is in flux I will create a snapshot of it when I get feedback from the Alpakka maintainers). Here are some major notes 
 | 
0915de4    to
    b27a14e      
    Compare
  
    94d83fa    to
    c39b723      
    Compare
  
    c39b723    to
    887516a      
    Compare
  
    c275bfb    to
    6210752      
    Compare
  
    |  | ||
| /** Prepares the sink before it gets handed to `backupToStorageSink` | ||
| */ | ||
| private[backup] def prepareStartOfStream(state: Option[CurrentState], | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function handles the case where if there is current state (i.e. you are resuming from a previously upload) then we need to drop the first character [ from the stream since there is an already existing half complete upload which is the middle of an array
| configureConsumer | ||
| .fold(base)(block => block(base)) | ||
| .withProperties( | ||
| ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the magic configuration without which the suspend/resume case does not work. It basically prevents Kafka from pushing the commit to an offset that is after the one we manually/explicitly committed.
Another thing to note here is that this configuration is applied last which essentially means its hardcoded/impossible to override. This is done deliberately because otherwise Guardian just won't work correctly
6210752    to
    441fd83      
    Compare
  
    | The PR is ready to review, the main notable change since the last comment is that the configuration for the timeslice has been changed (see  Ontop of this the testing strategy for the end to end tests have changed, rather than manually terminating incomplete uploads instead we send all of the data we care about in a single time window (i.e. a  | 
| ): Long = { | ||
| val (period, finalInitialTime) = timeConfiguration match { | ||
| case PeriodFromFirst(duration) => (duration, initialTime) | ||
| case ChronoUnitSlice(chronoUnit) => | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If its configured for the suspend/resume case then we need to set the initial time to also be the start of the time unit for things to work correctly.
441fd83    to
    93bfcd5      
    Compare
  
    93bfcd5    to
    151614d      
    Compare
  
    | max-interval = 1 hour | ||
| max-interval = ${?AKKA_KAFKA_COMMITTER_MAX_INTERVAL} | ||
| parallelism = ${?AKKA_KAFKA_COMMITTER_PARALLELISM} | ||
| delivery = ${?AKKA_KAFKA_COMMITTER_DELIVERY} | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These configurations are being removed because they are hardcoded by the Kafka client itself so its impossible to override.
| /** @return | ||
| * A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message | ||
| */ | ||
| override def commitCursor: Sink[Long, Future[Done]] = Sink.foreach(cursor => committedOffsets ++ Iterable(cursor)) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is the same as what was done in #76, i.e. committedOffsets ++ Iterable(cursor) doesn't actually end up doing anything because committedOffsets is immutable so we are just throwing away the reference.
        
          
                backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala
              
                Outdated
          
            Show resolved
            Hide resolved
        
      134dce5    to
    1e950e1      
    Compare
  
            
          
                backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | LGTM but certainly @jlprat is needed here :-) | 
0845e66    to
    9a41aef      
    Compare
  
    9a41aef    to
    bc7f1c2      
    Compare
  
    | I just rebased the original commit, I forgot to change the  | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a couple of minor comments, but LGTM
| override type BackupResult = Option[StorageObject] | ||
|  | ||
| override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = { | ||
| override type CurrentState = Nothing | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm guessing this Nothing is purely a placeholder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, GCS is not implemented fully
| override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = { | ||
| override type CurrentState = Nothing | ||
|  | ||
| override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above
        
          
                backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | override type CurrentState = CurrentS3State | ||
|  | ||
| override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = { | ||
| implicit val ec: ExecutionContext = system.classicSystem.getDispatcher | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably it would be better to use a dedicated EC in this case. This one is the internal one used by Akka, and usually you don't want to cog that one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also thought the same however for this case I believe its overcomplicated due to the fact that this ec will only get executed per time slice which in the grand scheme of things is nothing (unless you do something silly and configure it with timeslice of 1 nanosecond).
| case rest => | ||
| val last = rest.maxBy(_.initiated)(Ordering[Instant]) | ||
| logger.warn( | ||
| s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}" | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a call to action to clean the other ones up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I want to resolve this comprehensively with #82, it may be that this case won't actually occur after resolving this ticket.
| parts | ||
| case _ => | ||
| // We drop the last part here since its broken | ||
| parts.dropRight(1) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? And what happens this last part? Is it corrupt? Is it simply useless data?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this was added when I was figuring out what was causing corrupted data in the S3 buckets. I don't think this case can occur and I just left it there as a safeguard but I can remove it if requested (I think it theoretically can occur but I need to simulate it via a test case)
| val diffxVersion = "0.5.6" | ||
| val testContainersVersion = "0.39.8" | ||
| val testContainersJavaVersion = "1.16.2" | ||
| val scalaCheckVersion = "1.15.5-1-SNAPSHOT" | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why a snapshot version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It contains typelevel/scalacheck#849 which is required to generate data that is approximately the size of S3 chunks
        
          
                backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala
              
                Outdated
          
            Show resolved
            Hide resolved
        
      There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
About this change - What it does
This PR is currently an in progress PR on the suspend/resume functionality. Its currently blocked because of the following issues