Skip to content

Commit 09a82a3

Browse files
feat: support writing domain metadata (2/2) (delta-io#1275)
## What changes are proposed in this pull request? This PR is (2/2) to support writing domain metadata. It adds support for removing metadata for user-specified domains. As per the Delta [specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#reader-requirements-for-domain-metadata:~:text=When%20true%2C%20the%20action%20serves%20as%20a%20tombstone%20to%20logically%20delete%20a%20metadata%20domain.%20Writers%20should%20preserve%20an%20accurate%20pre%2Dimage%20of%20the%20configuration.), for a removed domain metadata "writers should preserve an accurate pre-image of the configuration". Thus for any removals, we need to perform a log replay and restore the original configuration of the domain with the `removed` flag set to `true`. Furthermore, for any removal where the domain does not already exist in the log, we treat this as a no-op and do not write any record to the Delta Log. - resolves delta-io#1270 ## How was this change tested? Added new integration tests in `write.rs`. --------- Co-authored-by: Zach Schuermann <zachary.zvs@gmail.com>
1 parent e05af8c commit 09a82a3

File tree

4 files changed

+285
-14
lines changed

4 files changed

+285
-14
lines changed

kernel/src/actions/domain_metadata.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub(crate) fn domain_metadata_configuration(
3838
/// Scan the entire log for all domain metadata actions but terminate early if a specific domain
3939
/// is provided. Note that this returns the latest domain metadata for each domain, accounting for
4040
/// tombstones (removed=true) - that is, removed domain metadatas will _never_ be returned.
41-
fn scan_domain_metadatas(
41+
pub(crate) fn scan_domain_metadatas(
4242
log_segment: &LogSegment,
4343
domain: Option<&str>,
4444
engine: &dyn Engine,

kernel/src/actions/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -969,6 +969,15 @@ impl DomainMetadata {
969969
}
970970
}
971971

972+
// Create a new DomainMetadata action to remove a domain.
973+
pub(crate) fn remove(domain: String, configuration: String) -> Self {
974+
Self {
975+
domain,
976+
configuration,
977+
removed: true,
978+
}
979+
}
980+
972981
// returns true if the domain metadata is an system-controlled domain (all domains that start
973982
// with "delta.")
974983
#[allow(unused)]
@@ -979,6 +988,14 @@ impl DomainMetadata {
979988
pub(crate) fn domain(&self) -> &str {
980989
&self.domain
981990
}
991+
992+
pub(crate) fn configuration(&self) -> &str {
993+
&self.configuration
994+
}
995+
996+
pub(crate) fn is_removed(&self) -> bool {
997+
self.removed
998+
}
982999
}
9831000

9841001
#[cfg(test)]

kernel/src/transaction/mod.rs

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
use std::collections::HashSet;
1+
use std::collections::{HashMap, HashSet};
22
use std::iter;
33
use std::ops::Deref;
44
use std::sync::{Arc, LazyLock};
55

66
use url::Url;
77

88
use crate::actions::{
9-
as_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema,
10-
get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction,
9+
as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema,
10+
get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction,
1111
};
1212
#[cfg(feature = "catalog-managed")]
1313
use crate::committer::FileSystemCommitter;
@@ -352,7 +352,27 @@ impl Transaction {
352352
self
353353
}
354354

355+
/// Remove domain metadata from the Delta log.
356+
/// If the domain exists in the Delta log, this creates a tombstone to logically delete
357+
/// the domain. The tombstone preserves the previous configuration value.
358+
/// If the domain does not exist in the Delta log, this is a no-op.
359+
/// Note that each domain can only appear once per transaction. That is, multiple operations
360+
/// on the same domain are disallowed in a single transaction, as well as setting and removing
361+
/// the same domain in a single transaction. If a duplicate domain is included, the `commit` will
362+
/// fail (that is, we don't eagerly check domain validity here).
363+
/// Removing metadata for multiple distinct domains is allowed.
364+
pub fn with_domain_metadata_removed(mut self, domain: String) -> Self {
365+
// actual configuration value determined during commit
366+
self.domain_metadatas
367+
.push(DomainMetadata::remove(domain, String::new()));
368+
self
369+
}
370+
355371
/// Generate domain metadata actions with validation. Handle both user and system domains.
372+
///
373+
/// This function may perform an expensive log replay operation if there are any domain removals.
374+
/// The log replay is required to fetch the previous configuration value for the domain to preserve
375+
/// in removal tombstones as mandated by the Delta spec.
356376
fn generate_domain_metadata_actions<'a>(
357377
&'a self,
358378
engine: &'a dyn Engine,
@@ -370,32 +390,58 @@ impl Transaction {
370390
));
371391
}
372392

373-
// validate domain metadata
374-
let mut domains = HashSet::new();
375-
for domain_metadata in &self.domain_metadatas {
376-
if domain_metadata.is_internal() {
393+
// validate user domain metadata and check if we have removals
394+
let mut seen_domains = HashSet::new();
395+
let mut has_removals = false;
396+
for dm in &self.domain_metadatas {
397+
if dm.is_internal() {
377398
return Err(Error::Generic(
378399
"Cannot modify domains that start with 'delta.' as those are system controlled"
379400
.to_string(),
380401
));
381402
}
382-
if !domains.insert(domain_metadata.domain()) {
403+
404+
if !seen_domains.insert(dm.domain()) {
383405
return Err(Error::Generic(format!(
384406
"Metadata for domain {} already specified in this transaction",
385-
domain_metadata.domain()
407+
dm.domain()
386408
)));
387409
}
410+
411+
if dm.is_removed() {
412+
has_removals = true;
413+
}
388414
}
389415

416+
// fetch previous configuration values (requires log replay)
417+
let existing_domains = if has_removals {
418+
scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?
419+
} else {
420+
HashMap::new()
421+
};
422+
423+
let user_domains = self
424+
.domain_metadatas
425+
.iter()
426+
.filter_map(move |dm: &DomainMetadata| {
427+
if dm.is_removed() {
428+
existing_domains.get(dm.domain()).map(|existing| {
429+
DomainMetadata::remove(
430+
dm.domain().to_string(),
431+
existing.configuration().to_string(),
432+
)
433+
})
434+
} else {
435+
Some(dm.clone())
436+
}
437+
});
438+
390439
let system_domains = row_tracking_high_watermark
391440
.map(DomainMetadata::try_from)
392441
.transpose()?
393442
.into_iter();
394443

395-
Ok(self
396-
.domain_metadatas
397-
.iter()
398-
.cloned()
444+
Ok(user_domains
399445
.chain(system_domains)
400446
.map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)))
401447
}

kernel/tests/write.rs

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,6 +1409,214 @@ async fn test_set_domain_metadata_unsupported_writer_feature(
14091409
Ok(())
14101410
}
14111411

1412+
#[tokio::test]
1413+
async fn test_remove_domain_metadata_non_existent_domain() -> Result<(), Box<dyn std::error::Error>>
1414+
{
1415+
let _ = tracing_subscriber::fmt::try_init();
1416+
1417+
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
1418+
"number",
1419+
DataType::INTEGER,
1420+
)])?);
1421+
1422+
let table_name = "test_domain_metadata_unsupported";
1423+
1424+
let (store, engine, table_location) = engine_store_setup(table_name, None);
1425+
let table_url = create_table(
1426+
store.clone(),
1427+
table_location,
1428+
schema.clone(),
1429+
&[],
1430+
true,
1431+
vec![],
1432+
vec!["domainMetadata"],
1433+
)
1434+
.await?;
1435+
1436+
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1437+
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()))?;
1438+
1439+
let domain = "app.deprecated";
1440+
1441+
// removing domain metadata that doesn't exist should NOT write a tombstone
1442+
let _ = txn
1443+
.with_domain_metadata_removed(domain.to_string())
1444+
.commit(&engine)?;
1445+
1446+
let commit_data = store
1447+
.get(&Path::from(format!(
1448+
"/{table_name}/_delta_log/00000000000000000001.json"
1449+
)))
1450+
.await?
1451+
.bytes()
1452+
.await?;
1453+
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
1454+
.into_iter()
1455+
.try_collect()?;
1456+
1457+
let domain_action = actions.iter().find(|v| v.get("domainMetadata").is_some());
1458+
assert!(
1459+
domain_action.is_none(),
1460+
"No tombstone should be written for non-existent domain"
1461+
);
1462+
1463+
let final_snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1464+
let config = final_snapshot.get_domain_metadata(domain, &engine)?;
1465+
assert_eq!(config, None);
1466+
1467+
Ok(())
1468+
}
1469+
1470+
#[tokio::test]
1471+
async fn test_domain_metadata_set_remove_conflicts() -> Result<(), Box<dyn std::error::Error>> {
1472+
let _ = tracing_subscriber::fmt::try_init();
1473+
1474+
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
1475+
"number",
1476+
DataType::INTEGER,
1477+
)])?);
1478+
1479+
let table_name = "test_domain_metadata_unsupported";
1480+
1481+
let (store, engine, table_location) = engine_store_setup(table_name, None);
1482+
let table_url = create_table(
1483+
store.clone(),
1484+
table_location,
1485+
schema.clone(),
1486+
&[],
1487+
true,
1488+
vec![],
1489+
vec!["domainMetadata"],
1490+
)
1491+
.await?;
1492+
1493+
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1494+
1495+
// set then remove same domain
1496+
let txn = snapshot
1497+
.clone()
1498+
.transaction(Box::new(FileSystemCommitter::new()))?;
1499+
let err = txn
1500+
.with_domain_metadata("app.config".to_string(), "v1".to_string())
1501+
.with_domain_metadata_removed("app.config".to_string())
1502+
.commit(&engine)
1503+
.unwrap_err();
1504+
assert!(err
1505+
.to_string()
1506+
.contains("already specified in this transaction"));
1507+
1508+
// remove then set same domain
1509+
let txn2 = snapshot
1510+
.clone()
1511+
.transaction(Box::new(FileSystemCommitter::new()))?;
1512+
let err = txn2
1513+
.with_domain_metadata_removed("test.domain".to_string())
1514+
.with_domain_metadata("test.domain".to_string(), "v1".to_string())
1515+
.commit(&engine)
1516+
.unwrap_err();
1517+
assert!(err
1518+
.to_string()
1519+
.contains("already specified in this transaction"));
1520+
1521+
// remove same domain twice
1522+
let txn3 = snapshot
1523+
.clone()
1524+
.transaction(Box::new(FileSystemCommitter::new()))?;
1525+
let err = txn3
1526+
.with_domain_metadata_removed("another.domain".to_string())
1527+
.with_domain_metadata_removed("another.domain".to_string())
1528+
.commit(&engine)
1529+
.unwrap_err();
1530+
assert!(err
1531+
.to_string()
1532+
.contains("already specified in this transaction"));
1533+
1534+
// remove system domain
1535+
let txn4 = snapshot
1536+
.clone()
1537+
.transaction(Box::new(FileSystemCommitter::new()))?;
1538+
let err = txn4
1539+
.with_domain_metadata_removed("delta.system".to_string())
1540+
.commit(&engine)
1541+
.unwrap_err();
1542+
assert!(err
1543+
.to_string()
1544+
.contains("Cannot modify domains that start with 'delta.' as those are system controlled"));
1545+
1546+
Ok(())
1547+
}
1548+
1549+
#[tokio::test]
1550+
async fn test_domain_metadata_set_then_remove() -> Result<(), Box<dyn std::error::Error>> {
1551+
let _ = tracing_subscriber::fmt::try_init();
1552+
1553+
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
1554+
"number",
1555+
DataType::INTEGER,
1556+
)])?);
1557+
1558+
let table_name = "test_domain_metadata_unsupported";
1559+
1560+
let (store, engine, table_location) = engine_store_setup(table_name, None);
1561+
let table_url = create_table(
1562+
store.clone(),
1563+
table_location,
1564+
schema.clone(),
1565+
&[],
1566+
true,
1567+
vec![],
1568+
vec!["domainMetadata"],
1569+
)
1570+
.await?;
1571+
1572+
let domain = "app.config";
1573+
let configuration = r#"{"version": 1}"#;
1574+
1575+
// txn 1: set domain metadata
1576+
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1577+
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()))?;
1578+
let _ = txn
1579+
.with_domain_metadata(domain.to_string(), configuration.to_string())
1580+
.commit(&engine)?;
1581+
1582+
// txn 2: remove the same domain metadata
1583+
let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1584+
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()))?;
1585+
let _ = txn
1586+
.with_domain_metadata_removed(domain.to_string())
1587+
.commit(&engine)?;
1588+
1589+
// verify removal commit preserves the previous configuration
1590+
let commit_data = store
1591+
.get(&Path::from(format!(
1592+
"/{table_name}/_delta_log/00000000000000000002.json"
1593+
)))
1594+
.await?
1595+
.bytes()
1596+
.await?;
1597+
let actions: Vec<serde_json::Value> = Deserializer::from_slice(&commit_data)
1598+
.into_iter()
1599+
.try_collect()?;
1600+
1601+
let domain_action = actions
1602+
.iter()
1603+
.find(|v| v.get("domainMetadata").is_some())
1604+
.unwrap();
1605+
assert_eq!(domain_action["domainMetadata"]["domain"], domain);
1606+
assert_eq!(
1607+
domain_action["domainMetadata"]["configuration"],
1608+
configuration
1609+
);
1610+
assert_eq!(domain_action["domainMetadata"]["removed"], true);
1611+
1612+
// verify reads see the metadata removal
1613+
let final_snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?;
1614+
let domain_config = final_snapshot.get_domain_metadata(domain, &engine)?;
1615+
assert_eq!(domain_config, None);
1616+
1617+
Ok(())
1618+
}
1619+
14121620
async fn get_ict_at_version(
14131621
store: Arc<dyn ObjectStore>,
14141622
table_url: &Url,

0 commit comments

Comments
 (0)