Commit 0984176

feat: Add write support with streaming API for DuckLake catalogs (#46)

1 parent af19f81 · commit 0984176

16 files changed: +2819 −22

Cargo.toml

Lines changed: 8 additions & 1 deletion

@@ -32,6 +32,9 @@ thiserror = "2.0"
 base64 = { version = "0.22", optional = true }
 hex = { version = "0.4", optional = true }
 
+# Write support (optional)
+uuid = { version = "1.0", features = ["v4"], optional = true }
+
 # Metadata providers (optional)
 duckdb = { version = "1.4.1", features = ["bundled"], optional = true }
 sqlx = { version = "0.8", features = ["runtime-tokio"], optional = true }
@@ -58,4 +61,8 @@ metadata-mysql = ["dep:sqlx", "sqlx/mysql", "sqlx/chrono"]
 metadata-sqlite = ["dep:sqlx", "sqlx/sqlite", "sqlx/chrono"]
 
 # Encryption support for Parquet files
-encryption = ["parquet/encryption", "datafusion/parquet_encryption", "dep:base64", "dep:hex"]
+encryption = ["parquet/encryption", "datafusion/parquet_encryption", "dep:base64", "dep:hex"]
+
+# Write support
+write = ["dep:uuid"]
+write-sqlite = ["write", "metadata-sqlite"]
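
Two new feature flags gate everything below: write enables the catalog-agnostic writer APIs and pulls in the uuid dependency, while write-sqlite layers the SQLite-backed writer on top of the existing metadata-sqlite provider.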

src/catalog.rs

Lines changed: 4 additions & 9 deletions

@@ -6,7 +6,7 @@ use std::sync::Arc;
 use crate::Result;
 use crate::information_schema::InformationSchemaProvider;
 use crate::metadata_provider::MetadataProvider;
-use crate::path_resolver::parse_object_store_url;
+use crate::path_resolver::{parse_object_store_url, resolve_path};
 use crate::schema::DuckLakeSchema;
 use datafusion::catalog::{CatalogProvider, SchemaProvider};
 use datafusion::datasource::object_store::ObjectStoreUrl;
@@ -116,14 +116,9 @@ impl CatalogProvider for DuckLakeCatalog {
         // Query database with the pinned snapshot_id for data schemas
         match self.provider.get_schema_by_name(name, self.snapshot_id) {
             Ok(Some(meta)) => {
-                // Resolve schema path hierarchically
-                let schema_path = if meta.path_is_relative {
-                    // Schema path is relative to catalog path
-                    format!("{}{}", self.catalog_path, meta.path)
-                } else {
-                    // Schema path is absolute
-                    meta.path
-                };
+                // Resolve schema path hierarchically using path_resolver utility
+                let schema_path =
+                    resolve_path(&self.catalog_path, &meta.path, meta.path_is_relative);
 
                 // Pass the pinned snapshot_id to schema
                 Some(Arc::new(DuckLakeSchema::new(

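The extracted helper centralizes the relative-vs-absolute branch that was previously inlined here. A minimal sketch of what resolve_path does, inferred from the inline code it replaces (the actual src/path_resolver.rs may differ in details):

/// Join `path` onto `base` when it is relative; return it unchanged when absolute.
fn resolve_path(base: &str, path: &str, path_is_relative: bool) -> String {
    if path_is_relative {
        // Same behavior as the removed format!("{}{}", catalog_path, meta.path):
        // the base is expected to carry its own trailing separator.
        format!("{}{}", base, path)
    } else {
        path.to_string()
    }
}
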
src/encryption.rs

Lines changed: 1 addition & 1 deletion

@@ -126,7 +126,7 @@ impl DuckLakeEncryptionFactory {
     /// 2. Hex (if decodes to valid AES length)
     /// 3. Raw bytes (if exactly 16, 24, or 32 chars)
     #[cfg(feature = "encryption")]
-    fn decode_key(key: &str) -> Result<Vec<u8>> {
+    pub fn decode_key(key: &str) -> Result<Vec<u8>> {
         use base64::Engine;
         use datafusion::error::DataFusionError;
 

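Widening decode_key to pub lets the write path (and embedding crates) reuse the same key-decoding rules. A hypothetical call, assuming decode_key is an associated function on DuckLakeEncryptionFactory, its enclosing scope in the hunk above:

// "0123456789abcdef" fails the base64 and hex length checks (rules 1 and 2),
// so rule 3 applies: the 16 raw bytes become an AES-128 key.
let key: Vec<u8> = DuckLakeEncryptionFactory::decode_key("0123456789abcdef")?;
assert_eq!(key.len(), 16);
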
src/error.rs

Lines changed: 4 additions & 0 deletions

@@ -59,6 +59,10 @@ pub enum DuckLakeError {
     #[error("IO error: {0}")]
     Io(#[from] std::io::Error),
 
+    /// Parquet error
+    #[error("Parquet error: {0}")]
+    Parquet(#[from] parquet::errors::ParquetError),
+
     /// Generic error
     #[error("Internal error: {0}")]
     Internal(String),

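With #[from], the ? operator now converts parquet errors into DuckLakeError without explicit map_err calls. A small illustrative helper (hypothetical, not part of this commit) showing both conversions:

use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

fn count_rows(path: &str) -> crate::Result<i64> {
    let file = File::open(path)?;                  // std::io::Error -> DuckLakeError::Io
    let reader = SerializedFileReader::new(file)?; // ParquetError -> DuckLakeError::Parquet
    Ok(reader.metadata().file_metadata().num_rows())
}
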
src/lib.rs

Lines changed: 18 additions & 0 deletions

@@ -60,6 +60,14 @@ pub mod metadata_provider_postgres;
 #[cfg(feature = "metadata-sqlite")]
 pub mod metadata_provider_sqlite;
 
+// Write support (feature-gated)
+#[cfg(feature = "write")]
+pub mod metadata_writer;
+#[cfg(feature = "write-sqlite")]
+pub mod metadata_writer_sqlite;
+#[cfg(feature = "write")]
+pub mod table_writer;
+
 // Result type for DuckLake operations
 pub type Result<T> = std::result::Result<T, DuckLakeError>;
 
@@ -80,3 +88,13 @@ pub use metadata_provider_mysql::MySqlMetadataProvider;
 pub use metadata_provider_postgres::PostgresMetadataProvider;
 #[cfg(feature = "metadata-sqlite")]
 pub use metadata_provider_sqlite::SqliteMetadataProvider;
+
+// Re-export write types (feature-gated)
+#[cfg(feature = "write")]
+pub use metadata_writer::{
+    ColumnDef, DataFileInfo, MetadataWriter, WriteMode, WriteResult, WriteSetupResult,
+};
+#[cfg(feature = "write-sqlite")]
+pub use metadata_writer_sqlite::SqliteMetadataWriter;
+#[cfg(feature = "write")]
+pub use table_writer::{DuckLakeTableWriter, TableWriteSession};

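Note that table_writer (re-exporting DuckLakeTableWriter and TableWriteSession) is among the changed files not shown in this excerpt; per the commit title it carries the streaming write API, while metadata_writer below defines the catalog-metadata side.
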
src/metadata_provider.rs

Lines changed: 1 addition & 1 deletion

@@ -19,7 +19,7 @@ pub const SQL_LIST_TABLES: &str =
 
 pub const SQL_GET_TABLE_COLUMNS: &str = "SELECT column_id, column_name, column_type, nulls_allowed
 FROM ducklake_column
-WHERE table_id = ?
+WHERE table_id = ? AND end_snapshot IS NULL
 ORDER BY column_order";
 
 pub const SQL_GET_DATA_FILES: &str = "

src/metadata_provider_sqlite.rs

Lines changed: 1 addition & 1 deletion

@@ -142,7 +142,7 @@ impl MetadataProvider for SqliteMetadataProvider {
         let rows = sqlx::query(
             "SELECT column_id, column_name, column_type, nulls_allowed
              FROM ducklake_column
-             WHERE table_id = ?
+             WHERE table_id = ? AND end_snapshot IS NULL
              ORDER BY column_order",
         )
         .bind(table_id)

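Both column queries need the new end_snapshot IS NULL predicate for the same reason: once writes exist, set_columns (below) closes superseded column rows by stamping end_snapshot for time travel, so readers must filter to the still-open rows to see a table's current schema.
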
src/metadata_writer.rs

Lines changed: 236 additions & 0 deletions

@@ -0,0 +1,236 @@
+//! Metadata writer trait and common types for DuckLake catalog writes.
+//!
+//! This module provides the `MetadataWriter` trait for writing metadata to DuckLake catalogs,
+//! along with helper types for column definitions and data file registration.
+
+use crate::Result;
+use crate::types::arrow_to_ducklake_type;
+use arrow::datatypes::DataType;
+
+/// Write mode for table operations.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum WriteMode {
+    /// Drop existing data and replace with new data
+    Replace,
+    /// Keep existing data and append new records
+    Append,
+}
+
+/// Column definition for creating or updating a table's schema.
+///
+/// Unlike `DuckLakeTableColumn` (used for reading), this struct doesn't have a `column_id`
+/// field since IDs are assigned by the catalog during write operations.
+#[derive(Debug, Clone)]
+pub struct ColumnDef {
+    /// Column name
+    pub name: String,
+    /// DuckLake type string (e.g., "varchar", "int64", "decimal(10,2)")
+    pub ducklake_type: String,
+    /// Whether this column allows NULL values
+    pub is_nullable: bool,
+}
+
+impl ColumnDef {
+    /// Create a new column definition.
+    pub fn new(
+        name: impl Into<String>,
+        ducklake_type: impl Into<String>,
+        is_nullable: bool,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            ducklake_type: ducklake_type.into(),
+            is_nullable,
+        }
+    }
+
+    /// Create a column definition from an Arrow DataType.
+    ///
+    /// This is a convenience constructor that converts the Arrow type to a DuckLake type string.
+    pub fn from_arrow(
+        name: impl Into<String>,
+        data_type: &DataType,
+        is_nullable: bool,
+    ) -> Result<Self> {
+        let ducklake_type = arrow_to_ducklake_type(data_type)?;
+        Ok(Self::new(name, ducklake_type, is_nullable))
+    }
+}
+
+/// Information about a data file to register in the catalog.
+///
+/// This struct contains the metadata needed to register a Parquet file in the DuckLake catalog.
+#[derive(Debug, Clone)]
+pub struct DataFileInfo {
+    /// Path to the file (relative to table path or absolute)
+    pub path: String,
+    /// Whether the path is relative to the table's path
+    pub path_is_relative: bool,
+    /// Size of the file in bytes
+    pub file_size_bytes: i64,
+    /// Size of the Parquet footer in bytes (optimization hint for reads)
+    pub footer_size: Option<i64>,
+    /// Number of records in the file
+    pub record_count: i64,
+}
+
+impl DataFileInfo {
+    /// Create a new data file info with relative path.
+    pub fn new(path: impl Into<String>, file_size_bytes: i64, record_count: i64) -> Self {
+        Self {
+            path: path.into(),
+            path_is_relative: true,
+            file_size_bytes,
+            footer_size: None,
+            record_count,
+        }
+    }
+
+    /// Set the footer size for read optimization.
+    pub fn with_footer_size(mut self, footer_size: i64) -> Self {
+        self.footer_size = Some(footer_size);
+        self
+    }
+
+    /// Mark this file as having an absolute path.
+    pub fn with_absolute_path(mut self) -> Self {
+        self.path_is_relative = false;
+        self
+    }
+}
+
+/// Result of a write operation.
+#[derive(Debug)]
+pub struct WriteResult {
+    /// Snapshot ID of the write operation
+    pub snapshot_id: i64,
+    /// Table ID (may be newly created)
+    pub table_id: i64,
+    /// Schema ID (may be newly created)
+    pub schema_id: i64,
+    /// Number of files written
+    pub files_written: usize,
+    /// Total records written
+    pub records_written: i64,
+}
+
+/// Result of a transactional write setup operation.
+#[derive(Debug)]
+pub struct WriteSetupResult {
+    /// Snapshot ID created for this write
+    pub snapshot_id: i64,
+    /// Schema ID (may be newly created)
+    pub schema_id: i64,
+    /// Table ID (may be newly created)
+    pub table_id: i64,
+    /// Column IDs in order
+    pub column_ids: Vec<i64>,
+}
+
+/// Trait for writing metadata to DuckLake catalogs.
+///
+/// Implementations must be thread-safe (`Send + Sync`).
+pub trait MetadataWriter: Send + Sync + std::fmt::Debug {
+    /// Create a new snapshot and return its ID.
+    fn create_snapshot(&self) -> Result<i64>;
+
+    /// Get or create a schema, returning `(schema_id, was_created)`.
+    fn get_or_create_schema(
+        &self,
+        name: &str,
+        path: Option<&str>,
+        snapshot_id: i64,
+    ) -> Result<(i64, bool)>;
+
+    /// Get or create a table, returning `(table_id, was_created)`.
+    fn get_or_create_table(
+        &self,
+        schema_id: i64,
+        name: &str,
+        path: Option<&str>,
+        snapshot_id: i64,
+    ) -> Result<(i64, bool)>;
+
+    /// Set columns for a table, returning assigned column IDs.
+    /// Ends existing columns using the end_snapshot pattern for time travel.
+    fn set_columns(
+        &self,
+        table_id: i64,
+        columns: &[ColumnDef],
+        snapshot_id: i64,
+    ) -> Result<Vec<i64>>;
+
+    /// Register a new data file. Returns the assigned data_file_id.
+    fn register_data_file(
+        &self,
+        table_id: i64,
+        snapshot_id: i64,
+        file: &DataFileInfo,
+    ) -> Result<i64>;
+
+    /// End all existing data files for a table. Returns the count of files ended.
+    fn end_table_files(&self, table_id: i64, snapshot_id: i64) -> Result<u64>;
+
+    /// Get the data path from catalog metadata.
+    fn get_data_path(&self) -> Result<String>;
+
+    /// Set the data path in catalog metadata.
+    fn set_data_path(&self, path: &str) -> Result<()>;
+
+    /// Initialize DuckLake schema tables if they don't exist.
+    fn initialize_schema(&self) -> Result<()>;
+
+    /// Atomically set up catalog metadata for a write operation.
+    /// Creates snapshot, schema, table, and columns in a single transaction.
+    /// If mode is `WriteMode::Replace`, ends existing data files.
+    fn begin_write_transaction(
+        &self,
+        schema_name: &str,
+        table_name: &str,
+        columns: &[ColumnDef],
+        mode: WriteMode,
+    ) -> Result<WriteSetupResult>;
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_column_def_new() {
+        let col = ColumnDef::new("test_col", "int32", true);
+        assert_eq!(col.name, "test_col");
+        assert_eq!(col.ducklake_type, "int32");
+        assert!(col.is_nullable);
+    }
+
+    #[test]
+    fn test_column_def_from_arrow() {
+        let col = ColumnDef::from_arrow("id", &DataType::Int64, false).unwrap();
+        assert_eq!(col.name, "id");
+        assert_eq!(col.ducklake_type, "int64");
+        assert!(!col.is_nullable);
+    }
+
+    #[test]
+    fn test_data_file_info_new() {
+        let file = DataFileInfo::new("test.parquet", 1024, 100);
+        assert_eq!(file.path, "test.parquet");
+        assert!(file.path_is_relative);
+        assert_eq!(file.file_size_bytes, 1024);
+        assert_eq!(file.record_count, 100);
+        assert!(file.footer_size.is_none());
+    }
+
+    #[test]
+    fn test_data_file_info_with_footer_size() {
+        let file = DataFileInfo::new("test.parquet", 1024, 100).with_footer_size(256);
+        assert_eq!(file.footer_size, Some(256));
+    }
+
+    #[test]
+    fn test_data_file_info_with_absolute_path() {
+        let file = DataFileInfo::new("/absolute/path.parquet", 1024, 100).with_absolute_path();
+        assert!(!file.path_is_relative);
+    }
+}

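Taken together, the trait supports a two-phase write: set up the catalog rows transactionally, then stream Parquet files out and register them under the new snapshot. A minimal sketch against the trait as defined above; the schema/table names, file path, sizes, and the out-of-band Parquet writing step are illustrative placeholders, and the concrete writer would come from an implementation such as SqliteMetadataWriter (construction not shown in this excerpt):

use crate::{ColumnDef, DataFileInfo, MetadataWriter, Result, WriteMode};

fn replace_table(writer: &dyn MetadataWriter) -> Result<()> {
    // Phase 1: snapshot + schema + table + columns in one transaction;
    // WriteMode::Replace also ends the table's existing data files.
    let setup = writer.begin_write_transaction(
        "main",
        "events",
        &[
            ColumnDef::new("id", "int64", false),
            ColumnDef::new("payload", "varchar", true),
        ],
        WriteMode::Replace,
    )?;

    // Phase 2: write the Parquet file out of band (e.g. via table_writer's
    // streaming API), then register it under the snapshot created above.
    // Path, byte size, record count, and footer size are placeholder values.
    let file = DataFileInfo::new("events-0001.parquet", 1024, 100).with_footer_size(256);
    writer.register_data_file(setup.table_id, setup.snapshot_id, &file)?;
    Ok(())
}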