Skip to content

Commit 561c7e1

Browse files
feat!: new Engine StorageHandler head API (#1465)
## What changes are proposed in this pull request?

In CCv2 we need a head API to fetch metadata about commits after they are published. This is similar to the [getFileStatus](https://github.com/delta-io/delta/blob/master/kernel/kernel-api/src/main/java/io/delta/kernel/engine/FileSystemClient.java#L96) API in the Java kernel.

This PR (1) adds the new `head` method to the `StorageHandler` trait, and (2) implements it for the default engine.

### This PR affects the following public APIs

New `head` API for `StorageHandler`.

## How was this change tested?

New UT for default engine impl.
1 parent cfd3e26 commit 561c7e1

File tree

4 files changed

+70
-0
lines changed

4 files changed

+70
-0
lines changed

kernel/src/engine/default/filesystem.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,23 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {
200200
Ok(())
201201
})
202202
}
203+
204+
/// Fetches metadata for the single object at `path` from the wrapped object store.
///
/// Only the path component of the URL (`path.path()`) is converted into the store's `Path`;
/// if that conversion fails the error is returned through `?` before any async work starts.
/// The lookup itself is driven to completion on `self.task_executor` via `block_on`.
///
/// On success the returned `FileMeta` carries the URL that was passed in as `location`,
/// `last_modified` as produced by `timestamp_millis()`, and the size reported by the store.
/// Errors from the store are converted with `Into::into`.
fn head(&self, path: &Url) -> DeltaResult<FileMeta> {
205+
let store = self.inner.clone();
206+
// Keep an owned copy of the URL: `path` is shadowed on the next line, and the `async move`
// block below takes ownership of everything it captures.
let url = path.clone();
207+
// Shadows the `&Url` argument with the object-store path built from the URL's path component.
let path = Path::from_url_path(path.path())?;
208+
self.task_executor.block_on(async move {
209+
store
210+
.head(&path)
211+
.await
212+
.map_err(Into::into)
213+
.map(|meta| FileMeta {
214+
// Report the caller's original URL rather than the store-relative path.
location: url,
215+
last_modified: meta.last_modified.timestamp_millis(),
216+
size: meta.size,
217+
})
218+
})
219+
}
203220
}
204221

205222
#[cfg(test)]
@@ -357,4 +374,45 @@ mod tests {
357374
let new_dest_url = Url::from_file_path(tmp.path().join("new_dest.txt")).unwrap();
358375
assert!(handler.copy_atomic(&missing_url, &new_dest_url).is_err());
359376
}
377+
378+
// Writes a small file through a `LocalFileSystem` store, then checks that `handler.head`
// reports the same URL, the byte length of the written data, and a modification time close
// to the time sampled just before the write.
#[tokio::test]
379+
async fn test_head() {
380+
let tmp = tempfile::tempdir().unwrap();
381+
let store = Arc::new(LocalFileSystem::new());
382+
let executor = Arc::new(TokioBackgroundExecutor::new());
383+
let handler = ObjectStoreStorageHandler::new(store.clone(), executor);
384+
385+
let data = Bytes::from("test-content");
386+
let file_path = Path::from_absolute_path(tmp.path().join("test.txt")).unwrap();
387+
// Sampled before the write so it can be compared against the file's reported mtime.
// NOTE(review): presumably `current_time_duration` is time since the Unix epoch — confirm.
let write_time = current_time_duration().unwrap();
388+
store.put(&file_path, data.clone().into()).await.unwrap();
389+
390+
let file_url = Url::from_file_path(tmp.path().join("test.txt")).unwrap();
391+
let file_meta = handler.head(&file_url).unwrap();
392+
393+
assert_eq!(file_meta.location, file_url);
394+
assert_eq!(file_meta.size, data.len() as u64);
395+
396+
// Verify timestamp is within the expected range
397+
// `last_modified` is interpreted as milliseconds here; the `as u64` cast assumes it is
// non-negative.
let meta_time = Duration::from_millis(file_meta.last_modified as u64);
398+
assert!(
399+
// NOTE(review): the 100 ms window may be tight on filesystems with coarse timestamp
// granularity or on a heavily loaded CI host — confirm this is not flaky.
meta_time.abs_diff(write_time) < Duration::from_millis(100),
400+
"last_modified timestamp should be around {} ms, but was {} ms",
401+
write_time.as_millis(),
402+
meta_time.as_millis()
403+
);
404+
}
405+
406+
// Calls `handler.head` on a path inside a fresh temp directory that was never written and
// checks that the result is `Err(Error::FileNotFound(_))`.
#[tokio::test]
407+
async fn test_head_non_existent() {
408+
let tmp = tempfile::tempdir().unwrap();
409+
let store = Arc::new(LocalFileSystem::new());
410+
let executor = Arc::new(TokioBackgroundExecutor::new());
411+
let handler = ObjectStoreStorageHandler::new(store, executor);
412+
413+
// Nothing is ever written to this path, so the lookup must fail.
let missing_url = Url::from_file_path(tmp.path().join("missing.txt")).unwrap();
414+
let result = handler.head(&missing_url);
415+
416+
assert!(matches!(result, Err(Error::FileNotFound(_))));
417+
}
360418
}

kernel/src/engine/sync/storage.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ impl StorageHandler for SyncStorageHandler {
7474
/// Not supported by `SyncStorageHandler`.
///
/// # Panics
///
/// Always panics via `unimplemented!`; both arguments are ignored.
fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> {
7575
unimplemented!("SyncStorageHandler does not implement copy");
7676
}
77+
78+
/// Not supported by `SyncStorageHandler`.
///
/// # Panics
///
/// Always panics via `unimplemented!`; the `_path` argument is ignored.
fn head(&self, _path: &Url) -> DeltaResult<FileMeta> {
79+
unimplemented!("head is not implemented for SyncStorageHandler")
80+
}
7781
}
7882

7983
#[cfg(test)]

kernel/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,11 @@ pub trait StorageHandler: AsAny {
539539
/// Copy a file atomically from `src` to `dest`. If the destination file already exists,
540540
/// it must return `Err(Error::FileAlreadyExists)`.
541541
fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()>;
542+
543+
/// Perform a HEAD request for the file at the given `Url`, returning its metadata without
/// reading the file contents.
544+
///
545+
/// # Errors
///
/// If the file does not exist, this must return an `Err` with [`Error::FileNotFound`].
546+
fn head(&self, path: &Url) -> DeltaResult<FileMeta>;
542547
}
543548

544549
/// Provides JSON handling functionality to Delta Kernel.

kernel/src/listed_log_files.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,9 @@ mod list_log_files_with_log_tail_tests {
637637
// Panics unconditionally, naming both URLs, so any call to `copy_atomic` on this test
// handler fails the test.
fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()> {
638638
panic!("copy used from {src} to {dest}");
639639
}
640+
// Panics unconditionally so any call to `head` on this test handler fails the test;
// the `_path` argument is ignored.
fn head(&self, _path: &Url) -> DeltaResult<crate::FileMeta> {
641+
panic!("head used");
642+
}
640643
}
641644

642645
// when log_tail covers the entire requested range, no filesystem listing should occur

0 commit comments

Comments
 (0)