Skip to content

Commit f4f1851

Browse files
authored
refactor: Avoid overly complex itertools methods in log listing code (#1434)
## What changes are proposed in this pull request?

`Itertools::process_results` and `chunk_by` methods have very complex semantics without any significant code size reduction to compensate. Eliminate them from log listing logic to make the control flow easier to understand and maintain.

## How was this change tested?

Existing log listing unit tests are quite extensive.
1 parent ddeef7f commit f4f1851

File tree

1 file changed

+119
-83
lines changed

1 file changed

+119
-83
lines changed

kernel/src/listed_log_files.rs

Lines changed: 119 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -87,37 +87,35 @@ fn list_log_files(
8787
// if the log_tail covers the entire requested range (i.e. starts at or before start_version),
8888
// we skip listing entirely. note that if we don't include this check, we will end up listing
8989
// and then just filtering out all the files we listed.
90-
let listed_files = log_tail_start
91-
// log_tail covers the entire requested range, so no listing is required
92-
.is_none_or(|tail_start| start_version < tail_start.version)
93-
.then(|| -> DeltaResult<_> {
94-
// NOTE: since engine APIs don't limit listing, we list from start_version and filter
95-
Ok(storage
96-
.list_from(&start_from)?
97-
.map(|meta| ParsedLogPath::try_from(meta?))
98-
// NOTE: this filters out .crc files etc which start with "." - some engines
99-
// produce `.something.parquet.crc` corresponding to `something.parquet`. Kernel
100-
// doesn't care about these files. Critically, note these are _different_ than
101-
// normal `version.crc` files which are listed + captured normally. Additionally
102-
// we likely aren't even 'seeing' these files since lexicographically the string
103-
// "." comes before the string "0".
104-
.filter_map_ok(|path_opt| path_opt.filter(|p| p.should_list()))
105-
.take_while(move |path_res| match path_res {
106-
// discard any path with too-large version; keep errors
107-
Ok(path) => path.version <= list_end_version,
108-
Err(_) => true,
109-
}))
110-
})
111-
.transpose()?
112-
.into_iter()
113-
.flatten();
90+
let listed_files = if log_tail_start.is_none_or(|tail| start_version < tail.version) {
91+
// NOTE: since engine APIs don't limit listing, we list from start_version and filter
92+
let files = storage
93+
.list_from(&start_from)?
94+
.map(|meta| ParsedLogPath::try_from(meta?))
95+
// NOTE: this filters out .crc files etc which start with "." - some engines
96+
// produce `.something.parquet.crc` corresponding to `something.parquet`. Kernel
97+
// doesn't care about these files. Critically, note these are _different_ than
98+
// normal `version.crc` files which are listed + captured normally. Additionally
99+
// we likely aren't even 'seeing' these files since lexicographically the string
100+
// "." comes before the string "0".
101+
.filter_map_ok(|path_opt| path_opt.filter(|p| p.should_list()))
102+
.take_while(move |path_res| match path_res {
103+
// discard any path with too-large version; keep errors
104+
Ok(path) => path.version <= list_end_version,
105+
Err(_) => true,
106+
});
107+
Some(files)
108+
} else {
109+
None
110+
};
114111

115112
// return chained [listed_files..log_tail], filtering log_tail by the requested range
116113
let filtered_log_tail = log_tail
117114
.into_iter()
118115
.filter(move |entry| entry.version >= start_version && entry.version <= end_version)
119116
.map(Ok);
120117

118+
let listed_files = listed_files.into_iter().flatten();
121119
Ok(listed_files.chain(filtered_log_tail))
122120
}
123121

@@ -256,85 +254,123 @@ impl ListedLogFiles {
256254
) -> DeltaResult<Self> {
257255
let log_files = list_log_files(storage, log_root, log_tail, start_version, end_version)?;
258256

259-
log_files.process_results(|iter| {
260-
let mut ascending_commit_files = Vec::new();
261-
let mut ascending_compaction_files = Vec::new();
262-
let mut checkpoint_parts = vec![];
263-
let mut latest_crc_file: Option<ParsedLogPath> = None;
264-
let mut latest_commit_file: Option<ParsedLogPath> = None;
265-
266-
// Group log files by version
267-
let log_files_per_version = iter.chunk_by(|x| x.version);
268-
269-
for (version, files) in &log_files_per_version {
270-
let mut new_checkpoint_parts = vec![];
271-
for file in files {
272-
use LogPathFileType::*;
273-
match file.file_type {
274-
Commit | StagedCommit => ascending_commit_files.push(file),
275-
CompactedCommit { hi } if end_version.is_none_or(|end| hi <= end) => {
276-
ascending_compaction_files.push(file);
277-
}
278-
CompactedCommit { .. } => (), // Failed the bounds check above
279-
SinglePartCheckpoint | UuidCheckpoint | MultiPartCheckpoint { .. } => {
280-
new_checkpoint_parts.push(file)
281-
}
282-
Crc => {
283-
let latest_crc_ref = latest_crc_file.as_ref();
284-
if latest_crc_ref.is_none_or(|latest| latest.version < file.version) {
285-
latest_crc_file = Some(file);
286-
}
287-
}
288-
Unknown => {
289-
// It is possible that there are other files being stashed away into
290-
// _delta_log/ This is not necessarily forbidden, but something we
291-
// want to know about in a debugging scenario
292-
debug!(
293-
"Found file {} with unknown file type {:?} at version {}",
294-
file.filename, file.file_type, version
295-
);
296-
}
257+
// Helper that accumulates and groups log files during listing. Each "group" consists of all
258+
// files that share the same version number (e.g., commit, checkpoint parts, CRC files).
259+
//
260+
// We need to group by version because:
261+
// 1. A version may have multiple checkpoint parts that must be collected before we can
262+
// determine if the checkpoint is complete
263+
// 2. If a complete checkpoint exists, we can discard all commits before it
264+
//
265+
// Groups are flushed (processed) when we encounter a file with a different version or
266+
// reach EOF, at which point we check for complete checkpoints and update our state.
267+
#[derive(Default)]
268+
struct LogListingGroupBuilder {
269+
ascending_commit_files: Vec<ParsedLogPath>,
270+
ascending_compaction_files: Vec<ParsedLogPath>,
271+
checkpoint_parts: Vec<ParsedLogPath>,
272+
latest_crc_file: Option<ParsedLogPath>,
273+
latest_commit_file: Option<ParsedLogPath>,
274+
new_checkpoint_parts: Vec<ParsedLogPath>,
275+
end_version: Option<Version>,
276+
}
277+
278+
impl LogListingGroupBuilder {
279+
fn process_file(&mut self, file: ParsedLogPath) {
280+
use LogPathFileType::*;
281+
match file.file_type {
282+
Commit | StagedCommit => self.ascending_commit_files.push(file),
283+
CompactedCommit { hi } if self.end_version.is_none_or(|end| hi <= end) => {
284+
self.ascending_compaction_files.push(file);
285+
}
286+
CompactedCommit { .. } => (), // Failed the bounds check above
287+
SinglePartCheckpoint | UuidCheckpoint | MultiPartCheckpoint { .. } => {
288+
self.new_checkpoint_parts.push(file)
289+
}
290+
Crc => {
291+
self.latest_crc_file.replace(file);
292+
}
293+
Unknown => {
294+
// It is possible that there are other files being stashed away into
295+
// _delta_log/ This is not necessarily forbidden, but something we
296+
// want to know about in a debugging scenario
297+
debug!(
298+
"Found file {} with unknown file type {:?} at version {}",
299+
file.filename, file.file_type, file.version
300+
);
297301
}
298302
}
299-
// Group and find the first complete checkpoint for this version.
300-
// All checkpoints for the same version are equivalent, so we only take one.
303+
}
304+
305+
// Group and find the first complete checkpoint for this version.
306+
// All checkpoints for the same version are equivalent, so we only take one.
307+
//
308+
// If this version has a complete checkpoint, we can drop the existing commit and
309+
// compaction files we collected so far -- except we must keep the latest commit.
310+
fn flush_checkpoint_group(&mut self, version: Version) {
311+
let new_checkpoint_parts = std::mem::take(&mut self.new_checkpoint_parts);
301312
if let Some((_, complete_checkpoint)) = group_checkpoint_parts(new_checkpoint_parts)
302313
.into_iter()
303314
// `num_parts` is guaranteed to be non-negative and within `usize` range
304315
.find(|(num_parts, part_files)| part_files.len() == *num_parts as usize)
305316
{
306-
checkpoint_parts = complete_checkpoint;
317+
self.checkpoint_parts = complete_checkpoint;
307318
// Check if there's a commit file at the same version as this checkpoint. We pop
308319
// the last element from ascending_commit_files (which is sorted by version) and
309320
// set latest_commit_file to it only if it matches the checkpoint version. If it
310321
// doesn't match, we set latest_commit_file to None to discard any older commits
311322
// from before the checkpoint
312-
latest_commit_file = ascending_commit_files
323+
self.latest_commit_file = self
324+
.ascending_commit_files
313325
.pop()
314326
.filter(|commit| commit.version == version);
315327
// Log replay only uses commits/compactions after a complete checkpoint
316-
ascending_commit_files.clear();
317-
ascending_compaction_files.clear();
328+
self.ascending_commit_files.clear();
329+
self.ascending_compaction_files.clear();
318330
}
319331
}
332+
}
320333

321-
// Since ascending_commit_files is cleared at each checkpoint, if it's non-empty here
322-
// it contains only commits after the most recent checkpoint. The last element is the
323-
// highest version commit overall, so we update latest_commit_file to it. If it's empty,
324-
// we keep the value set at the checkpoint (if a commit existed at the checkpoint version),
325-
// or remains None.
326-
if let Some(commit_file) = ascending_commit_files.last() {
327-
latest_commit_file = Some(commit_file.clone());
334+
let mut builder = LogListingGroupBuilder {
335+
end_version,
336+
..Default::default()
337+
};
338+
339+
let mut log_files = log_files;
340+
if let Some(file) = log_files.next().transpose()? {
341+
// Process first file to establish an initial group
342+
let mut group_version = file.version;
343+
builder.process_file(file);
344+
345+
// Process remaining files, flushing the previous groups first if the version changed
346+
while let Some(file) = log_files.next().transpose()? {
347+
if file.version != group_version {
348+
builder.flush_checkpoint_group(group_version);
349+
group_version = file.version;
350+
}
351+
builder.process_file(file);
328352
}
329353

330-
ListedLogFiles::try_new(
331-
ascending_commit_files,
332-
ascending_compaction_files,
333-
checkpoint_parts,
334-
latest_crc_file,
335-
latest_commit_file,
336-
)
337-
})?
354+
// Flush the final group, which must always contain at least one file
355+
builder.flush_checkpoint_group(group_version);
356+
}
357+
358+
// Since ascending_commit_files is cleared at each checkpoint, if it's non-empty here
359+
// it contains only commits after the most recent checkpoint. The last element is the
360+
// highest version commit overall, so we update latest_commit_file to it. If it's empty,
361+
// we keep the value set at the checkpoint (if a commit existed at the checkpoint version),
362+
// or remains None.
363+
if let Some(commit_file) = builder.ascending_commit_files.last() {
364+
builder.latest_commit_file = Some(commit_file.clone());
365+
}
366+
367+
ListedLogFiles::try_new(
368+
builder.ascending_commit_files,
369+
builder.ascending_compaction_files,
370+
builder.checkpoint_parts,
371+
builder.latest_crc_file,
372+
builder.latest_commit_file,
373+
)
338374
}
339375

340376
/// List all commit and checkpoint files after the provided checkpoint. It is guaranteed that all

0 commit comments

Comments
 (0)