Skip to content

Commit 98d5ae5

Browse files
feat(catalog-managed): UCCommitter (#1418)
## What changes are proposed in this pull request? Adds a new `UCCommitter` for writing to tables in UC (in `uc-catalog` crate for now, final resting place TBD). Additionally, adds the following RW table features in supported writer features (did we never enable `V2Checkpoint` write?) - `VacuumProtocolCheck` - `V2Checkpoint` - `CatalogManaged` (behind `catalog-managed` feature flag) - `CatalogOwnedPreview` (behind `catalog-managed` feature flag) ## How was this change tested? added an ignored test to write to cc tables; run with `ENDPOINT=".." TABLENAME=".." TOKEN=".." cargo t write_uc_table --nocapture -- --ignored`
1 parent 678f280 commit 98d5ae5

File tree

11 files changed

+283
-52
lines changed

11 files changed

+283
-52
lines changed

kernel/src/actions/mod.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -590,11 +590,6 @@ impl Protocol {
590590
/// Check if writing to a table with this protocol is supported. That is: does the kernel
591591
/// support the specified protocol writer version and all enabled writer features?
592592
pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> {
593-
#[cfg(feature = "catalog-managed")]
594-
require!(
595-
!self.is_catalog_managed(),
596-
Error::unsupported("Writes are not yet supported for catalog-managed tables")
597-
);
598593
match &self.writer_features {
599594
Some(writer_features) if self.min_writer_version == 7 => {
600595
// if we're on version 7, make sure we support all the specified features
@@ -1610,7 +1605,7 @@ mod tests {
16101605
.unwrap();
16111606
assert_result_error_with_message(
16121607
protocol.ensure_write_supported(),
1613-
r#"Unsupported: Found unsupported TableFeatures: "identityColumns". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
1608+
r#"Unsupported: Found unsupported TableFeatures: "identityColumns". Supported TableFeatures: "changeDataFeed", "appendOnly", "catalogManaged", "catalogOwned-preview", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "v2Checkpoint", "vacuumProtocolCheck", "variantType", "variantType-preview", "variantShredding-preview""#,
16141609
);
16151610

16161611
// Unknown writer features are allowed during creation for forward compatibility,
@@ -1624,7 +1619,7 @@ mod tests {
16241619
.unwrap();
16251620
assert_result_error_with_message(
16261621
protocol.ensure_write_supported(),
1627-
r#"Unsupported: Found unsupported TableFeatures: "unsupported writer". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#,
1622+
r#"Unsupported: Found unsupported TableFeatures: "unsupported writer". Supported TableFeatures: "changeDataFeed", "appendOnly", "catalogManaged", "catalogOwned-preview", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "v2Checkpoint", "vacuumProtocolCheck", "variantType", "variantType-preview", "variantShredding-preview""#,
16281623
);
16291624
}
16301625

@@ -1680,24 +1675,25 @@ mod tests {
16801675
assert_eq!(parse_features::<TableFeature>(features), expected);
16811676
}
16821677

1678+
#[cfg(feature = "catalog-managed")]
16831679
#[test]
1684-
fn test_no_catalog_managed_writes() {
1680+
fn test_catalog_managed_writes() {
16851681
let protocol = Protocol::try_new(
16861682
3,
16871683
7,
16881684
Some([TableFeature::CatalogManaged]),
16891685
Some([TableFeature::CatalogManaged]),
16901686
)
16911687
.unwrap();
1692-
assert!(protocol.ensure_write_supported().is_err());
1688+
assert!(protocol.ensure_write_supported().is_ok());
16931689
let protocol = Protocol::try_new(
16941690
3,
16951691
7,
16961692
Some([TableFeature::CatalogOwnedPreview]),
16971693
Some([TableFeature::CatalogOwnedPreview]),
16981694
)
16991695
.unwrap();
1700-
assert!(protocol.ensure_write_supported().is_err());
1696+
assert!(protocol.ensure_write_supported().is_ok());
17011697
}
17021698

17031699
#[test]

kernel/src/committer.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,17 @@ use url::Url;
4545
pub struct CommitMetadata {
4646
pub(crate) log_root: LogRoot,
4747
pub(crate) version: Version,
48+
pub(crate) in_commit_timestamp: i64,
4849
// in the future this will include Protocol, Metadata, CommitInfo, Domain Metadata, etc.
4950
}
5051

5152
impl CommitMetadata {
52-
pub(crate) fn new(log_root: LogRoot, version: Version) -> Self {
53-
Self { log_root, version }
53+
pub(crate) fn new(log_root: LogRoot, version: Version, in_commit_timestamp: i64) -> Self {
54+
Self {
55+
log_root,
56+
version,
57+
in_commit_timestamp,
58+
}
5459
}
5560

5661
/// The commit path is the absolute path (e.g. s3://bucket/table/_delta_log/{version}.json) to
@@ -73,6 +78,16 @@ impl CommitMetadata {
7378
pub fn version(&self) -> Version {
7479
self.version
7580
}
81+
82+
/// The in-commit timestamp for the commit. Note that this may differ from the actual commit
83+
/// file modification time.
84+
pub fn in_commit_timestamp(&self) -> i64 {
85+
self.in_commit_timestamp
86+
}
87+
88+
pub fn table_root(&self) -> &Url {
89+
self.log_root.table_root()
90+
}
7691
}
7792

7893
/// `CommitResponse` is the result of committing a transaction via a catalog. The committer uses
@@ -171,11 +186,14 @@ mod tests {
171186
let table_root = Url::parse("s3://my-bucket/path/to/table/").unwrap();
172187
let log_root = LogRoot::new(table_root).unwrap();
173188
let version = 42;
189+
let ts = 1234;
174190

175-
let commit_metadata = CommitMetadata::new(log_root, version);
191+
let commit_metadata = CommitMetadata::new(log_root, version, ts);
176192

177193
// version
178194
assert_eq!(commit_metadata.version(), 42);
195+
// in_commit_timestamp
196+
assert_eq!(commit_metadata.in_commit_timestamp(), 1234);
179197

180198
// published commit path
181199
let published_path = commit_metadata.published_commit_path().unwrap();
@@ -189,8 +207,9 @@ mod tests {
189207
let staged_path_str = staged_path.as_str();
190208

191209
assert!(
192-
staged_path_str
193-
.starts_with("s3://my-bucket/path/to/table/_delta_log/00000000000000000042."),
210+
staged_path_str.starts_with(
211+
"s3://my-bucket/path/to/table/_delta_log/_staged_commits/00000000000000000042."
212+
),
194213
"Staged path should start with the correct prefix, got: {}",
195214
staged_path_str
196215
);
@@ -200,15 +219,17 @@ mod tests {
200219
staged_path_str
201220
);
202221
let uuid_str = staged_path_str
203-
.strip_prefix("s3://my-bucket/path/to/table/_delta_log/00000000000000000042.")
222+
.strip_prefix(
223+
"s3://my-bucket/path/to/table/_delta_log/_staged_commits/00000000000000000042.",
224+
)
204225
.and_then(|s| s.strip_suffix(".json"))
205226
.expect("Staged path should have expected format");
206227
uuid::Uuid::parse_str(uuid_str).expect("Staged path should contain a valid UUID");
207228
}
208229

209230
#[cfg(feature = "catalog-managed")]
210231
#[tokio::test]
211-
async fn catalog_managed_tables_block_transactions() {
232+
async fn disallow_filesystem_committer_for_catalog_managed_tables() {
212233
let storage = Arc::new(InMemory::new());
213234
let table_root = Url::parse("memory:///").unwrap();
214235
let engine = DefaultEngine::new(storage.clone());
@@ -225,18 +246,16 @@ mod tests {
225246
let snapshot = crate::snapshot::SnapshotBuilder::new_for(table_root)
226247
.build(&engine)
227248
.unwrap();
228-
// Try to create a transaction with FileSystemCommitter
249+
// Try to commit a transaction with FileSystemCommitter
229250
let committer = Box::new(FileSystemCommitter::new());
230-
let err = snapshot.transaction(committer).unwrap_err();
251+
let err = snapshot
252+
.transaction(committer)
253+
.unwrap()
254+
.commit(&engine)
255+
.unwrap_err();
231256
assert!(matches!(
232257
err,
233-
crate::Error::Unsupported(e) if e.contains("Writes are not yet supported for catalog-managed tables")
258+
crate::Error::Generic(e) if e.contains("The FileSystemCommitter cannot be used to commit to catalog-managed tables. Please provide a committer for your catalog via Transaction::with_committer().")
234259
));
235-
// after allowing writes, we will check that this disallows default committer for
236-
// catalog-managed tables.
237-
// assert!(matches!(
238-
// err,
239-
// crate::Error::Generic(e) if e.contains("Cannot use the default committer for a catalog-managed table")
240-
// ));
241260
}
242261
}

kernel/src/log_path.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,23 +44,27 @@ impl LogPath {
4444
last_modified: i64,
4545
size: FileSize,
4646
) -> DeltaResult<LogPath> {
47+
let commit_path = Self::staged_commit_url(table_root, filename)?;
48+
let file_meta = FileMeta {
49+
location: commit_path,
50+
last_modified,
51+
size,
52+
};
53+
LogPath::try_new(file_meta)
54+
}
55+
56+
/// Create the URL for a staged commit file given the table root and filename. The table_root
57+
/// must point to the root of the table and end with a '/'.
58+
pub fn staged_commit_url(table_root: Url, filename: &str) -> DeltaResult<Url> {
4759
// TODO: we should introduce TablePath/LogPath types which enforce checks like ending '/'
4860
if !table_root.path().ends_with('/') {
4961
return Err(Error::invalid_table_location(table_root));
5062
}
51-
52-
let commit_path = table_root
63+
table_root
5364
.join("_delta_log/")
5465
.and_then(|url| url.join("_staged_commits/"))
5566
.and_then(|url| url.join(filename))
56-
.map_err(|_| Error::invalid_table_location(table_root))?;
57-
58-
let file_meta = FileMeta {
59-
location: commit_path,
60-
last_modified,
61-
size,
62-
};
63-
LogPath::try_new(file_meta)
67+
.map_err(|_| Error::invalid_table_location(table_root))
6468
}
6569
}
6670

kernel/src/path.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ const UUID_PART_LEN: usize = 36;
2323
/// The subdirectory name within the table root where the delta log resides
2424
const DELTA_LOG_DIR: &str = "_delta_log";
2525
const DELTA_LOG_DIR_WITH_SLASH: &str = "_delta_log/";
26+
/// The subdirectory name within the delta log where staged commits reside
27+
const STAGED_COMMITS_DIR: &str = "_staged_commits/";
2628

2729
#[derive(Debug, Clone, PartialEq, Eq)]
2830
#[internal_api]
@@ -387,22 +389,40 @@ impl ParsedLogPath<Url> {
387389
/// A wrapper around parsed log path to provide more structure/safety when handling
388390
/// table/log/commit paths.
389391
#[derive(Debug, Clone)]
390-
pub(crate) struct LogRoot(Url);
392+
pub(crate) struct LogRoot {
393+
table_root: Url,
394+
log_root: Url,
395+
}
391396

392397
impl LogRoot {
393398
/// Create a new LogRoot from the table root URL (e.g. s3://bucket/table ->
394399
/// s3://bucket/table/_delta_log/)
395400
///
396401
/// TODO: could take a `table_root: TableRoot`
397-
pub(crate) fn new(table_root: Url) -> DeltaResult<Self> {
398-
// FIXME: need to check for trailing slash
399-
Ok(Self(table_root.join(DELTA_LOG_DIR_WITH_SLASH)?))
402+
pub(crate) fn new(mut table_root: Url) -> DeltaResult<Self> {
403+
if !table_root.path().ends_with('/') {
404+
let new_path = format!("{}/", table_root.path());
405+
table_root.set_path(&new_path);
406+
}
407+
let log_root = table_root.join(DELTA_LOG_DIR_WITH_SLASH)?;
408+
Ok(Self {
409+
table_root,
410+
log_root,
411+
})
412+
}
413+
414+
pub(crate) fn table_root(&self) -> &Url {
415+
&self.table_root
416+
}
417+
418+
pub(crate) fn log_root(&self) -> &Url {
419+
&self.log_root
400420
}
401421

402422
/// Create a new commit path (absolute path) for the given version.
403423
pub(crate) fn new_commit_path(&self, version: Version) -> DeltaResult<ParsedLogPath<Url>> {
404424
let filename = format!("{version:020}.json");
405-
let path = self.0.join(&filename)?;
425+
let path = self.log_root().join(&filename)?;
406426
ParsedLogPath::try_from(path)?.ok_or_else(|| {
407427
Error::internal_error(format!("Attempted to create an invalid path: {filename}"))
408428
})
@@ -416,7 +436,7 @@ impl LogRoot {
416436
) -> DeltaResult<ParsedLogPath<Url>> {
417437
let uuid = uuid::Uuid::new_v4();
418438
let filename = format!("{version:020}.{uuid}.json");
419-
let path = self.0.join(&filename)?;
439+
let path = self.log_root().join(STAGED_COMMITS_DIR)?.join(&filename)?;
420440
ParsedLogPath::try_from(path)?.ok_or_else(|| {
421441
Error::internal_error(format!("Attempted to create an invalid path: {filename}"))
422442
})

kernel/src/table_configuration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -702,15 +702,15 @@ mod test {
702702
&["delta.enableChangeDataFeed", "delta.appendOnly"],
703703
&[ChangeDataFeed, ColumnMapping, AppendOnly],
704704
),
705-
Err(Error::unsupported(r#"Found unsupported TableFeatures: "columnMapping". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#)),
705+
Err(Error::unsupported(r#"Found unsupported TableFeatures: "columnMapping". Supported TableFeatures: "changeDataFeed", "appendOnly", "catalogManaged", "catalogOwned-preview", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "v2Checkpoint", "vacuumProtocolCheck", "variantType", "variantType-preview", "variantShredding-preview""#)),
706706
),
707707
(
708708
// The table does not require writing CDC files, so it is safe to write to it.
709709
create_mock_table_config(
710710
&["delta.appendOnly"],
711711
&[ChangeDataFeed, ColumnMapping, AppendOnly],
712712
),
713-
Err(Error::unsupported(r#"Found unsupported TableFeatures: "columnMapping". Supported TableFeatures: "changeDataFeed", "appendOnly", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "variantType", "variantType-preview", "variantShredding-preview""#)),
713+
Err(Error::unsupported(r#"Found unsupported TableFeatures: "columnMapping". Supported TableFeatures: "changeDataFeed", "appendOnly", "catalogManaged", "catalogOwned-preview", "deletionVectors", "domainMetadata", "inCommitTimestamp", "invariants", "rowTracking", "timestampNtz", "v2Checkpoint", "vacuumProtocolCheck", "variantType", "variantType-preview", "variantShredding-preview""#)),
714714
),
715715
(
716716
// Should succeed since change data feed is not enabled

kernel/src/table_features/mod.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,9 @@ static CATALOG_MANAGED_INFO: FeatureInfo = FeatureInfo {
450450
read_support: KernelSupport::Supported,
451451
#[cfg(not(feature = "catalog-managed"))]
452452
read_support: KernelSupport::NotSupported,
453+
#[cfg(feature = "catalog-managed")]
454+
write_support: KernelSupport::Supported,
455+
#[cfg(not(feature = "catalog-managed"))]
453456
write_support: KernelSupport::NotSupported,
454457
enablement_check: EnablementCheck::AlwaysIfSupported,
455458
};
@@ -465,6 +468,9 @@ static CATALOG_OWNED_PREVIEW_INFO: FeatureInfo = FeatureInfo {
465468
read_support: KernelSupport::Supported,
466469
#[cfg(not(feature = "catalog-managed"))]
467470
read_support: KernelSupport::NotSupported,
471+
#[cfg(feature = "catalog-managed")]
472+
write_support: KernelSupport::Supported,
473+
#[cfg(not(feature = "catalog-managed"))]
468474
write_support: KernelSupport::NotSupported,
469475
enablement_check: EnablementCheck::AlwaysIfSupported,
470476
};
@@ -554,7 +560,7 @@ static VACUUM_PROTOCOL_CHECK_INFO: FeatureInfo = FeatureInfo {
554560
feature_type: FeatureType::ReaderWriter,
555561
feature_requirements: &[],
556562
read_support: KernelSupport::Supported,
557-
write_support: KernelSupport::NotSupported,
563+
write_support: KernelSupport::Supported,
558564
enablement_check: EnablementCheck::AlwaysIfSupported,
559565
};
560566

@@ -717,12 +723,18 @@ pub(crate) static SUPPORTED_WRITER_FEATURES: LazyLock<Vec<TableFeature>> = LazyL
717723
vec![
718724
TableFeature::ChangeDataFeed,
719725
TableFeature::AppendOnly,
726+
#[cfg(feature = "catalog-managed")]
727+
TableFeature::CatalogManaged,
728+
#[cfg(feature = "catalog-managed")]
729+
TableFeature::CatalogOwnedPreview,
720730
TableFeature::DeletionVectors,
721731
TableFeature::DomainMetadata,
722732
TableFeature::InCommitTimestamp,
723733
TableFeature::Invariants,
724734
TableFeature::RowTracking,
725735
TableFeature::TimestampWithoutTimezone,
736+
TableFeature::V2Checkpoint,
737+
TableFeature::VacuumProtocolCheck,
726738
TableFeature::VariantType,
727739
TableFeature::VariantTypePreview,
728740
TableFeature::VariantShreddingPreview,

kernel/src/transaction/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ impl Transaction {
316316
return Err(Error::generic("The FileSystemCommitter cannot be used to commit to catalog-managed tables. Please provide a committer for your catalog via Transaction::with_committer()."));
317317
}
318318
let log_root = LogRoot::new(self.read_snapshot.table_root().clone())?;
319-
let commit_metadata = CommitMetadata::new(log_root, commit_version);
319+
let commit_metadata = CommitMetadata::new(log_root, commit_version, self.commit_timestamp);
320320
match self
321321
.committer
322322
.commit(engine, Box::new(filtered_actions), commit_metadata)

uc-catalog/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ serde_json = "1.0"
2323
tokio = { version = "1", features = ["full"] }
2424
tracing = "0.1"
2525
url = "2"
26+
uuid = { version = "1", features = ["v4"] }
2627

2728
[dev-dependencies]
2829
delta_kernel = { path = "../kernel", features = ["arrow-56", "default-engine-rustls", "catalog-managed"] }

0 commit comments

Comments
 (0)