feat: Leaf Checkpoint Reader #1501
| /// # Distributability | ||
| /// | ||
| /// This phase is designed to be distributable. To distribute: | ||
| /// 1. Partition `files` across N executors | ||
| /// 2. Create N `LeafCheckpointReader` instances, one per executor with its file partition |
This seems like an odd place to bring up distributability? The log replay phase that consumes this reader is the thing getting distributed, and it is the thing that needs the careful choreography. This reader seems like just a detail in that bigger picture.
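For reference, step 1 of the doc comment above (partitioning `files` across N executors) could look roughly like the sketch below. The `partition_files` helper and the round-robin strategy are assumptions made for illustration, not something this PR defines, and constructing one `LeafCheckpointReader` per partition is left out because its constructor is not shown here.

```rust
/// Hypothetical helper: splits `files` round-robin into exactly `n`
/// partitions. Some partitions may be empty when `n > files.len()`.
fn partition_files(files: &[String], n: usize) -> Vec<Vec<String>> {
    assert!(n > 0, "need at least one executor");
    let mut partitions: Vec<Vec<String>> = vec![Vec::new(); n];
    for (i, file) in files.iter().enumerate() {
        // File i goes to executor i mod n.
        partitions[i % n].push(file.clone());
    }
    partitions
}

fn main() {
    // Made-up file names, only for demonstration.
    let files: Vec<String> = (0..5).map(|i| format!("sidecar-{i}.parquet")).collect();
    for (executor, partition) in partition_files(&files, 2).iter().enumerate() {
        println!("executor {executor}: {partition:?}");
    }
}
```

Whichever layer ends up owning this split (the reader or the log replay phase) is exactly the question raised above; the sketch only shows the mechanics.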
| let actions = engine | ||
| .parquet_handler() | ||
| .read_parquet_files(&files, schema, None)? | ||
| .map(|batch| batch.map(|b| ActionsBatch::new(b, false))); |
Now that we're splitting up the phases instead of chaining everything into one big iterator, we should probably reconsider whether we still need these (batch, bool) pairs -- I suspect that whoever invokes the dedup visitor will now know, structurally, whether it's a log or checkpoint batch, and can just pass true or false accordingly?
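A rough sketch of what that could look like: each phase passes a fixed flag to the dedup step instead of carrying a `(batch, bool)` pair through the iterator. Everything here (`Dedup`, `visit`, `replay_commits`, `replay_checkpoint`, and the rule that only log batches are recorded in the seen set) is a simplified assumption for illustration, not the kernel's actual visitor API.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified stand-in for the dedup visitor.
struct Dedup {
    seen: HashSet<String>,
}

impl Dedup {
    fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    /// Returns the paths in `batch` that were not seen before.
    /// Assumption: only log batches are recorded in `seen`.
    fn visit(&mut self, batch: &[&str], is_log_batch: bool) -> Vec<String> {
        let mut fresh = Vec::new();
        for path in batch {
            if self.seen.contains(*path) {
                continue;
            }
            if is_log_batch {
                self.seen.insert(path.to_string());
            }
            fresh.push(path.to_string());
        }
        fresh
    }
}

/// The commit phase knows structurally that its batches are log batches.
fn replay_commits(dedup: &mut Dedup, batches: &[Vec<&str>]) -> Vec<String> {
    let mut out = Vec::new();
    for batch in batches {
        out.extend(dedup.visit(batch, true));
    }
    out
}

/// The checkpoint phase knows structurally that its batches are not.
fn replay_checkpoint(dedup: &mut Dedup, batches: &[Vec<&str>]) -> Vec<String> {
    let mut out = Vec::new();
    for batch in batches {
        out.extend(dedup.visit(batch, false));
    }
    out
}

fn main() {
    let mut dedup = Dedup::new();
    let from_log = replay_commits(&mut dedup, &[vec!["a", "b"], vec!["b", "c"]]);
    let from_checkpoint = replay_checkpoint(&mut dedup, &[vec!["a", "d"]]);
    println!("log: {from_log:?}, checkpoint: {from_checkpoint:?}");
}
```

With this shape the batch type no longer needs to carry the flag at all; the flag lives in the call site of each phase.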
| if log_segment.checkpoint_parts.is_empty() { | ||
| println!("Test table has no checkpoint parts, skipping"); | ||
| return Ok(()); | ||
| } |
This seems very strange to have in a test?? Either the test case has a checkpoint or it doesn't.
And in this case it should have a checkpoint, and panicking at L110 below is a perfectly reasonable test failure mode if the checkpoint somehow went missing.
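A minimal sketch of the alternative, assuming the fixture is supposed to contain a checkpoint: assert the precondition so a missing checkpoint fails the test loudly rather than silently skipping it. `LogSegment` here is a hypothetical one-field stand-in for the real type, and `first_checkpoint_part` and the file name are made up for the example.

```rust
/// Hypothetical, simplified stand-in for the real log segment type.
struct LogSegment {
    checkpoint_parts: Vec<String>,
}

/// Returns the first checkpoint part, panicking if the fixture has none.
fn first_checkpoint_part(segment: &LogSegment) -> &str {
    assert!(
        !segment.checkpoint_parts.is_empty(),
        "test table is expected to have a checkpoint"
    );
    &segment.checkpoint_parts[0]
}

fn main() {
    let segment = LogSegment {
        checkpoint_parts: vec!["part-0.parquet".to_string()],
    };
    println!("reading {}", first_checkpoint_part(&segment));
}
```

An early `return Ok(())` would report the test as passing even though nothing was exercised; the assertion turns a broken fixture into a visible failure.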
| ManifestPhase::new(manifest_file, log_segment.log_root.clone(), engine.clone())?; | ||
| | ||
| // Drain manifest phase and apply processor | ||
| for batch in manifest_phase.by_ref() { |
aside: TIL that Iterator::by_ref is a thing!
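For anyone else who had not run into it: `Iterator::by_ref` borrows the iterator mutably, so a `for` loop can drain it without taking ownership, and the value is still usable afterwards. A small self-contained sketch follows; `Phase` and its `yielded` counter are made up for the example and only loosely mimic a phase whose state is inspected after draining.

```rust
/// Toy iterator that remembers how many items it has yielded.
struct Phase {
    items: std::vec::IntoIter<u32>,
    yielded: usize,
}

impl Iterator for Phase {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        let item = self.items.next()?;
        self.yielded += 1;
        Some(item)
    }
}

/// Drains the phase with `by_ref`, then reads its state afterwards.
fn sum_and_count(values: Vec<u32>) -> (u32, usize) {
    let mut phase = Phase { items: values.into_iter(), yielded: 0 };
    let mut sum = 0;
    // `for v in phase` would move `phase`; `by_ref()` only borrows it.
    for v in phase.by_ref() {
        sum += v;
    }
    // `phase` is still owned here, so its fields remain accessible.
    (sum, phase.yielded)
}

fn main() {
    let (sum, count) = sum_and_count(vec![1, 2, 3]);
    println!("sum = {sum}, yielded = {count}");
}
```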
| let batch = batch?; | ||
| processor.process_actions_batch(batch)?; |
nit: the two lines
| let batch = batch?; | |
| processor.process_actions_batch(batch)?; | |
could be collapsed into
| processor.process_actions_batch(batch?)?; |
(not sure which way is more readable?)
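To make the comparison concrete, here is a toy version of both spellings. `process`, `two_step`, and `inline` are hypothetical names, with `u32` standing in for a batch and `String` for the error type; the point is only that the inner `?` unwraps the batch before the call and the outer `?` propagates the call's own error, so the two forms behave the same.

```rust
/// Hypothetical processing step that rejects a zero "batch".
fn process(batch: u32) -> Result<u32, String> {
    if batch == 0 {
        Err("empty batch".to_string())
    } else {
        Ok(batch * 2)
    }
}

/// Two-step form: unwrap the batch first, then process it.
fn two_step(batches: Vec<Result<u32, String>>) -> Result<u32, String> {
    let mut total = 0;
    for batch in batches {
        let batch = batch?;
        total += process(batch)?;
    }
    Ok(total)
}

/// Inline form: the inner `?` runs before `process` is called.
fn inline(batches: Vec<Result<u32, String>>) -> Result<u32, String> {
    let mut total = 0;
    for batch in batches {
        total += process(batch?)?;
    }
    Ok(total)
}

fn main() {
    let input = vec![Ok(1), Ok(2), Err("read failed".to_string())];
    println!("{:?} / {:?}", two_step(input.clone()), inline(input));
}
```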
| let mut sidecar_file_paths = Vec::new(); | ||
| let mut batch_count = 0; | ||
| | ||
| while let Some(result) = sidecar_phase.next() { |
Isn't this just a plain `for` loop? That is, replace
| while let Some(result) = sidecar_phase.next() { | |
with
| for result in sidecar_phase { |
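A sketch of the two loop shapes side by side, using a generic iterator of `Result<String, String>` as a stand-in for the sidecar phase (the function names and item type are assumptions for illustration). One caveat: `for result in sidecar_phase` moves the iterator, so if the phase is needed after the loop, `for result in sidecar_phase.by_ref()` keeps it usable. Clippy's `while_let_on_iterator` lint flags the `while let` form.

```rust
/// `while let` form, as in the current test code.
fn collect_while_let(
    mut phase: impl Iterator<Item = Result<String, String>>,
) -> Result<(Vec<String>, usize), String> {
    let mut paths = Vec::new();
    let mut batch_count = 0;
    while let Some(result) = phase.next() {
        paths.push(result?);
        batch_count += 1;
    }
    Ok((paths, batch_count))
}

/// Equivalent `for` form; the loop consumes `phase`.
fn collect_for(
    phase: impl Iterator<Item = Result<String, String>>,
) -> Result<(Vec<String>, usize), String> {
    let mut paths = Vec::new();
    let mut batch_count = 0;
    for result in phase {
        paths.push(result?);
        batch_count += 1;
    }
    Ok((paths, batch_count))
}

fn main() {
    // Made-up sidecar paths, only for demonstration.
    let input = vec![Ok("a.parquet".to_string()), Ok("b.parquet".to_string())];
    println!("{:?}", collect_while_let(input.clone().into_iter()));
    println!("{:?}", collect_for(input.into_iter()));
}
```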
🥞 Stacked PR
What changes are proposed in this pull request?
How was this change tested?