Skip to content

Commit 600fca2

Browse files
authored
feat: add FFI bindings for domain metadata write operations (delta-io#2327)
## What changes are proposed in this pull request? ### This PR affects the following public APIs Two new FFI functions are added (not breaking -- purely additive): - `with_domain_metadata(txn, domain, configuration, engine)` -- add a `domainMetadata` action to a transaction - `with_domain_metadata_removed(txn, domain, engine)` -- add a `domainMetadata` removal tombstone to a transaction Both follow the existing consume-and-return handle pattern used by `with_engine_info`. ## How was this change tested? Three new FFI tests: - `test_domain_metadata_add_and_remove` -- happy path: adds domain metadata, commits, verifies JSON in commit log; then removes it in a second transaction and verifies the tombstone - `test_domain_metadata_system_domain_rejected_at_commit` -- error path: system `delta.*` domain is rejected at commit - `test_domain_metadata_duplicate_domain_rejected_at_commit` -- error path: duplicate domain in a single transaction is rejected at commit
1 parent de79cf6 commit 600fca2

File tree

1 file changed

+319
-1
lines changed

1 file changed

+319
-1
lines changed

ffi/src/transaction/mod.rs

Lines changed: 319 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,71 @@ fn with_engine_info_impl(
148148
Ok(Box::new(txn.with_engine_info(info)).into())
149149
}
150150

151+
/// Add domain metadata to the transaction. The domain metadata will be written to the Delta log
152+
/// as a `domainMetadata` action when the transaction is committed.
153+
///
154+
/// `domain` identifies the metadata domain (e.g. `"myApp"`). `configuration` is an arbitrary
155+
/// string value associated with the domain (typically JSON).
156+
///
157+
/// Each domain can only appear once per transaction. Setting metadata for multiple distinct
158+
/// domains is allowed. Duplicate domains or setting and removing the same domain in a single
159+
/// transaction will cause the commit to fail.
160+
///
161+
/// # Safety
162+
///
163+
/// Caller is responsible for passing valid handles. CONSUMES the transaction handle and returns
164+
/// a new one.
165+
#[no_mangle]
166+
pub unsafe extern "C" fn with_domain_metadata(
167+
txn: Handle<ExclusiveTransaction>,
168+
domain: KernelStringSlice,
169+
configuration: KernelStringSlice,
170+
engine: Handle<SharedExternEngine>,
171+
) -> ExternResult<Handle<ExclusiveTransaction>> {
172+
let txn = unsafe { txn.into_inner() };
173+
let engine = unsafe { engine.as_ref() };
174+
with_domain_metadata_impl(*txn, domain, configuration).into_extern_result(&engine)
175+
}
176+
177+
fn with_domain_metadata_impl(
178+
txn: Transaction,
179+
domain: KernelStringSlice,
180+
configuration: KernelStringSlice,
181+
) -> DeltaResult<Handle<ExclusiveTransaction>> {
182+
let domain = unsafe { TryFromStringSlice::try_from_slice(&domain) }?;
183+
let configuration = unsafe { TryFromStringSlice::try_from_slice(&configuration) }?;
184+
Ok(Box::new(txn.with_domain_metadata(domain, configuration)).into())
185+
}
186+
187+
/// Remove domain metadata from the table in this transaction. A tombstone action with
188+
/// `removed: true` will be written to the Delta log when the transaction is committed.
189+
///
190+
/// The caller does not need to provide a configuration value -- the existing value is
191+
/// automatically preserved in the tombstone.
192+
///
193+
/// # Safety
194+
///
195+
/// Caller is responsible for passing valid handles. CONSUMES the transaction handle and returns
196+
/// a new one.
197+
#[no_mangle]
198+
pub unsafe extern "C" fn with_domain_metadata_removed(
199+
txn: Handle<ExclusiveTransaction>,
200+
domain: KernelStringSlice,
201+
engine: Handle<SharedExternEngine>,
202+
) -> ExternResult<Handle<ExclusiveTransaction>> {
203+
let txn = unsafe { txn.into_inner() };
204+
let engine = unsafe { engine.as_ref() };
205+
with_domain_metadata_removed_impl(*txn, domain).into_extern_result(&engine)
206+
}
207+
208+
fn with_domain_metadata_removed_impl(
209+
txn: Transaction,
210+
domain: KernelStringSlice,
211+
) -> DeltaResult<Handle<ExclusiveTransaction>> {
212+
let domain = unsafe { TryFromStringSlice::try_from_slice(&domain) }?;
213+
Ok(Box::new(txn.with_domain_metadata_removed(domain)).into())
214+
}
215+
151216
/// Add file metadata to the transaction for files that have been written. The metadata contains
152217
/// information about files written during the transaction that will be added to the Delta log
153218
/// during commit.
@@ -506,8 +571,10 @@ mod tests {
506571
use delta_kernel_ffi::engine_data::get_engine_data;
507572
use delta_kernel_ffi::engine_data::ArrowFFIData;
508573

574+
use delta_kernel_ffi::error::KernelError;
509575
use delta_kernel_ffi::ffi_test_utils::{
510-
allocate_err, allocate_str, build_snapshot, ok_or_panic, recover_error, recover_string,
576+
allocate_err, allocate_str, assert_extern_result_error_with_message, build_snapshot,
577+
ok_or_panic, recover_error, recover_string,
511578
};
512579
use delta_kernel_ffi::tests::get_default_engine;
513580

@@ -762,6 +829,257 @@ mod tests {
762829
Ok(())
763830
}
764831

832+
/// Read the commit log at `version` and return the `domainMetadata` action JSON.
833+
async fn read_domain_metadata_action(
834+
store: &dyn ObjectStore,
835+
table_url: &Url,
836+
version: u64,
837+
) -> serde_json::Value {
838+
let path = format!("_delta_log/{version:020}.json");
839+
let commit_url = table_url.join(&path).unwrap();
840+
let data = store
841+
.get(&Path::from_url_path(commit_url.path()).unwrap())
842+
.await
843+
.unwrap();
844+
let actions: Vec<serde_json::Value> =
845+
Deserializer::from_slice(&data.bytes().await.unwrap())
846+
.into_iter::<serde_json::Value>()
847+
.try_collect()
848+
.unwrap();
849+
actions
850+
.into_iter()
851+
.find(|a| a.get("domainMetadata").is_some())
852+
.expect("commit should contain a domainMetadata action")
853+
}
854+
855+
/// Create a table with the `domainMetadata` writer feature enabled and return the table
856+
/// URL, object store, and FFI engine handle.
857+
async fn setup_domain_metadata_table(
858+
dir_url: &Url,
859+
name: &str,
860+
) -> Result<(Url, Arc<dyn ObjectStore>, Handle<SharedExternEngine>), Box<dyn std::error::Error>>
861+
{
862+
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
863+
"id",
864+
DataType::INTEGER,
865+
)])?);
866+
let (store, _test_engine, table_location) =
867+
test_utils::engine_store_setup(name, Some(dir_url));
868+
let table_url = test_utils::create_table(
869+
store.clone(),
870+
table_location,
871+
schema,
872+
&[],
873+
true,
874+
vec![],
875+
vec!["domainMetadata"],
876+
)
877+
.await?;
878+
let table_path = table_url.to_file_path().unwrap();
879+
let table_path_str = table_path.to_str().unwrap();
880+
let engine = get_default_engine(table_path_str);
881+
Ok((table_url, store, engine))
882+
}
883+
884+
#[tokio::test]
885+
#[cfg_attr(miri, ignore)]
886+
async fn test_domain_metadata_add_and_remove() -> Result<(), Box<dyn std::error::Error>> {
887+
let tmp_test_dir = tempdir()?;
888+
let tmp_dir_url = Url::from_directory_path(tmp_test_dir.path()).unwrap();
889+
let (table_url, store, engine) =
890+
setup_domain_metadata_table(&tmp_dir_url, "test_dm").await?;
891+
let table_path = table_url.to_file_path().unwrap();
892+
let table_path_str = table_path.to_str().unwrap();
893+
894+
// === Transaction 1: add domain metadata ===
895+
let txn = ok_or_panic(unsafe {
896+
transaction(kernel_string_slice!(table_path_str), engine.shallow_copy())
897+
});
898+
unsafe { set_data_change(txn.shallow_copy(), false) };
899+
900+
let domain = "testDomain";
901+
let configuration = r#"{"key": "value"}"#;
902+
let txn = ok_or_panic(unsafe {
903+
with_domain_metadata(
904+
txn,
905+
kernel_string_slice!(domain),
906+
kernel_string_slice!(configuration),
907+
engine.shallow_copy(),
908+
)
909+
});
910+
911+
let version = ok_or_panic(unsafe { commit(txn, engine.shallow_copy()) });
912+
assert_eq!(version, 1);
913+
914+
let dm = read_domain_metadata_action(&*store, &table_url, 1).await;
915+
assert_eq!(dm["domainMetadata"]["domain"], "testDomain");
916+
assert_eq!(dm["domainMetadata"]["configuration"], r#"{"key": "value"}"#);
917+
assert_eq!(dm["domainMetadata"]["removed"], false);
918+
919+
// === Transaction 2: remove domain metadata ===
920+
let txn = ok_or_panic(unsafe {
921+
transaction(kernel_string_slice!(table_path_str), engine.shallow_copy())
922+
});
923+
unsafe { set_data_change(txn.shallow_copy(), false) };
924+
925+
let txn = ok_or_panic(unsafe {
926+
with_domain_metadata_removed(txn, kernel_string_slice!(domain), engine.shallow_copy())
927+
});
928+
929+
let version = ok_or_panic(unsafe { commit(txn, engine.shallow_copy()) });
930+
assert_eq!(version, 2);
931+
932+
let dm = read_domain_metadata_action(&*store, &table_url, 2).await;
933+
assert_eq!(dm["domainMetadata"]["domain"], "testDomain");
934+
assert_eq!(dm["domainMetadata"]["removed"], true);
935+
assert_eq!(dm["domainMetadata"]["configuration"], r#"{"key": "value"}"#);
936+
937+
unsafe { free_engine(engine) };
938+
Ok(())
939+
}
940+
941+
#[tokio::test]
942+
#[cfg_attr(miri, ignore)]
943+
async fn test_domain_metadata_system_domain_rejected_at_commit(
944+
) -> Result<(), Box<dyn std::error::Error>> {
945+
let tmp_test_dir = tempdir()?;
946+
let tmp_dir_url = Url::from_directory_path(tmp_test_dir.path()).unwrap();
947+
let (table_url, _store, engine) =
948+
setup_domain_metadata_table(&tmp_dir_url, "test_dm_sys").await?;
949+
let table_path = table_url.to_file_path().unwrap();
950+
let table_path_str = table_path.to_str().unwrap();
951+
952+
// with_domain_metadata succeeds (validation is lazy), but commit should fail
953+
let txn = ok_or_panic(unsafe {
954+
transaction(kernel_string_slice!(table_path_str), engine.shallow_copy())
955+
});
956+
unsafe { set_data_change(txn.shallow_copy(), false) };
957+
958+
let sys_domain = "delta.system";
959+
let config = "config";
960+
let txn = ok_or_panic(unsafe {
961+
with_domain_metadata(
962+
txn,
963+
kernel_string_slice!(sys_domain),
964+
kernel_string_slice!(config),
965+
engine.shallow_copy(),
966+
)
967+
});
968+
969+
let result = unsafe { commit(txn, engine.shallow_copy()) };
970+
assert_extern_result_error_with_message(
971+
result,
972+
KernelError::GenericError,
973+
Some("Generic delta kernel error: Cannot modify domains that start with 'delta.' as those are system controlled"),
974+
);
975+
976+
unsafe { free_engine(engine) };
977+
Ok(())
978+
}
979+
980+
#[tokio::test]
981+
#[cfg_attr(miri, ignore)]
982+
async fn test_domain_metadata_duplicate_domain_rejected_at_commit(
983+
) -> Result<(), Box<dyn std::error::Error>> {
984+
let tmp_test_dir = tempdir()?;
985+
let tmp_dir_url = Url::from_directory_path(tmp_test_dir.path()).unwrap();
986+
let (table_url, _store, engine) =
987+
setup_domain_metadata_table(&tmp_dir_url, "test_dm_dup").await?;
988+
let table_path = table_url.to_file_path().unwrap();
989+
let table_path_str = table_path.to_str().unwrap();
990+
991+
// Adding the same domain twice should cause commit to fail
992+
let txn = ok_or_panic(unsafe {
993+
transaction(kernel_string_slice!(table_path_str), engine.shallow_copy())
994+
});
995+
unsafe { set_data_change(txn.shallow_copy(), false) };
996+
997+
let dup_domain = "dup";
998+
let config_a = "a";
999+
let config_b = "b";
1000+
let txn = ok_or_panic(unsafe {
1001+
with_domain_metadata(
1002+
txn,
1003+
kernel_string_slice!(dup_domain),
1004+
kernel_string_slice!(config_a),
1005+
engine.shallow_copy(),
1006+
)
1007+
});
1008+
let txn = ok_or_panic(unsafe {
1009+
with_domain_metadata(
1010+
txn,
1011+
kernel_string_slice!(dup_domain),
1012+
kernel_string_slice!(config_b),
1013+
engine.shallow_copy(),
1014+
)
1015+
});
1016+
1017+
let result = unsafe { commit(txn, engine.shallow_copy()) };
1018+
assert_extern_result_error_with_message(
1019+
result,
1020+
KernelError::GenericError,
1021+
Some("Generic delta kernel error: Metadata for domain dup already specified in this transaction"),
1022+
);
1023+
1024+
unsafe { free_engine(engine) };
1025+
Ok(())
1026+
}
1027+
1028+
#[tokio::test]
1029+
#[cfg_attr(miri, ignore)]
1030+
async fn test_domain_metadata_rejected_without_feature(
1031+
) -> Result<(), Box<dyn std::error::Error>> {
1032+
let tmp_test_dir = tempdir()?;
1033+
let tmp_dir_url = Url::from_directory_path(tmp_test_dir.path()).unwrap();
1034+
1035+
// Create a table WITHOUT the domainMetadata writer feature (v1/v1 protocol)
1036+
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
1037+
"id",
1038+
DataType::INTEGER,
1039+
)])?);
1040+
let (store, _test_engine, table_location) =
1041+
test_utils::engine_store_setup("test_dm_no_feature", Some(&tmp_dir_url));
1042+
let table_url = test_utils::create_table(
1043+
store.clone(),
1044+
table_location,
1045+
schema,
1046+
&[],
1047+
false,
1048+
vec![],
1049+
vec![],
1050+
)
1051+
.await?;
1052+
let table_path = table_url.to_file_path().unwrap();
1053+
let table_path_str = table_path.to_str().unwrap();
1054+
let engine = get_default_engine(table_path_str);
1055+
1056+
let txn = ok_or_panic(unsafe {
1057+
transaction(kernel_string_slice!(table_path_str), engine.shallow_copy())
1058+
});
1059+
unsafe { set_data_change(txn.shallow_copy(), false) };
1060+
1061+
let domain = "myDomain";
1062+
let config = "config";
1063+
let txn = ok_or_panic(unsafe {
1064+
with_domain_metadata(
1065+
txn,
1066+
kernel_string_slice!(domain),
1067+
kernel_string_slice!(config),
1068+
engine.shallow_copy(),
1069+
)
1070+
});
1071+
1072+
let result = unsafe { commit(txn, engine.shallow_copy()) };
1073+
assert_extern_result_error_with_message(
1074+
result,
1075+
KernelError::UnsupportedError,
1076+
Some("Unsupported: Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"),
1077+
);
1078+
1079+
unsafe { free_engine(engine) };
1080+
Ok(())
1081+
}
1082+
7651083
#[cfg(feature = "delta-kernel-unity-catalog")]
7661084
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
7671085
#[cfg_attr(miri, ignore)]

0 commit comments

Comments
 (0)