Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 12 additions & 63 deletions kernel/examples/write-table/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::fs::{create_dir_all, write};
use std::fs::create_dir_all;
use std::path::Path;
use std::process::ExitCode;
use std::sync::Arc;
Expand All @@ -9,9 +9,7 @@ use arrow::util::pretty::print_batches;
use clap::Parser;
use common::{LocationArgs, ParseWithExamples};
use itertools::Itertools;
use serde_json::{json, to_vec};
use url::Url;
use uuid::Uuid;

use delta_kernel::arrow::array::TimestampMicrosecondArray;
use delta_kernel::committer::FileSystemCommitter;
Expand All @@ -20,6 +18,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
use delta_kernel::table_manager::TableManager;
use delta_kernel::transaction::{CommitResult, RetryableTransaction};
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};

Expand Down Expand Up @@ -152,7 +151,7 @@ async fn create_or_get_base_snapshot(
// Create new table
println!("Creating new Delta table...");
let schema = parse_schema(schema_str)?;
create_table(url, &schema).await?;
create_table(url, &schema, engine).await?;
Snapshot::builder_for(url.clone()).build(engine)
}
}
Expand Down Expand Up @@ -192,66 +191,16 @@ fn parse_schema(schema_str: &str) -> DeltaResult<SchemaRef> {
Ok(Arc::new(StructType::try_new(fields)?))
}

/// Create a new Delta table with the given schema.
///
/// Creating a Delta table is not officially supported by kernel-rs yet, so we manually create the
/// initial transaction log.
async fn create_table(table_url: &Url, schema: &SchemaRef) -> DeltaResult<()> {
let table_id = Uuid::new_v4().to_string();
let schema_str = serde_json::to_string(&schema)?;
/// Create a new Delta table with the given schema using the official CreateTable API.
async fn create_table(table_url: &Url, schema: &SchemaRef, engine: &dyn Engine) -> DeltaResult<()> {
// Use the new TableManager API to create the table
let table_path = table_url.as_str();
let create_txn =
TableManager::create_table(table_path, schema.clone(), "write-table-example/1.0")
.build(engine)?;

let (reader_features, writer_features) = {
let reader_features: Vec<&'static str> = vec![];
let writer_features: Vec<&'static str> = vec![];

// TODO: Support adding specific table features
(reader_features, writer_features)
};

let protocol = json!({
"protocol": {
"minReaderVersion": 3,
"minWriterVersion": 7,
"readerFeatures": reader_features,
"writerFeatures": writer_features,
}
});
let partition_columns: Vec<String> = vec![];
let metadata = json!({
"metaData": {
"id": table_id,
"format": {
"provider": "parquet",
"options": {}
},
"schemaString": schema_str,
"partitionColumns": partition_columns,
"configuration": {},
"createdTime": 1677811175819u64
}
});

let data = [
to_vec(&protocol).unwrap(),
b"\n".to_vec(),
to_vec(&metadata).unwrap(),
]
.concat();

// Write the initial transaction with protocol and metadata to 0.json
let delta_log_path = table_url
.join("_delta_log/")?
.to_file_path()
.map_err(|_e| Error::generic("URL cannot be converted to local file path"))?;
let file_path = delta_log_path.join("00000000000000000000.json");

// Create the _delta_log directory if it doesn't exist
create_dir_all(&delta_log_path)
.map_err(|e| Error::generic(format!("Failed to create _delta_log directory: {e}")))?;

// Write the file using standard filesystem operations
write(&file_path, data)
.map_err(|e| Error::generic(format!("Failed to write initial transaction log: {e}")))?;
// Commit the metadata - this creates the table
let _txn = create_txn.commit_metadata(engine, Box::new(FileSystemCommitter::new()))?;

println!("✓ Created Delta table with schema: {schema:#?}");
Ok(())
Expand Down
10 changes: 10 additions & 0 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ pub(crate) const DOMAIN_METADATA_NAME: &str = "domainMetadata";

pub(crate) const INTERNAL_DOMAIN_PREFIX: &str = "delta.";

/// Minimum reader version for tables that use table features.
/// When set to 3, the protocol requires an explicit `readerFeatures` array.
#[internal_api]
pub(crate) const TABLE_FEATURES_MIN_READER_VERSION: i32 = 3;

/// Minimum writer version for tables that use table features.
/// When set to 7, the protocol requires an explicit `writerFeatures` array.
#[internal_api]
pub(crate) const TABLE_FEATURES_MIN_WRITER_VERSION: i32 = 7;

static COMMIT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([
StructField::nullable(ADD_NAME, Add::to_schema()),
Expand Down
3 changes: 3 additions & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub mod snapshot;
pub mod table_changes;
pub mod table_configuration;
pub mod table_features;
pub mod table_manager;
pub mod table_properties;
pub mod transaction;
pub(crate) mod transforms;
Expand Down Expand Up @@ -156,6 +157,8 @@ pub use expressions::{Expression, ExpressionRef, Predicate, PredicateRef};
pub use log_compaction::{should_compact, LogCompactionWriter};
pub use snapshot::Snapshot;
pub use snapshot::SnapshotRef;
pub use table_manager::TableManager;
pub use transaction::create_table::{CreateTableBuilder, CreateTableTransaction};

use expressions::literal_expression_transform::LiteralExpressionTransform;
use expressions::Scalar;
Expand Down
53 changes: 53 additions & 0 deletions kernel/src/table_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//! The `table_manager` module provides the entry point for creating and managing Delta tables.
//!
//! This module exposes the [`TableManager`] struct which provides static factory methods for
//! creating new Delta tables with a fluent builder API.

use crate::schema::SchemaRef;
use crate::transaction::create_table::CreateTableBuilder;

/// Entry point for creating and managing Delta tables.
///
/// `TableManager` provides static factory methods that return builders for configuring
/// and creating new Delta tables.
pub struct TableManager;

impl TableManager {
/// Creates a builder for creating a new Delta table.
///
/// This method returns a [`CreateTableBuilder`] that can be configured with table
/// properties and other options before building the transaction.
///
/// # Arguments
///
/// * `path` - The file system path where the Delta table will be created
/// * `schema` - The schema for the new table
/// * `engine_info` - Information about the engine creating the table (e.g., "MyApp/1.0")
///
/// # Example
///
/// ```rust,no_run
/// use delta_kernel::table_manager::TableManager;
/// use delta_kernel::schema::{StructType, StructField, DataType};
/// use std::sync::Arc;
/// # use delta_kernel::Engine;
/// # fn example(engine: &dyn Engine) -> delta_kernel::DeltaResult<()> {
///
/// let schema = Arc::new(StructType::try_new(vec![
/// StructField::new("id", DataType::INTEGER, false),
/// StructField::new("name", DataType::STRING, true),
/// ])?);
///
/// let create_txn = TableManager::create_table("/path/to/table", schema, "MyApp/1.0")
/// .build(engine)?;
/// # Ok(())
/// # }
/// ```
pub fn create_table(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fly by comment:

  • Kernel Java: We have TableManager as our one entry point into Kernel. That is: all Snapshot loading (which is used for reads and writes), all table creation, and all streaming goes through this API (it exposes APIs to other classes, like SnapshotBuiler, CreateTableTransactionBuilder, etc.)
  • Kernel Rust: No TableManager, instead has APIs on underlying classes/traits/impls e.g. SnapshotBuilder

So I encourage: either fully adopting TableManager, or not adopting TableManager, and deleting this TableManager.create_table API

path: impl AsRef<str>,
schema: SchemaRef,
engine_info: impl Into<String>,
) -> CreateTableBuilder {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Kernel Rust have TransctionBuilders yet?

Though verbose, Kernel Java's CreateTableTransactionBuilder naming convention makes it clear that: this is a builder for a transaction (yields a Transaction) that should be used to create a table.

The callout being: my_create_table_instance_builder.build() might make a caller think that that has "built" the table -- wheras instead all it has done is "built" the Transaction instance

CreateTableBuilder::new(path, schema, engine_info)
}
}
Loading
Loading