
Commit 678f280

feat: Allow writes to CDF tables for add-only, remove-only, and non-data-change transactions (#1490)
### What changes are proposed in this pull request?

* This PR removes the overly restrictive check that required appendOnly mode to be enabled for writes to CDF-enabled tables. Now writers can perform add-only or remove-only operations on CDF tables without requiring appendOnly at the table level.
* A more targeted validation is introduced at commit time: mixed add+remove operations with dataChange=true are blocked (since kernel doesn't yet support writing CDC files), while add-only and remove-only operations are allowed. The error message has been improved to explain why the operation is blocked and suggest using separate transactions.

### How was this change tested?

* Added 4 comprehensive unit tests covering all CDF write scenarios:
  * Add-only transactions (allowed)
  * Remove-only transactions (allowed)
  * Mixed add+remove with dataChange=false (allowed for optimization/vacuum)
  * Mixed add+remove with dataChange=true (blocked with helpful error message)

---------

Co-authored-by: Zach Schuermann <[email protected]>
1 parent 0946f04 commit 678f280
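As the new error message suggests, a writer that needs both adds and removes with dataChange=true on a CDF-enabled table can split the work into two commits. The fragment below is a minimal sketch of that pattern, not kernel guidance: it assumes `engine`, `table_url`, `new_file_metadata`, and `files_to_remove` already exist in a `Result`-returning context, and it only uses calls that appear in the tests added by this change.

```rust
// Sketch only (fragment): calls mirror the new tests in kernel/tests/write.rs.

// Commit 1: add the rewritten files (add-only commits stay allowed with CDF).
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut add_txn = snapshot
    .transaction(Box::new(FileSystemCommitter::new()))?
    .with_engine_info("rewrite: add new files")
    .with_data_change(true);
add_txn.add_files(new_file_metadata);
add_txn.commit(engine.as_ref())?;

// Commit 2: remove the superseded files against a refreshed snapshot
// (remove-only commits are also allowed with CDF).
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut remove_txn = snapshot
    .transaction(Box::new(FileSystemCommitter::new()))?
    .with_engine_info("rewrite: remove old files")
    .with_data_change(true);
remove_txn.remove_files(files_to_remove);
remove_txn.commit(engine.as_ref())?;
```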

File tree

4 files changed: +255 −27 lines changed

  kernel/src/table_configuration.rs
  kernel/src/transaction/mod.rs
  kernel/tests/write.rs
  test-utils/src/lib.rs


kernel/src/table_configuration.rs

Lines changed: 4 additions & 26 deletions
```diff
@@ -20,7 +20,6 @@ use crate::table_features::{
     ColumnMappingMode, EnablementCheck, FeatureInfo, FeatureType, TableFeature,
 };
 use crate::table_properties::TableProperties;
-use crate::utils::require;
 use crate::{DeltaResult, Error, Version};
 use delta_kernel_derive::internal_api;
 
@@ -183,27 +182,6 @@ impl TableConfiguration {
     pub(crate) fn ensure_write_supported(&self) -> DeltaResult<()> {
         self.protocol.ensure_write_supported()?;
 
-        // We allow Change Data Feed to be enabled only if AppendOnly is enabled.
-        // This is because kernel does not yet support writing `.cdc` files for DML operations.
-        if self
-            .table_properties()
-            .enable_change_data_feed
-            .unwrap_or(false)
-        {
-            require!(
-                self.is_append_only_enabled(),
-                Error::unsupported("Writing to table with Change Data Feed is only supported if append only mode is enabled")
-            );
-            require!(
-                self.is_cdf_read_supported(),
-                Error::unsupported(
-                    "Change data feed is enabled on this table, but found invalid table
-                    table_configuration. Ensure that column mapping is disabled and ensure correct
-                    protocol reader/writer features"
-                )
-            );
-        }
-
         // for now we don't allow invariants so although we support writer version 2 and the
         // ColumnInvariant TableFeature we _must_ check here that they are not actually in use
         if self.is_invariants_supported()
@@ -686,17 +664,17 @@ mod test {
         use TableFeature::*;
         let cases = [
             (
-                // Should fail since AppendOnly is not supported
+                // Writing to CDF-enabled table is supported for writes
                 create_mock_table_config(&["delta.enableChangeDataFeed"], &[ChangeDataFeed]),
-                Err(Error::unsupported("Writing to table with Change Data Feed is only supported if append only mode is enabled"))
+                Ok(())
             ),
             (
-                // Should fail since AppendOnly is supported but not enabled
+                // Should succeed even if AppendOnly is supported but not enabled
                 create_mock_table_config(
                     &["delta.enableChangeDataFeed"],
                     &[ChangeDataFeed, AppendOnly],
                 ),
-                Err(Error::unsupported("Writing to table with Change Data Feed is only supported if append only mode is enabled"))
+                Ok(())
             ),
             (
                 // Should succeed since AppendOnly is enabled
```

kernel/src/transaction/mod.rs

Lines changed: 26 additions & 1 deletion
```diff
@@ -26,7 +26,7 @@ use crate::scan::log_replay::{
 use crate::scan::scan_row_schema;
 use crate::schema::{ArrayType, MapType, SchemaRef, StructField, StructType};
 use crate::snapshot::SnapshotRef;
-use crate::utils::current_time_ms;
+use crate::utils::{current_time_ms, require};
 use crate::{
     DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData,
     RowVisitor, SchemaTransform, Version,
@@ -232,6 +232,31 @@ impl Transaction {
                 dup.app_id
             )));
         }
+
+        // If there are add and remove files with data change in the same transaction, we block it.
+        // This is because kernel does not yet have a way to discern DML operations. For DML
+        // operations that perform updates on rows, ChangeDataFeed requires that a `cdc` file be
+        // written to the delta log.
+        if !self.add_files_metadata.is_empty()
+            && !self.remove_files_metadata.is_empty()
+            && self.data_change
+        {
+            let cdf_enabled = self
+                .read_snapshot
+                .table_configuration()
+                .table_properties()
+                .enable_change_data_feed
+                .unwrap_or(false);
+            require!(
+                !cdf_enabled,
+                Error::generic(
+                    "Cannot add and remove data in the same transaction when Change Data Feed is enabled (delta.enableChangeDataFeed = true). \
+                    This would require writing CDC files for DML operations, which is not yet supported. \
+                    Consider using separate transactions: one to add files, another to remove files."
+                )
+            );
+        }
+
         // Step 1: Generate SetTransaction actions
         let set_transaction_actions = self
             .set_transactions
```
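Spelled out, the guard above rejects a commit only when all four conditions coincide. The snippet below is an illustrative restatement of that rule in plain Rust, not kernel code; it checks the same four scenarios the new tests cover.

```rust
// Illustrative restatement of the new commit-time guard (not kernel code):
// a commit is blocked only when adds, removes, dataChange=true, and CDF all hold.
fn cdf_commit_blocked(has_adds: bool, has_removes: bool, data_change: bool, cdf_enabled: bool) -> bool {
    has_adds && has_removes && data_change && cdf_enabled
}

fn main() {
    // The four scenarios exercised by the new tests in kernel/tests/write.rs:
    assert!(!cdf_commit_blocked(true, false, true, true)); // add-only: allowed
    assert!(!cdf_commit_blocked(false, true, true, true)); // remove-only: allowed
    assert!(!cdf_commit_blocked(true, true, false, true)); // mixed, dataChange=false: allowed
    assert!(cdf_commit_blocked(true, true, true, true)); // mixed, dataChange=true: blocked
}
```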

kernel/tests/write.rs

Lines changed: 222 additions & 0 deletions
```diff
@@ -2208,3 +2208,225 @@ async fn test_remove_files_with_modified_selection_vector() -> Result<(), Box<dyn std::error::Error>> {
     }
     Ok(())
 }
+
+// Helper function to create a table with CDF enabled
+async fn create_cdf_table(
+    table_name: &str,
+    schema: SchemaRef,
+) -> Result<(Url, Arc<DefaultEngine<TokioBackgroundExecutor>>, TempDir), Box<dyn std::error::Error>>
+{
+    let tmp_dir = tempdir()?;
+    let tmp_test_dir_url = Url::from_directory_path(tmp_dir.path()).unwrap();
+
+    let (store, engine, table_location) = engine_store_setup(table_name, Some(&tmp_test_dir_url));
+
+    let table_url = create_table(
+        store.clone(),
+        table_location,
+        schema.clone(),
+        &[],
+        true, // use protocol 3.7
+        vec![],
+        vec!["changeDataFeed"],
+    )
+    .await?;
+
+    Ok((table_url, Arc::new(engine), tmp_dir))
+}
+
+// Helper function to write data to a table
+async fn write_data_to_table(
+    table_url: &Url,
+    engine: &Arc<DefaultEngine<TokioBackgroundExecutor>>,
+    schema: SchemaRef,
+    values: Vec<i32>,
+) -> Result<Version, Box<dyn std::error::Error>> {
+    let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
+    let mut txn = snapshot
+        .transaction(Box::new(FileSystemCommitter::new()))?
+        .with_engine_info("test");
+
+    add_files_to_transaction(&mut txn, engine, schema, values).await?;
+
+    let result = txn.commit(engine.as_ref())?;
+    match result {
+        CommitResult::CommittedTransaction(committed) => Ok(committed.commit_version()),
+        _ => panic!("Transaction should be committed"),
+    }
+}
+
+// Helper function to add files to an existing transaction
+async fn add_files_to_transaction(
+    txn: &mut delta_kernel::transaction::Transaction,
+    engine: &Arc<DefaultEngine<TokioBackgroundExecutor>>,
+    schema: SchemaRef,
+    values: Vec<i32>,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let data = RecordBatch::try_new(
+        Arc::new(schema.as_ref().try_into_arrow()?),
+        vec![Arc::new(Int32Array::from(values))],
+    )?;
+
+    let write_context = Arc::new(txn.get_write_context());
+    let add_files_metadata = engine
+        .write_parquet(
+            &ArrowEngineData::new(data),
+            write_context.as_ref(),
+            HashMap::new(),
+        )
+        .await?;
+    txn.add_files(add_files_metadata);
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_cdf_write_all_adds_succeeds() -> Result<(), Box<dyn std::error::Error>> {
+    // This test verifies that add-only transactions work with CDF enabled
+    let _ = tracing_subscriber::fmt::try_init();
+
+    let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
+        "number",
+        DataType::INTEGER,
+    )])?);
+
+    let (table_url, engine, _tmp_dir) =
+        create_cdf_table("test_cdf_all_adds", schema.clone()).await?;
+
+    // Add files - this should succeed
+    let version = write_data_to_table(&table_url, &engine, schema, vec![1, 2, 3]).await?;
+    assert_eq!(version, 1);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_cdf_write_all_removes_succeeds() -> Result<(), Box<dyn std::error::Error>> {
+    // This test verifies that remove-only transactions work with CDF enabled
+    let _ = tracing_subscriber::fmt::try_init();
+
+    let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
+        "number",
+        DataType::INTEGER,
+    )])?);
+
+    let (table_url, engine, _tmp_dir) =
+        create_cdf_table("test_cdf_all_removes", schema.clone()).await?;
+
+    // First, add some data
+    write_data_to_table(&table_url, &engine, schema, vec![1, 2, 3]).await?;
+
+    // Now remove the files
+    let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
+    let mut txn = snapshot
+        .clone()
+        .transaction(Box::new(FileSystemCommitter::new()))?
+        .with_engine_info("cdf remove test")
+        .with_data_change(true);
+
+    let scan = snapshot.scan_builder().build()?;
+    let scan_metadata = scan.scan_metadata(engine.as_ref())?.next().unwrap()?;
+    let (data, selection_vector) = scan_metadata.scan_files.into_parts();
+    txn.remove_files(FilteredEngineData::try_new(data, selection_vector)?);
+
+    // This should succeed - remove-only transactions are allowed with CDF
+    let result = txn.commit(engine.as_ref())?;
+    match result {
+        CommitResult::CommittedTransaction(committed) => {
+            assert_eq!(committed.commit_version(), 2);
+        }
+        _ => panic!("Transaction should be committed"),
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_cdf_write_mixed_no_data_change_succeeds() -> Result<(), Box<dyn std::error::Error>> {
+    // This test verifies that mixed add+remove transactions work when dataChange=false.
+    // It's allowed because the transaction does not contain any logical data changes.
+    // This can happen when a table is being optimized/compacted.
+    let _ = tracing_subscriber::fmt::try_init();
+
+    let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
+        "number",
+        DataType::INTEGER,
+    )])?);
+
+    let (table_url, engine, _tmp_dir) =
+        create_cdf_table("test_cdf_mixed_no_data_change", schema.clone()).await?;
+
+    // First, add some data
+    write_data_to_table(&table_url, &engine, schema.clone(), vec![1, 2, 3]).await?;
+
+    // Now create a transaction with both add AND remove files, but dataChange=false
+    let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
+    let mut txn = snapshot
+        .clone()
+        .transaction(Box::new(FileSystemCommitter::new()))?
+        .with_engine_info("cdf mixed test")
+        .with_data_change(false); // dataChange=false is key here
+
+    // Add new files
+    add_files_to_transaction(&mut txn, &engine, schema, vec![4, 5, 6]).await?;
+
+    // Also remove existing files
+    let scan = snapshot.scan_builder().build()?;
+    let scan_metadata = scan.scan_metadata(engine.as_ref())?.next().unwrap()?;
+    let (data, selection_vector) = scan_metadata.scan_files.into_parts();
+    txn.remove_files(FilteredEngineData::try_new(data, selection_vector)?);
+
+    // This should succeed - mixed operations are allowed when dataChange=false
+    let result = txn.commit(engine.as_ref())?;
+    match result {
+        CommitResult::CommittedTransaction(committed) => {
+            assert_eq!(committed.commit_version(), 2);
+        }
+        _ => panic!("Transaction should be committed"),
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_cdf_write_mixed_with_data_change_fails() -> Result<(), Box<dyn std::error::Error>> {
+    // This test verifies that mixed add+remove transactions fail with helpful error when dataChange=true
+    let _ = tracing_subscriber::fmt::try_init();
+
+    let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
+        "number",
+        DataType::INTEGER,
+    )])?);
+
+    let (table_url, engine, _tmp_dir) =
+        create_cdf_table("test_cdf_mixed_with_data_change", schema.clone()).await?;
+
+    // First, add some data
+    write_data_to_table(&table_url, &engine, schema.clone(), vec![1, 2, 3]).await?;
+
+    // Now create a transaction with both add AND remove files with dataChange=true
+    let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
+    let mut txn = snapshot
+        .clone()
+        .transaction(Box::new(FileSystemCommitter::new()))?
+        .with_engine_info("cdf mixed fail test")
+        .with_data_change(true); // dataChange=true - this should fail
+
+    // Add new files
+    add_files_to_transaction(&mut txn, &engine, schema, vec![4, 5, 6]).await?;
+
+    // Also remove existing files
+    let scan = snapshot.scan_builder().build()?;
+    let scan_metadata = scan.scan_metadata(engine.as_ref())?.next().unwrap()?;
+    let (data, selection_vector) = scan_metadata.scan_files.into_parts();
+    txn.remove_files(FilteredEngineData::try_new(data, selection_vector)?);
+
+    // This should fail with our new error message
+    assert_result_error_with_message(
+        txn.commit(engine.as_ref()),
+        "Cannot add and remove data in the same transaction when Change Data Feed is enabled (delta.enableChangeDataFeed = true). \
+        This would require writing CDC files for DML operations, which is not yet supported. \
+        Consider using separate transactions: one to add files, another to remove files."
+    );
+
+    Ok(())
+}
```

test-utils/src/lib.rs

Lines changed: 3 additions & 0 deletions
```diff
@@ -325,6 +325,9 @@ pub async fn create_table(
             json!("1612345678"),
         );
     }
+    if writer_features.contains(&"changeDataFeed") {
+        config.insert("delta.enableChangeDataFeed".to_string(), json!("true"));
+    }
 
     config
 };
```
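With this branch in place, a test can opt a fresh table into CDF by passing "changeDataFeed" through the writer-features argument. The fragment below is a sketch that mirrors the `create_cdf_table` helper added in `kernel/tests/write.rs` above; it assumes the same `engine_store_setup` fixture and imports used there.

```rust
// Sketch only: mirrors create_cdf_table from kernel/tests/write.rs. Passing
// "changeDataFeed" hits the new branch above, which sets
// delta.enableChangeDataFeed = "true" on the created table.
let tmp_dir = tempdir()?;
let tmp_dir_url = Url::from_directory_path(tmp_dir.path()).unwrap();
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
    "number",
    DataType::INTEGER,
)])?);
let (store, engine, table_location) = engine_store_setup("cdf_table", Some(&tmp_dir_url));
let table_url = create_table(
    store.clone(),
    table_location,
    schema.clone(),
    &[],
    true, // protocol 3.7
    vec![],
    vec!["changeDataFeed"],
)
.await?;
```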
