diff --git a/kernel/src/listed_log_files.rs b/kernel/src/listed_log_files.rs index 206ce3eec..963397bea 100644 --- a/kernel/src/listed_log_files.rs +++ b/kernel/src/listed_log_files.rs @@ -91,14 +91,18 @@ fn list_log_files( // NOTE: since engine APIs don't limit listing, we list from start_version and filter let files = storage .list_from(&start_from)? - .map(|meta| ParsedLogPath::try_from(meta?)) + .filter_map(|meta| match meta { + Ok(m) => ParsedLogPath::try_from(m) + .filter(|p| p.should_list()) + .map(Ok), + Err(e) => Some(Err(e)), + }) // NOTE: this filters out .crc files etc which start with "." - some engines // produce `.something.parquet.crc` corresponding to `something.parquet`. Kernel // doesn't care about these files. Critically, note these are _different_ than // normal `version.crc` files which are listed + captured normally. Additionally // we likely aren't even 'seeing' these files since lexicographically the string // "." comes before the string "0". - .filter_map_ok(|path_opt| path_opt.filter(|p| p.should_list())) .take_while(move |path_res| match path_res { // discard any path with too-large version; keep errors Ok(path) => path.version <= list_end_version, diff --git a/kernel/src/log_path.rs b/kernel/src/log_path.rs index fe468873c..33b9e7cdf 100644 --- a/kernel/src/log_path.rs +++ b/kernel/src/log_path.rs @@ -25,7 +25,7 @@ impl LogPath { /// valid log path. pub fn try_new(file_meta: FileMeta) -> DeltaResult { // TODO: we should avoid the clone - let parsed = ParsedLogPath::try_from(file_meta.clone())? + let parsed = ParsedLogPath::try_from(file_meta.clone()) .ok_or_else(|| Error::invalid_log_path(&file_meta.location))?; require!( diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 67bef982d..0c3fedd9b 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -205,7 +205,6 @@ fn create_log_path(path: &str) -> ParsedLogPath { size: 0, }) .unwrap() - .unwrap() } #[test] diff --git a/kernel/src/path.rs b/kernel/src/path.rs index 3afef3640..093ae1486 100644 --- a/kernel/src/path.rs +++ b/kernel/src/path.rs @@ -108,11 +108,10 @@ fn path_contains_delta_log_dir(mut path_segments: std::str::Split<'_, char>) -> impl ParsedLogPath { // NOTE: We can't actually impl TryFrom because Option is a foreign struct even if T is local. #[internal_api] - pub(crate) fn try_from(location: Location) -> DeltaResult>> { + pub(crate) fn try_from(location: Location) -> Option> { let url = location.as_url(); - let Some(mut path_segments) = url.path_segments() else { - return Ok(None); - }; + let mut path_segments = url.path_segments()?; + #[allow(clippy::unwrap_used)] let filename = path_segments .next_back() @@ -120,7 +119,7 @@ impl ParsedLogPath { .to_string(); let subdir = path_segments.next_back(); if filename.is_empty() { - return Ok(None); // Not a valid log path + return None; // Not a valid log path } let mut split = filename.split('.'); @@ -132,17 +131,16 @@ impl ParsedLogPath { // Every valid log path starts with a numeric version part. If version parsing fails, it // must not be a log path and we simply return None. However, it is an error if version // parsing succeeds for a wrong-length numeric string. - let version = match version.parse().ok() { - Some(v) if version.len() == VERSION_LEN => v, - Some(_) => return Ok(None), // has a version but it's not 20 chars - None => return Ok(None), - }; + if version.len() != VERSION_LEN { + return None; + } + let version = version.parse().ok()?; // Every valid log path has a file extension as its last part. Return None if it's missing. let split: Vec<_> = split.collect(); let extension = match split.last() { Some(extension) => extension.to_string(), - None => return Ok(None), + None => return None, }; // this check determines if we're in the delta log dir, or in the staged commits dir. The check is: @@ -179,28 +177,20 @@ impl ParsedLogPath { ["crc"] if in_delta_log_dir => LogPathFileType::Crc, ["checkpoint", "parquet"] if in_delta_log_dir => LogPathFileType::SinglePartCheckpoint, ["checkpoint", uuid, "json" | "parquet"] if in_delta_log_dir => { - let Some(_) = parse_path_part::(uuid, UUID_PART_LEN) else { - return Ok(None); - }; + let _ = parse_path_part::(uuid, UUID_PART_LEN)?; LogPathFileType::UuidCheckpoint } [hi, "compacted", "json"] if in_delta_log_dir => { - let Some(hi) = parse_path_part(hi, VERSION_LEN) else { - return Ok(None); - }; + let hi = parse_path_part(hi, VERSION_LEN)?; LogPathFileType::CompactedCommit { hi } } ["checkpoint", part_num, num_parts, "parquet"] if in_delta_log_dir => { - let Some(part_num) = parse_path_part(part_num, MULTIPART_PART_LEN) else { - return Ok(None); - }; - let Some(num_parts) = parse_path_part(num_parts, MULTIPART_PART_LEN) else { - return Ok(None); - }; + let part_num = parse_path_part(part_num, MULTIPART_PART_LEN)?; + let num_parts = parse_path_part(num_parts, MULTIPART_PART_LEN)?; // A valid part_num must be in the range [1, num_parts] if !(0 < part_num && part_num <= num_parts) { - return Ok(None); + return None; } LogPathFileType::MultiPartCheckpoint { part_num, @@ -211,13 +201,13 @@ impl ParsedLogPath { // Unrecognized log paths are allowed, so long as they have a valid version. _ => LogPathFileType::Unknown, }; - Ok(Some(ParsedLogPath { + Some(ParsedLogPath { location, filename, extension, version, file_type, - })) + }) } pub(crate) fn should_list(&self) -> bool { @@ -303,7 +293,7 @@ impl ParsedLogPath { /// Helper method to create a path with the given filename generator fn create_path(table_root: &Url, filename: String) -> DeltaResult { let location = table_root.join(DELTA_LOG_DIR_WITH_SLASH)?.join(&filename)?; - Self::try_from(location)?.ok_or_else(|| { + Self::try_from(location).ok_or_else(|| { Error::internal_error(format!("Attempted to create an invalid path: {filename}")) }) } @@ -423,7 +413,7 @@ impl LogRoot { pub(crate) fn new_commit_path(&self, version: Version) -> DeltaResult> { let filename = format!("{version:020}.json"); let path = self.log_root().join(&filename)?; - ParsedLogPath::try_from(path)?.ok_or_else(|| { + ParsedLogPath::try_from(path).ok_or_else(|| { Error::internal_error(format!("Attempted to create an invalid path: {filename}")) }) } @@ -437,7 +427,7 @@ impl LogRoot { let uuid = uuid::Uuid::new_v4(); let filename = format!("{version:020}.{uuid}.json"); let path = self.log_root().join(STAGED_COMMITS_DIR)?.join(&filename)?; - ParsedLogPath::try_from(path)?.ok_or_else(|| { + ParsedLogPath::try_from(path).ok_or_else(|| { Error::internal_error(format!("Attempted to create an invalid path: {filename}")) }) } @@ -482,21 +472,18 @@ mod tests { assert!(log_path .path() .ends_with("/tests/data/table-with-dv-small/_delta_log/subdir/")); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); // ignored - not versioned let log_path = table_log_dir.join("_last_checkpoint").unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); // ignored - no extension let log_path = table_log_dir.join("00000000000000000010").unwrap(); let result = ParsedLogPath::try_from(log_path); - assert!( - matches!(result, Ok(None)), - "Expected Ok(None) for missing file extension" - ); + assert!(result.is_none()); // empty extension - should be treated as unknown file type let log_path = table_log_dir.join("00000000000000000011.").unwrap(); @@ -504,32 +491,32 @@ mod tests { assert!( matches!( result, - Ok(Some(ParsedLogPath { + Some(ParsedLogPath { file_type: LogPathFileType::Unknown, .. - })) + }) ), "Expected Unknown file type, got {result:?}" ); // ignored - version fails to parse let log_path = table_log_dir.join("abc.json").unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); // invalid - version has too many digits let log_path = table_log_dir.join("000000000000000000010.json").unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); // invalid - version has too few digits let log_path = table_log_dir.join("0000000000000000010.json").unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); // unknown - two parts let log_path = table_log_dir.join("00000000000000000010.foo").unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!(log_path.filename, "00000000000000000010.foo"); assert_eq!(log_path.extension, "foo"); assert_eq!(log_path.version, 10); @@ -540,7 +527,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000010.a.b.c.foo") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!(log_path.filename, "00000000000000000010.a.b.c.foo"); assert_eq!(log_path.extension, "foo"); assert_eq!(log_path.version, 10); @@ -552,7 +539,7 @@ mod tests { let table_log_dir = table_log_dir_url(); let log_path = table_log_dir.join("00000000000000000000.json").unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!(log_path.filename, "00000000000000000000.json"); assert_eq!(log_path.extension, "json"); assert_eq!(log_path.version, 0); @@ -561,7 +548,7 @@ mod tests { assert!(!log_path.is_checkpoint()); let log_path = table_log_dir.join("00000000000000000005.json").unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!(log_path.version, 5); assert!(log_path.is_commit()); } @@ -571,7 +558,7 @@ mod tests { let table_log_dir = table_log_dir_url(); let log_path = table_log_dir.join("00000000000000000000.crc").unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!(log_path.filename, "00000000000000000000.crc"); assert_eq!(log_path.extension, "crc"); assert_eq!(log_path.version, 0); @@ -580,9 +567,9 @@ mod tests { assert!(!log_path.is_checkpoint()); let log_path = table_log_dir.join("00000000000000000005.crc").unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!(log_path.version, 5); - assert!(log_path.file_type == LogPathFileType::Crc); + assert_eq!(log_path.file_type, LogPathFileType::Crc); } #[test] @@ -592,7 +579,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000002.checkpoint.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!(log_path.filename, "00000000000000000002.checkpoint.parquet"); assert_eq!(log_path.extension, "parquet"); assert_eq!(log_path.version, 2); @@ -607,7 +594,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000002.checkpoint.json") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!(log_path.filename, "00000000000000000002.checkpoint.json"); assert_eq!(log_path.extension, "json"); assert_eq!(log_path.version, 2); @@ -623,7 +610,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!( log_path.filename, "00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet" @@ -640,7 +627,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!( log_path.filename, "00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json" @@ -657,7 +644,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.foo") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!( log_path.filename, "00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.foo" @@ -671,14 +658,14 @@ mod tests { let log_path = table_log_dir .join("00000000000000000002.checkpoint.foo.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); // invalid file extension let log_path = table_log_dir .join("00000000000000000002.checkpoint.foo") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!(log_path.filename, "00000000000000000002.checkpoint.foo"); assert_eq!(log_path.extension, "foo"); assert_eq!(log_path.version, 2); @@ -690,7 +677,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000010.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); } @@ -701,7 +688,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000008.checkpoint.0000000000.0000000002.json") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!( log_path.filename, "00000000000000000008.checkpoint.0000000000.0000000002.json" @@ -715,13 +702,13 @@ mod tests { let log_path = table_log_dir .join("00000000000000000008.checkpoint.0000000000.0000000002.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); let log_path = table_log_dir .join("00000000000000000008.checkpoint.0000000001.0000000002.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!( log_path.filename, "00000000000000000008.checkpoint.0000000001.0000000002.parquet" @@ -741,7 +728,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000008.checkpoint.0000000002.0000000002.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!( log_path.filename, "00000000000000000008.checkpoint.0000000002.0000000002.parquet" @@ -761,31 +748,31 @@ mod tests { let log_path = table_log_dir .join("00000000000000000008.checkpoint.0000000003.0000000002.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); let log_path = table_log_dir .join("00000000000000000008.checkpoint.000000001.0000000002.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); let log_path = table_log_dir .join("00000000000000000008.checkpoint.0000000001.000000002.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); let log_path = table_log_dir .join("00000000000000000008.checkpoint.00000000x1.0000000002.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); let log_path = table_log_dir .join("00000000000000000008.checkpoint.0000000001.00000000x2.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); } @@ -796,7 +783,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000008.00000000000000000015.compacted.json") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!( log_path.filename, "00000000000000000008.00000000000000000015.compacted.json" @@ -814,7 +801,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000008.00000000000000000015.compacted.parquet") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!( log_path.filename, "00000000000000000008.00000000000000000015.compacted.parquet" @@ -828,19 +815,19 @@ mod tests { let log_path = table_log_dir .join("00000000000000000008.0000000000000000015.compacted.json") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); let log_path = table_log_dir .join("00000000000000000008.000000000000000000015.compacted.json") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); let log_path = table_log_dir .join("00000000000000000008.00000000000000000a15.compacted.json") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap(); + let log_path = ParsedLogPath::try_from(log_path); assert!(log_path.is_none()); } @@ -900,7 +887,7 @@ mod tests { let log_path = table_log_dir .join("_staged_commits/00000000000000000010.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!( log_path.filename, "00000000000000000010.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json" @@ -916,7 +903,7 @@ mod tests { let log_path = table_log_dir .join("_staged_commits/00000000000000000010.not-a-uuid.json") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert!(log_path.is_unknown()); assert!(!log_path.is_commit()); assert!(!log_path.is_checkpoint()); @@ -925,7 +912,7 @@ mod tests { let log_path = table_log_dir .join("00000000000000000010.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json") .unwrap(); - let log_path = ParsedLogPath::try_from(log_path).unwrap().unwrap(); + let log_path = ParsedLogPath::try_from(log_path).unwrap(); assert_eq!( log_path.filename, "00000000000000000010.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json" @@ -995,7 +982,6 @@ mod tests { last_modified: 0, size: commit_content.len() as u64, }) - .unwrap() .unwrap(); // Now actually test reading the timestamp @@ -1024,7 +1010,6 @@ mod tests { last_modified: 0, size: commit_content.len() as u64, }) - .unwrap() .unwrap(); // Should return error when ICT is missing @@ -1046,7 +1031,6 @@ mod tests { last_modified: 0, size: 100, }) - .unwrap() .unwrap(); // Should return error for non-commit files diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index ae1316e9f..84393a4dd 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -1062,7 +1062,6 @@ mod tests { assert_eq!(snapshot.log_segment.checkpoint_parts.len(), 1); assert_eq!( ParsedLogPath::try_from(snapshot.log_segment.checkpoint_parts[0].location.clone()) - .unwrap() .unwrap() .version, 2, @@ -1075,7 +1074,6 @@ mod tests { .clone() ) .unwrap() - .unwrap() .version, 3, ); @@ -1395,7 +1393,7 @@ mod tests { location: url.join("_delta_log/00000000000000000000.checkpoint.parquet")?, last_modified: 0, size: 100, - })? + }) .unwrap()]; let listed_files = ListedLogFiles { @@ -1583,7 +1581,7 @@ mod tests { last_modified: 1234567890, size: 100, }; - let parsed_path = ParsedLogPath::try_from(file_meta)? + let parsed_path = ParsedLogPath::try_from(file_meta) .ok_or_else(|| Error::Generic("Failed to parse log path".to_string()))?; let log_tail = vec![parsed_path];