Skip to content

Commit a6f5715

Browse files
committed
Add CreateTable API with typestate pattern and comprehensive testing
This commit introduces a complete CreateTable API for Delta Kernel Rust, enabling programmatic creation of Delta tables with proper metadata tracking and validation. The API uses a typestate pattern to enforce correct usage at compile time: 1. **CreateTableBuilder** (configuration stage) - Configure table properties, schema, partition columns - Builder methods: with_table_properties(), with_partition_columns() - Validates configuration before transitioning 2. **CreateTableTransaction** (commit stage) - Commits Protocol, Metadata, and CommitInfo actions - Returns Transaction for future data operations - Enforces that metadata is committed before data operations This pattern is extensible for future table features: - Add builder methods for clustering, constraints, invariants - Add table feature flags to Protocol action - Extend validation in build() method - Maintain type safety throughout Example extensibility: ```rust CreateTableBuilder::new(path, schema, engine_info) .with_table_properties(props) .with_partition_columns(vec!["date", "region"]) // ✓ Supported .with_clustering(vec!["user_id"]) // Future .with_check_constraints(constraints) // Future .build(engine)? ``` - test_create_simple_table: Basic table creation with events schema - test_create_table_with_properties: Financial table with Delta properties - test_create_table_already_exists: Duplicate creation prevention - test_create_table_multiple_properties: Builder pattern chaining **Coverage**: Verifies tables load correctly and schemas are preserved - test_commit_info_is_written_to_log: Validates all 3 actions present * Parses actual JSON log file * Verifies Protocol (minReaderVersion=3, minWriterVersion=7) * Verifies Metadata (id, schemaString, createdTime) * Verifies CommitInfo (timestamp, operation, engineInfo, txnId, kernelVersion) - test_log_action_order: Ensures correct action ordering **Coverage**: Guarantees Delta protocol compliance and complete audit trail - test_create_table_empty_schema: Schema validation - Realistic schemas: DECIMAL types, TIMESTAMP, DATE, BOOLEAN **Coverage**: Prevents invalid table configurations ```rust use delta_kernel::table_manager::TableManager; use delta_kernel::schema::{StructType, StructField, DataType}; use delta_kernel::committer::FileSystemCommitter; use std::sync::Arc; use std::collections::HashMap; // Define schema let schema = Arc::new(StructType::try_new(vec![ StructField::new("user_id", DataType::LONG, false), StructField::new("event_type", DataType::STRING, false), StructField::new("timestamp", DataType::TIMESTAMP, false), ])?); // Configure table properties let mut properties = HashMap::new(); properties.insert("delta.appendOnly".to_string(), "true".to_string()); properties.insert("delta.checkpointInterval".to_string(), "10".to_string()); // Create table with typestate pattern let create_txn = TableManager::create_table( "/path/to/table", schema, "MyApplication/1.0" ) .with_table_properties(properties) .build(&engine)?; // Commit metadata (creates _delta_log/00000000000000000000.json) let txn = create_txn.commit_metadata(&engine, Box::new(FileSystemCommitter::new()))?; // Future: Use returned transaction for data operations // txn.add_files(data); // txn.commit(&engine)?; ``` - kernel/src/actions/mod.rs: Protocol version constants - kernel/src/transaction/create_table.rs: CommitInfo generation - kernel/tests/create_table.rs: Comprehensive test suite (7 tests) None - this is a new API addition. - Add support for partition columns in builder - Add clustering configuration - Add check constraints and invariants - Support column mapping modes - Enable schema evolution from CreateTableTransaction
1 parent 4ea0ef6 commit a6f5715

File tree

7 files changed

+876
-63
lines changed

7 files changed

+876
-63
lines changed

kernel/examples/write-table/src/main.rs

Lines changed: 12 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::collections::HashMap;
2-
use std::fs::{create_dir_all, write};
2+
use std::fs::create_dir_all;
33
use std::path::Path;
44
use std::process::ExitCode;
55
use std::sync::Arc;
@@ -9,9 +9,7 @@ use arrow::util::pretty::print_batches;
99
use clap::Parser;
1010
use common::{LocationArgs, ParseWithExamples};
1111
use itertools::Itertools;
12-
use serde_json::{json, to_vec};
1312
use url::Url;
14-
use uuid::Uuid;
1513

1614
use delta_kernel::arrow::array::TimestampMicrosecondArray;
1715
use delta_kernel::committer::FileSystemCommitter;
@@ -20,6 +18,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
2018
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
2119
use delta_kernel::engine::default::DefaultEngine;
2220
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
21+
use delta_kernel::table_manager::TableManager;
2322
use delta_kernel::transaction::{CommitResult, RetryableTransaction};
2423
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};
2524

@@ -155,7 +154,7 @@ async fn create_or_get_base_snapshot(
155154
// Create new table
156155
println!("Creating new Delta table...");
157156
let schema = parse_schema(schema_str)?;
158-
create_table(url, &schema).await?;
157+
create_table(url, &schema, engine).await?;
159158
Snapshot::builder_for(url.clone()).build(engine)
160159
}
161160
}
@@ -195,66 +194,16 @@ fn parse_schema(schema_str: &str) -> DeltaResult<SchemaRef> {
195194
Ok(Arc::new(StructType::try_new(fields)?))
196195
}
197196

198-
/// Create a new Delta table with the given schema.
199-
///
200-
/// Creating a Delta table is not officially supported by kernel-rs yet, so we manually create the
201-
/// initial transaction log.
202-
async fn create_table(table_url: &Url, schema: &SchemaRef) -> DeltaResult<()> {
203-
let table_id = Uuid::new_v4().to_string();
204-
let schema_str = serde_json::to_string(&schema)?;
197+
/// Create a new Delta table with the given schema using the official CreateTable API.
198+
async fn create_table(table_url: &Url, schema: &SchemaRef, engine: &dyn Engine) -> DeltaResult<()> {
199+
// Use the new TableManager API to create the table
200+
let table_path = table_url.as_str();
201+
let create_txn =
202+
TableManager::create_table(table_path, schema.clone(), "write-table-example/1.0")
203+
.build(engine)?;
205204

206-
let (reader_features, writer_features) = {
207-
let reader_features: Vec<&'static str> = vec![];
208-
let writer_features: Vec<&'static str> = vec![];
209-
210-
// TODO: Support adding specific table features
211-
(reader_features, writer_features)
212-
};
213-
214-
let protocol = json!({
215-
"protocol": {
216-
"minReaderVersion": 3,
217-
"minWriterVersion": 7,
218-
"readerFeatures": reader_features,
219-
"writerFeatures": writer_features,
220-
}
221-
});
222-
let partition_columns: Vec<String> = vec![];
223-
let metadata = json!({
224-
"metaData": {
225-
"id": table_id,
226-
"format": {
227-
"provider": "parquet",
228-
"options": {}
229-
},
230-
"schemaString": schema_str,
231-
"partitionColumns": partition_columns,
232-
"configuration": {},
233-
"createdTime": 1677811175819u64
234-
}
235-
});
236-
237-
let data = [
238-
to_vec(&protocol).unwrap(),
239-
b"\n".to_vec(),
240-
to_vec(&metadata).unwrap(),
241-
]
242-
.concat();
243-
244-
// Write the initial transaction with protocol and metadata to 0.json
245-
let delta_log_path = table_url
246-
.join("_delta_log/")?
247-
.to_file_path()
248-
.map_err(|_e| Error::generic("URL cannot be converted to local file path"))?;
249-
let file_path = delta_log_path.join("00000000000000000000.json");
250-
251-
// Create the _delta_log directory if it doesn't exist
252-
create_dir_all(&delta_log_path)
253-
.map_err(|e| Error::generic(format!("Failed to create _delta_log directory: {e}")))?;
254-
255-
// Write the file using standard filesystem operations
256-
write(&file_path, data)
257-
.map_err(|e| Error::generic(format!("Failed to write initial transaction log: {e}")))?;
205+
// Commit the metadata - this creates the table
206+
let _txn = create_txn.commit_metadata(engine, Box::new(FileSystemCommitter::new()))?;
258207

259208
println!("✓ Created Delta table with schema: {schema:#?}");
260209
Ok(())

kernel/src/actions/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@ pub(crate) const DOMAIN_METADATA_NAME: &str = "domainMetadata";
6868

6969
pub(crate) const INTERNAL_DOMAIN_PREFIX: &str = "delta.";
7070

71+
/// Minimum reader version for tables that use table features.
72+
/// When set to 3, the protocol requires an explicit `readerFeatures` array.
73+
#[internal_api]
74+
pub(crate) const TABLE_FEATURES_MIN_READER_VERSION: i32 = 3;
75+
76+
/// Minimum writer version for tables that use table features.
77+
/// When set to 7, the protocol requires an explicit `writerFeatures` array.
78+
#[internal_api]
79+
pub(crate) const TABLE_FEATURES_MIN_WRITER_VERSION: i32 = 7;
80+
7181
static COMMIT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
7282
Arc::new(StructType::new_unchecked([
7383
StructField::nullable(ADD_NAME, Add::to_schema()),

kernel/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ pub mod snapshot;
9999
pub mod table_changes;
100100
pub mod table_configuration;
101101
pub mod table_features;
102+
pub mod table_manager;
102103
pub mod table_properties;
103104
pub mod transaction;
104105
pub(crate) mod transforms;
@@ -156,6 +157,8 @@ pub use expressions::{Expression, ExpressionRef, Predicate, PredicateRef};
156157
pub use log_compaction::{should_compact, LogCompactionDataIterator, LogCompactionWriter};
157158
pub use snapshot::Snapshot;
158159
pub use snapshot::SnapshotRef;
160+
pub use table_manager::TableManager;
161+
pub use transaction::create_table::{CreateTableBuilder, CreateTableTransaction};
159162

160163
use expressions::literal_expression_transform::LiteralExpressionTransform;
161164
use expressions::Scalar;

kernel/src/table_manager.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
//! The `table_manager` module provides the entry point for creating and managing Delta tables.
2+
//!
3+
//! This module exposes the [`TableManager`] struct which provides static factory methods for
4+
//! creating new Delta tables with a fluent builder API.
5+
6+
use crate::schema::SchemaRef;
7+
use crate::transaction::create_table::CreateTableBuilder;
8+
9+
/// Entry point for creating and managing Delta tables.
10+
///
11+
/// `TableManager` provides static factory methods that return builders for configuring
12+
/// and creating new Delta tables.
13+
pub struct TableManager;
14+
15+
impl TableManager {
16+
/// Creates a builder for creating a new Delta table.
17+
///
18+
/// This method returns a [`CreateTableBuilder`] that can be configured with table
19+
/// properties and other options before building the transaction.
20+
///
21+
/// # Arguments
22+
///
23+
/// * `path` - The file system path where the Delta table will be created
24+
/// * `schema` - The schema for the new table
25+
/// * `engine_info` - Information about the engine creating the table (e.g., "MyApp/1.0")
26+
///
27+
/// # Example
28+
///
29+
/// ```rust,no_run
30+
/// use delta_kernel::table_manager::TableManager;
31+
/// use delta_kernel::schema::{StructType, StructField, DataType};
32+
/// use std::sync::Arc;
33+
/// # use delta_kernel::Engine;
34+
/// # fn example(engine: &dyn Engine) -> delta_kernel::DeltaResult<()> {
35+
///
36+
/// let schema = Arc::new(StructType::new(vec![
37+
/// StructField::new("id", DataType::INTEGER, false),
38+
/// StructField::new("name", DataType::STRING, true),
39+
/// ]));
40+
///
41+
/// let create_txn = TableManager::create_table("/path/to/table", schema, "MyApp/1.0")
42+
/// .build(engine)?;
43+
/// # Ok(())
44+
/// # }
45+
/// ```
46+
pub fn create_table(
47+
path: impl AsRef<str>,
48+
schema: SchemaRef,
49+
engine_info: impl Into<String>,
50+
) -> CreateTableBuilder {
51+
CreateTableBuilder::new(path, schema, engine_info)
52+
}
53+
}

0 commit comments

Comments
 (0)