Skip to content

Commit 7659cef

Browse files
committed
Add CreateTable API with typestate pattern
Implement a builder-pattern API for creating Delta tables that enforces compile-time safety through typestate progression: 1. CreateTableBuilder - Configuration phase 2. CreateTableTransaction - Metadata commit phase 3. Transaction - Data operations (future) Key features: - TableManager entry point with factory method - Fluent API with method chaining - Validation: path, table existence, empty schema - Support for table properties - Protocol 3.7 with empty reader/writer features - Returns Transaction to enable future data operations Changes: - Add kernel/src/table_manager.rs - Entry point module - Add kernel/src/transaction/create_table.rs - Builder and transaction - Add kernel/tests/create_table.rs - Integration tests (5 tests, all passing) - Update kernel/src/lib.rs - Export new types - Update kernel/src/transaction/mod.rs - Add create_table module - Update kernel/examples/write-table - Use new API instead of manual log creation Follows Java Kernel's CreateTableTransactionBuilder pattern but adapted for Rust idioms and typestate safety.
1 parent 4ea0ef6 commit 7659cef

File tree

6 files changed

+631
-63
lines changed

6 files changed

+631
-63
lines changed

kernel/examples/write-table/src/main.rs

Lines changed: 15 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::collections::HashMap;
2-
use std::fs::{create_dir_all, write};
2+
use std::fs::create_dir_all;
33
use std::path::Path;
44
use std::process::ExitCode;
55
use std::sync::Arc;
@@ -9,9 +9,7 @@ use arrow::util::pretty::print_batches;
99
use clap::Parser;
1010
use common::{LocationArgs, ParseWithExamples};
1111
use itertools::Itertools;
12-
use serde_json::{json, to_vec};
1312
use url::Url;
14-
use uuid::Uuid;
1513

1614
use delta_kernel::arrow::array::TimestampMicrosecondArray;
1715
use delta_kernel::committer::FileSystemCommitter;
@@ -20,6 +18,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
2018
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
2119
use delta_kernel::engine::default::DefaultEngine;
2220
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
21+
use delta_kernel::table_manager::TableManager;
2322
use delta_kernel::transaction::{CommitResult, RetryableTransaction};
2423
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};
2524

@@ -155,7 +154,7 @@ async fn create_or_get_base_snapshot(
155154
// Create new table
156155
println!("Creating new Delta table...");
157156
let schema = parse_schema(schema_str)?;
158-
create_table(url, &schema).await?;
157+
create_table(url, &schema, engine).await?;
159158
Snapshot::builder_for(url.clone()).build(engine)
160159
}
161160
}
@@ -195,66 +194,19 @@ fn parse_schema(schema_str: &str) -> DeltaResult<SchemaRef> {
195194
Ok(Arc::new(StructType::try_new(fields)?))
196195
}
197196

198-
/// Create a new Delta table with the given schema.
199-
///
200-
/// Creating a Delta table is not officially supported by kernel-rs yet, so we manually create the
201-
/// initial transaction log.
202-
async fn create_table(table_url: &Url, schema: &SchemaRef) -> DeltaResult<()> {
203-
let table_id = Uuid::new_v4().to_string();
204-
let schema_str = serde_json::to_string(&schema)?;
205-
206-
let (reader_features, writer_features) = {
207-
let reader_features: Vec<&'static str> = vec![];
208-
let writer_features: Vec<&'static str> = vec![];
209-
210-
// TODO: Support adding specific table features
211-
(reader_features, writer_features)
212-
};
197+
/// Create a new Delta table with the given schema using the official CreateTable API.
198+
async fn create_table(
199+
table_url: &Url,
200+
schema: &SchemaRef,
201+
engine: &dyn Engine,
202+
) -> DeltaResult<()> {
203+
// Use the new TableManager API to create the table
204+
let table_path = table_url.as_str();
205+
let create_txn = TableManager::create_table(table_path, schema.clone(), "write-table-example/1.0")
206+
.build(engine)?;
213207

214-
let protocol = json!({
215-
"protocol": {
216-
"minReaderVersion": 3,
217-
"minWriterVersion": 7,
218-
"readerFeatures": reader_features,
219-
"writerFeatures": writer_features,
220-
}
221-
});
222-
let partition_columns: Vec<String> = vec![];
223-
let metadata = json!({
224-
"metaData": {
225-
"id": table_id,
226-
"format": {
227-
"provider": "parquet",
228-
"options": {}
229-
},
230-
"schemaString": schema_str,
231-
"partitionColumns": partition_columns,
232-
"configuration": {},
233-
"createdTime": 1677811175819u64
234-
}
235-
});
236-
237-
let data = [
238-
to_vec(&protocol).unwrap(),
239-
b"\n".to_vec(),
240-
to_vec(&metadata).unwrap(),
241-
]
242-
.concat();
243-
244-
// Write the initial transaction with protocol and metadata to 0.json
245-
let delta_log_path = table_url
246-
.join("_delta_log/")?
247-
.to_file_path()
248-
.map_err(|_e| Error::generic("URL cannot be converted to local file path"))?;
249-
let file_path = delta_log_path.join("00000000000000000000.json");
250-
251-
// Create the _delta_log directory if it doesn't exist
252-
create_dir_all(&delta_log_path)
253-
.map_err(|e| Error::generic(format!("Failed to create _delta_log directory: {e}")))?;
254-
255-
// Write the file using standard filesystem operations
256-
write(&file_path, data)
257-
.map_err(|e| Error::generic(format!("Failed to write initial transaction log: {e}")))?;
208+
// Commit the metadata - this creates the table
209+
let _txn = create_txn.commit_metadata(engine, Box::new(FileSystemCommitter::new()))?;
258210

259211
println!("✓ Created Delta table with schema: {schema:#?}");
260212
Ok(())

kernel/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ pub mod snapshot;
9999
pub mod table_changes;
100100
pub mod table_configuration;
101101
pub mod table_features;
102+
pub mod table_manager;
102103
pub mod table_properties;
103104
pub mod transaction;
104105
pub(crate) mod transforms;
@@ -156,6 +157,8 @@ pub use expressions::{Expression, ExpressionRef, Predicate, PredicateRef};
156157
pub use log_compaction::{should_compact, LogCompactionDataIterator, LogCompactionWriter};
157158
pub use snapshot::Snapshot;
158159
pub use snapshot::SnapshotRef;
160+
pub use table_manager::TableManager;
161+
pub use transaction::create_table::{CreateTableBuilder, CreateTableTransaction};
159162

160163
use expressions::literal_expression_transform::LiteralExpressionTransform;
161164
use expressions::Scalar;

kernel/src/table_manager.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
//! The `table_manager` module provides the entry point for creating and managing Delta tables.
2+
//!
3+
//! This module exposes the [`TableManager`] struct which provides static factory methods for
4+
//! creating new Delta tables with a fluent builder API.
5+
6+
use crate::schema::SchemaRef;
7+
use crate::transaction::create_table::CreateTableBuilder;
8+
9+
/// Entry point for creating and managing Delta tables.
10+
///
11+
/// `TableManager` provides static factory methods that return builders for configuring
12+
/// and creating new Delta tables.
13+
pub struct TableManager;
14+
15+
impl TableManager {
16+
/// Creates a builder for creating a new Delta table.
17+
///
18+
/// This method returns a [`CreateTableBuilder`] that can be configured with table
19+
/// properties and other options before building the transaction.
20+
///
21+
/// # Arguments
22+
///
23+
/// * `path` - The file system path where the Delta table will be created
24+
/// * `schema` - The schema for the new table
25+
/// * `engine_info` - Information about the engine creating the table (e.g., "MyApp/1.0")
26+
///
27+
/// # Example
28+
///
29+
/// ```rust,no_run
30+
/// use delta_kernel::table_manager::TableManager;
31+
/// use delta_kernel::schema::{StructType, StructField, DataType};
32+
/// use std::sync::Arc;
33+
/// # use delta_kernel::Engine;
34+
/// # fn example(engine: &dyn Engine) -> delta_kernel::DeltaResult<()> {
35+
///
36+
/// let schema = Arc::new(StructType::new(vec![
37+
/// StructField::new("id", DataType::INTEGER, false),
38+
/// StructField::new("name", DataType::STRING, true),
39+
/// ]));
40+
///
41+
/// let create_txn = TableManager::create_table("/path/to/table", schema, "MyApp/1.0")
42+
/// .build(engine)?;
43+
/// # Ok(())
44+
/// # }
45+
/// ```
46+
pub fn create_table(
47+
path: impl AsRef<str>,
48+
schema: SchemaRef,
49+
engine_info: impl Into<String>,
50+
) -> CreateTableBuilder {
51+
CreateTableBuilder::new(path, schema, engine_info)
52+
}
53+
}

0 commit comments

Comments
 (0)