Skip to content

Commit ddeef7f

Browse files
feat(FFI): snapshot log tail FFI (#1379)
## What changes are proposed in this pull request? Adds two new APIs to our FFI to support catalog-managed tables: 1. `snapshot_with_log_tail` to get the latest snapshot of a catalog-managed table 2. `snapshot_at_version_with_log_tail` to get the snapshot of a catalog-managed table at a specific version. To implement these, a new `LogPathArray` type is introduced to communicate `Vec<LogPath>` across the FFI boundary (it is just a pointer + length for the array, and each `FfiLogPath` element carries a `KernelStringSlice` location + last-modified time + size). The above APIs are behind a new `catalog-managed` (FFI) feature flag, similar to kernel's own `catalog-managed` flag. ## How was this change tested? New unit test (`test_snapshot_log_tail`).
1 parent fe01172 commit ddeef7f

File tree

3 files changed

+233
-12
lines changed

3 files changed

+233
-12
lines changed

ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ object_store = "0.12.3"
4646
default = ["default-engine-rustls"]
4747
default-engine-native-tls = ["delta_kernel/default-engine-native-tls", "default-engine-base"]
4848
default-engine-rustls = ["delta_kernel/default-engine-rustls", "default-engine-base"]
49+
catalog-managed = ["delta_kernel/catalog-managed"]
4950

5051
# This is an 'internal' feature flag which has all the shared bits from default-engine-native-tls and
5152
# default-engine-rustls. There is a check in kernel/lib.rs to ensure you have enabled one of

ffi/src/lib.rs

Lines changed: 131 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ use url::Url;
1313

1414
use delta_kernel::schema::Schema;
1515
use delta_kernel::snapshot::Snapshot;
16-
use delta_kernel::Version;
17-
use delta_kernel::{DeltaResult, Engine, EngineData};
16+
use delta_kernel::{DeltaResult, Engine, EngineData, LogPath, Version};
1817
use delta_kernel_ffi_macros::handle_descriptor;
1918

2019
// cbindgen doesn't understand our use of feature flags here, and by default it parses `mod handle`
@@ -43,6 +42,8 @@ use error::{AllocateError, AllocateErrorFn, ExternResult, IntoExternResult};
4342
pub mod expressions;
4443
#[cfg(feature = "tracing")]
4544
pub mod ffi_tracing;
45+
#[cfg(feature = "catalog-managed")]
46+
pub mod log_path;
4647
pub mod scan;
4748
pub mod schema;
4849

@@ -596,10 +597,36 @@ pub unsafe extern "C" fn snapshot(
596597
) -> ExternResult<Handle<SharedSnapshot>> {
597598
let url = unsafe { unwrap_and_parse_path_as_url(path) };
598599
let engine = unsafe { engine.as_ref() };
599-
snapshot_impl(url, engine, None).into_extern_result(&engine)
600+
snapshot_impl(url, engine, None, Vec::new()).into_extern_result(&engine)
600601
}
601602

602-
/// Get the snapshot from the specified table at a specific version
603+
/// Build the latest snapshot of the table at `path`, using the supplied log tail (which may be
/// empty).
///
/// The `log_paths` array is converted into a `Vec<LogPath>` first; if that conversion fails, the
/// failure is what gets returned and no snapshot is built.
///
/// # Safety
///
/// Caller is responsible for passing valid handles and path pointer.
/// The log_paths array and its contents must remain valid for the duration of this call.
#[cfg(feature = "catalog-managed")]
#[no_mangle]
pub unsafe extern "C" fn snapshot_with_log_tail(
    path: KernelStringSlice,
    engine: Handle<SharedExternEngine>,
    log_paths: log_path::LogPathArray,
) -> ExternResult<Handle<SharedSnapshot>> {
    let table_url = unsafe { unwrap_and_parse_path_as_url(path) };
    let engine_ref = unsafe { engine.as_ref() };

    // Convert the borrowed FFI array into owned kernel `LogPath`s, then build the snapshot only
    // if the conversion succeeded.
    let log_tail = unsafe { log_paths.log_paths() };
    log_tail
        .and_then(|tail| snapshot_impl(table_url, engine_ref, None, tail))
        .into_extern_result(&engine_ref)
}
627+
628+
/// Get the snapshot from the specified table at a specific version. Note this is only safe for
629+
/// non-catalog-managed tables.
603630
///
604631
/// # Safety
605632
///
@@ -612,21 +639,52 @@ pub unsafe extern "C" fn snapshot_at_version(
612639
) -> ExternResult<Handle<SharedSnapshot>> {
613640
let url = unsafe { unwrap_and_parse_path_as_url(path) };
614641
let engine = unsafe { engine.as_ref() };
615-
snapshot_impl(url, engine, version.into()).into_extern_result(&engine)
642+
snapshot_impl(url, engine, version.into(), Vec::new()).into_extern_result(&engine)
643+
}
644+
645+
/// Build the snapshot of the table at `path` pinned to `version`, using the supplied log tail.
///
/// The `log_tail` array is converted into a `Vec<LogPath>` first; if that conversion fails, the
/// failure is what gets returned and no snapshot is built.
///
/// # Safety
///
/// Caller is responsible for passing valid handles and path pointer.
/// The log_tail array and its contents must remain valid for the duration of this call.
#[cfg(feature = "catalog-managed")]
#[no_mangle]
pub unsafe extern "C" fn snapshot_at_version_with_log_tail(
    path: KernelStringSlice,
    engine: Handle<SharedExternEngine>,
    version: Version,
    log_tail: log_path::LogPathArray,
) -> ExternResult<Handle<SharedSnapshot>> {
    let table_url = unsafe { unwrap_and_parse_path_as_url(path) };
    let engine_ref = unsafe { engine.as_ref() };

    // Convert the borrowed FFI array into owned kernel `LogPath`s, then build the snapshot only
    // if the conversion succeeded.
    let converted = unsafe { log_tail.log_paths() };
    converted
        .and_then(|tail| snapshot_impl(table_url, engine_ref, version.into(), tail))
        .into_extern_result(&engine_ref)
}
617670

618671
fn snapshot_impl(
619672
url: DeltaResult<Url>,
620673
extern_engine: &dyn ExternEngine,
621674
version: Option<Version>,
675+
#[allow(unused_variables)] log_tail: Vec<LogPath>,
622676
) -> DeltaResult<Handle<SharedSnapshot>> {
623-
let builder = Snapshot::builder_for(url?);
624-
let builder = if let Some(v) = version {
625-
// TODO: should we include a `with_version_opt` method for the builder?
626-
builder.at_version(v)
627-
} else {
628-
builder
629-
};
677+
let mut builder = Snapshot::builder_for(url?);
678+
679+
if let Some(v) = version {
680+
builder = builder.at_version(v);
681+
}
682+
683+
#[cfg(feature = "catalog-managed")]
684+
if !log_tail.is_empty() {
685+
builder = builder.with_log_tail(log_tail);
686+
}
687+
630688
let snapshot = builder.build(extern_engine.engine().as_ref())?;
631689
Ok(snapshot.into())
632690
}
@@ -964,4 +1022,65 @@ mod tests {
9641022
unsafe { free_engine(engine) }
9651023
Ok(())
9661024
}
1025+
1026+
/// Builds snapshots through both log-tail FFI entry points and checks the resulting versions.
///
/// Commit 0 is written with `add_commit`; commit 1 is written with `add_staged_commit` and is
/// handed to the kernel via the log tail, so both snapshots are expected to be at version 1.
#[cfg(feature = "catalog-managed")]
#[tokio::test]
async fn test_snapshot_log_tail() -> Result<(), Box<dyn std::error::Error>> {
    use test_utils::add_staged_commit;
    let storage = Arc::new(InMemory::new());
    add_commit(
        storage.as_ref(),
        0,
        actions_to_string(vec![TestAction::Metadata]),
    )
    .await?;
    let commit1 = add_staged_commit(
        storage.as_ref(),
        1,
        actions_to_string(vec![TestAction::Add("path1".into())]),
    )
    .await?;
    let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
    let engine = engine_to_handle(Arc::new(engine), allocate_err);
    let path = "memory:///";

    let commit1_path = format!(
        "{}_delta_log/_staged_commits/{}",
        path,
        commit1.filename().unwrap()
    );
    let log_path =
        log_path::FfiLogPath::new(kernel_string_slice!(commit1_path), 123456789, 100);
    // `log_tail_storage` owns the elements; the `LogPathArray` below only borrows them, so it
    // must stay alive until the last FFI call has returned.
    let log_tail_storage = [log_path];
    let log_tail = log_path::LogPathArray {
        ptr: log_tail_storage.as_ptr(),
        len: log_tail_storage.len(),
    };

    // Latest snapshot with log tail
    let snapshot = unsafe {
        ok_or_panic(snapshot_with_log_tail(
            kernel_string_slice!(path),
            engine.shallow_copy(),
            log_tail.clone(),
        ))
    };
    let snapshot_version = unsafe { version(snapshot.shallow_copy()) };
    assert_eq!(snapshot_version, 1);

    // Test getting snapshot at version
    let snapshot2 = unsafe {
        ok_or_panic(snapshot_at_version_with_log_tail(
            kernel_string_slice!(path),
            engine.shallow_copy(),
            1,
            log_tail,
        ))
    };
    // Check the version of `snapshot2` (the original re-checked `snapshot`, leaving the
    // at-version entry point unverified).
    let snapshot2_version = unsafe { version(snapshot2.shallow_copy()) };
    assert_eq!(snapshot2_version, 1);

    unsafe { free_snapshot(snapshot) }
    unsafe { free_snapshot(snapshot2) }
    unsafe { free_engine(engine) }
    Ok(())
}
9671086
}

ffi/src/log_path.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
//! FFI interface for LogPath.
2+
3+
use delta_kernel::{DeltaResult, FileMeta, LogPath};
4+
use url::Url;
5+
6+
use crate::{KernelStringSlice, TryFromStringSlice};
7+
8+
/// FFI-safe array of LogPaths. Note that we _explicitly_ do not implement `Copy` on this struct
9+
/// despite all types being `Copy`, to avoid accidental misuse of the pointer.
10+
///
11+
/// This struct is essentially a borrowed view into an array. The owner must ensure the underlying
12+
/// array remains valid for the duration of its use.
13+
#[repr(C)]
14+
#[derive(Debug, Clone)]
15+
pub struct LogPathArray {
16+
/// Pointer to the first element of the FfiLogPath array. If len is 0, this pointer may be null,
17+
/// otherwise it must be non-null.
18+
pub ptr: *const FfiLogPath,
19+
/// Number of elements in the array
20+
pub len: usize,
21+
}
22+
23+
impl LogPathArray {
24+
/// Create an empty LogPathArray
25+
pub fn empty() -> Self {
26+
Self {
27+
ptr: std::ptr::null(),
28+
len: 0,
29+
}
30+
}
31+
32+
/// Convert this array into a Vec of kernel LogPaths
33+
///
34+
/// # Safety
35+
/// The ptr must point to `len` valid FfiLogPath elements, and those elements
36+
/// must remain valid for the duration of this call
37+
pub(crate) unsafe fn log_paths(&self) -> DeltaResult<Vec<LogPath>> {
38+
if self.ptr.is_null() || self.len == 0 {
39+
return Ok(Vec::new());
40+
}
41+
42+
let slice = unsafe { std::slice::from_raw_parts(self.ptr, self.len) };
43+
slice
44+
.iter()
45+
.map(|ffi_path| unsafe { ffi_path.log_path() })
46+
.collect::<Result<Vec<_>, _>>()
47+
}
48+
}
49+
50+
/// FFI-safe LogPath representation that can be passed from the engine
51+
#[repr(C)]
52+
pub struct FfiLogPath {
53+
/// URL location of the log file
54+
location: KernelStringSlice,
55+
/// Last modified time as milliseconds since unix epoch
56+
last_modified: i64,
57+
/// Size in bytes of the log file
58+
size: u64,
59+
}
60+
61+
impl FfiLogPath {
62+
/// Create a new FFI LogPath. The location string slice must be valid UTF-8.
63+
pub fn new(location: KernelStringSlice, last_modified: i64, size: u64) -> Self {
64+
Self {
65+
location,
66+
last_modified,
67+
size,
68+
}
69+
}
70+
71+
/// URL location of the log file as a string slice
72+
pub fn location(&self) -> &KernelStringSlice {
73+
&self.location
74+
}
75+
76+
/// Last modified time as milliseconds since unix epoch
77+
pub fn last_modified(&self) -> i64 {
78+
self.last_modified
79+
}
80+
81+
/// Size in bytes of the log file
82+
pub fn size(&self) -> u64 {
83+
self.size
84+
}
85+
86+
/// Convert this FFI log path into a kernel LogPath
87+
///
88+
/// # Safety
89+
///
90+
/// The `self.location` string slice must be valid UTF-8 and represent a valid URL.
91+
unsafe fn log_path(&self) -> DeltaResult<LogPath> {
92+
let location_str = unsafe { TryFromStringSlice::try_from_slice(&self.location) }?;
93+
let url = Url::parse(location_str)?;
94+
let file_meta = FileMeta {
95+
location: url,
96+
last_modified: self.last_modified,
97+
size: self.size,
98+
};
99+
LogPath::try_new(file_meta)
100+
}
101+
}

0 commit comments

Comments
 (0)