
Commit 3d57629

feat: Add SQL write support (INSERT INTO) for DuckLake catalogs (#47)
1 parent 0984176 commit 3d57629

File tree

6 files changed: +1195 -5 lines changed

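Taken together, these changes add an INSERT INTO path behind the `write` feature flag, with a SQLite-backed metadata writer behind `write-sqlite`. A minimal end-to-end sketch of how the new API is intended to be used, assuming a tokio runtime, the `write-sqlite` feature, and illustrative catalog/schema/table names (`ducklake`, `main`, `events`) that are not part of this diff:

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use datafusion_ducklake::{DuckLakeCatalog, SqliteMetadataProvider, SqliteMetadataWriter};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Reader and writer point at the same catalog database (see the with_writer docs below).
    let provider = SqliteMetadataProvider::new("sqlite:catalog.db?mode=rwc").await?;
    let writer = SqliteMetadataWriter::new("sqlite:catalog.db?mode=rwc").await?;

    // A catalog built with with_writer carries a WriteConfig and hands the
    // writer down to every schema it creates (src/catalog.rs below).
    let catalog = DuckLakeCatalog::with_writer(Arc::new(provider), Arc::new(writer))?;

    // Register under an illustrative name and drive writes through SQL.
    let ctx = SessionContext::new();
    ctx.register_catalog("ducklake", Arc::new(catalog));

    // Hypothetical table; it must already exist in the DuckLake metadata.
    ctx.sql("INSERT INTO ducklake.main.events VALUES (1, 'created')")
        .await?
        .collect()
        .await?;
    Ok(())
}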

src/catalog.rs

Lines changed: 78 additions & 2 deletions
@@ -11,6 +11,21 @@ use crate::schema::DuckLakeSchema;
 use datafusion::catalog::{CatalogProvider, SchemaProvider};
 use datafusion::datasource::object_store::ObjectStoreUrl;

+#[cfg(feature = "write")]
+use crate::metadata_writer::MetadataWriter;
+#[cfg(feature = "write")]
+use std::path::PathBuf;
+
+/// Configuration for write operations (when write feature is enabled)
+#[cfg(feature = "write")]
+#[derive(Debug, Clone)]
+struct WriteConfig {
+    /// Metadata writer for catalog operations
+    writer: Arc<dyn MetadataWriter>,
+    /// Base data path for writing files
+    data_path: PathBuf,
+}
+
 /// DuckLake catalog provider
 ///
 /// Connects to a DuckLake catalog database and provides access to schemas and tables.
@@ -26,6 +41,9 @@ pub struct DuckLakeCatalog {
     object_store_url: Arc<ObjectStoreUrl>,
     /// Catalog base path component for resolving relative schema paths (e.g., /prefix/)
     catalog_path: String,
+    /// Write configuration (when write feature is enabled)
+    #[cfg(feature = "write")]
+    write_config: Option<WriteConfig>,
 }

 impl DuckLakeCatalog {
@@ -44,6 +62,8 @@ impl DuckLakeCatalog {
             snapshot_id,
             object_store_url: Arc::new(object_store_url),
             catalog_path,
+            #[cfg(feature = "write")]
+            write_config: None,
         })
     }

@@ -61,6 +81,52 @@ impl DuckLakeCatalog {
             snapshot_id,
             object_store_url: Arc::new(object_store_url),
             catalog_path,
+            #[cfg(feature = "write")]
+            write_config: None,
+        })
+    }
+
+    /// Create a catalog with write support.
+    ///
+    /// This constructor enables write operations (INSERT INTO, CREATE TABLE AS)
+    /// by attaching a metadata writer. The catalog will pass the writer to all
+    /// schemas and tables it creates.
+    ///
+    /// # Arguments
+    /// * `provider` - Metadata provider for reading catalog metadata
+    /// * `writer` - Metadata writer for write operations
+    ///
+    /// # Example
+    /// ```no_run
+    /// # async fn example() -> datafusion_ducklake::Result<()> {
+    /// use datafusion_ducklake::{DuckLakeCatalog, SqliteMetadataProvider, SqliteMetadataWriter};
+    /// use std::sync::Arc;
+    ///
+    /// let provider = SqliteMetadataProvider::new("sqlite:catalog.db?mode=rwc").await?;
+    /// let writer = SqliteMetadataWriter::new("sqlite:catalog.db?mode=rwc").await?;
+    ///
+    /// let catalog = DuckLakeCatalog::with_writer(Arc::new(provider), Arc::new(writer))?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    #[cfg(feature = "write")]
+    pub fn with_writer(
+        provider: Arc<dyn MetadataProvider>,
+        writer: Arc<dyn MetadataWriter>,
+    ) -> Result<Self> {
+        let snapshot_id = provider.get_current_snapshot()?;
+        let data_path_str = provider.get_data_path()?;
+        let (object_store_url, catalog_path) = parse_object_store_url(&data_path_str)?;
+
+        Ok(Self {
+            provider,
+            snapshot_id,
+            object_store_url: Arc::new(object_store_url),
+            catalog_path,
+            write_config: Some(WriteConfig {
+                writer,
+                data_path: PathBuf::from(&data_path_str),
+            }),
         })
     }

@@ -121,14 +187,24 @@ impl CatalogProvider for DuckLakeCatalog {
                     resolve_path(&self.catalog_path, &meta.path, meta.path_is_relative);

                 // Pass the pinned snapshot_id to schema
-                Some(Arc::new(DuckLakeSchema::new(
+                let schema = DuckLakeSchema::new(
                     meta.schema_id,
                     meta.schema_name,
                     Arc::clone(&self.provider),
                     self.snapshot_id, // Propagate pinned snapshot_id
                     self.object_store_url.clone(),
                     schema_path,
-                )) as Arc<dyn SchemaProvider>)
+                );
+
+                // Configure writer if this catalog is writable
+                #[cfg(feature = "write")]
+                let schema = if let Some(ref config) = self.write_config {
+                    schema.with_writer(Arc::clone(&config.writer), config.data_path.clone())
+                } else {
+                    schema
+                };
+
+                Some(Arc::new(schema) as Arc<dyn SchemaProvider>)
             },
             _ => None,
         }
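One design note on the schema() change above: write capability flows through the standard CatalogProvider API rather than a separate write path, so downstream code keeps using the usual trait objects. A small sketch, assuming the catalog was built with with_writer and that a schema named "main" exists in the metadata (the name is illustrative):

use datafusion::catalog::CatalogProvider;
use datafusion_ducklake::DuckLakeCatalog;

fn show_writable_schema(catalog: &DuckLakeCatalog) {
    // schema() returns the same SchemaProvider trait object as before; when
    // write_config is Some(..), it was built via DuckLakeSchema::with_writer,
    // so the tables it serves can plan INSERT INTO.
    if let Some(schema) = catalog.schema("main") {
        println!("tables in main: {:?}", schema.table_names());
    }
}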

src/insert_exec.rs

Lines changed: 224 additions & 0 deletions
@@ -0,0 +1,224 @@
+//! DuckLake INSERT execution plan.
+//!
+//! Limitations:
+//! - Collects all batches into memory before writing (no streaming yet)
+//! - Single partition only (partition 0)
+
+use std::any::Any;
+use std::fmt::{self, Debug};
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::error::{DataFusionError, Result as DataFusionResult};
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+use futures::stream::{self, TryStreamExt};
+
+use crate::metadata_writer::{MetadataWriter, WriteMode};
+use crate::table_writer::DuckLakeTableWriter;
+
+/// Schema for the output of insert operations (count of rows inserted)
+fn make_insert_count_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![Field::new(
+        "count",
+        DataType::UInt64,
+        false,
+    )]))
+}
+
+/// Execution plan that writes input data to a DuckLake table.
+pub struct DuckLakeInsertExec {
+    input: Arc<dyn ExecutionPlan>,
+    writer: Arc<dyn MetadataWriter>,
+    schema_name: String,
+    table_name: String,
+    arrow_schema: SchemaRef,
+    write_mode: WriteMode,
+    data_path: PathBuf,
+    cache: PlanProperties,
+}
+
+impl DuckLakeInsertExec {
+    /// Create a new DuckLakeInsertExec
+    pub fn new(
+        input: Arc<dyn ExecutionPlan>,
+        writer: Arc<dyn MetadataWriter>,
+        schema_name: String,
+        table_name: String,
+        arrow_schema: SchemaRef,
+        write_mode: WriteMode,
+        data_path: PathBuf,
+    ) -> Self {
+        let cache = Self::compute_properties();
+        Self {
+            input,
+            writer,
+            schema_name,
+            table_name,
+            arrow_schema,
+            write_mode,
+            data_path,
+            cache,
+        }
+    }
+
+    fn compute_properties() -> PlanProperties {
+        PlanProperties::new(
+            EquivalenceProperties::new(make_insert_count_schema()),
+            Partitioning::UnknownPartitioning(1),
+            datafusion::physical_plan::execution_plan::EmissionType::Final,
+            datafusion::physical_plan::execution_plan::Boundedness::Bounded,
+        )
+    }
+}
+
+impl Debug for DuckLakeInsertExec {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("DuckLakeInsertExec")
+            .field("schema_name", &self.schema_name)
+            .field("table_name", &self.table_name)
+            .field("write_mode", &self.write_mode)
+            .field("data_path", &self.data_path)
+            .finish_non_exhaustive()
+    }
+}
+
+impl DisplayAs for DuckLakeInsertExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default
+            | DisplayFormatType::Verbose
+            | DisplayFormatType::TreeRender => {
+                write!(
+                    f,
+                    "DuckLakeInsertExec: schema={}, table={}, mode={:?}",
+                    self.schema_name, self.table_name, self.write_mode
+                )
+            },
+        }
+    }
+}
+
+impl ExecutionPlan for DuckLakeInsertExec {
+    fn name(&self) -> &str {
+        "DuckLakeInsertExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        if children.len() != 1 {
+            return Err(DataFusionError::Plan(
+                "DuckLakeInsertExec requires exactly one child".to_string(),
+            ));
+        }
+        Ok(Arc::new(Self::new(
+            Arc::clone(&children[0]),
+            Arc::clone(&self.writer),
+            self.schema_name.clone(),
+            self.table_name.clone(),
+            Arc::clone(&self.arrow_schema),
+            self.write_mode,
+            self.data_path.clone(),
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DataFusionResult<SendableRecordBatchStream> {
+        if partition != 0 {
+            return Err(DataFusionError::Internal(format!(
+                "DuckLakeInsertExec only supports partition 0, got {}",
+                partition
+            )));
+        }
+
+        let input = Arc::clone(&self.input);
+        let writer = Arc::clone(&self.writer);
+        let schema_name = self.schema_name.clone();
+        let table_name = self.table_name.clone();
+        let arrow_schema = Arc::clone(&self.arrow_schema);
+        let write_mode = self.write_mode;
+        let data_path = self.data_path.clone();
+        let output_schema = make_insert_count_schema();
+
+        let stream = stream::once(async move {
+            let input_stream = input.execute(0, context)?;
+            let batches: Vec<RecordBatch> = input_stream.try_collect().await?;
+
+            if batches.is_empty() {
+                let count_array: ArrayRef = Arc::new(UInt64Array::from(vec![0u64]));
+                return Ok(RecordBatch::try_new(output_schema, vec![count_array])?);
+            }
+
+            let table_writer = DuckLakeTableWriter::new(writer)
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+            let schema_without_metadata =
+                Schema::new(arrow_schema.fields().iter().cloned().collect::<Vec<_>>());
+
+            let mut session = table_writer
+                .begin_write(
+                    &schema_name,
+                    &table_name,
+                    &schema_without_metadata,
+                    write_mode,
+                )
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+            let _ = data_path;
+
+            for batch in &batches {
+                session
+                    .write_batch(batch)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+            }
+
+            let row_count = session.row_count() as u64;
+
+            session
+                .finish()
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+            let count_array: ArrayRef = Arc::new(UInt64Array::from(vec![row_count]));
+            Ok(RecordBatch::try_new(output_schema, vec![count_array])?)
+        });
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            make_insert_count_schema(),
+            stream.map_err(|e: DataFusionError| e),
+        )))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_insert_count_schema() {
+        let schema = make_insert_count_schema();
+        assert_eq!(schema.fields().len(), 1);
+        assert_eq!(schema.field(0).name(), "count");
+        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
+    }
+}
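Since the plan executes on a single partition and its output schema is one non-nullable UInt64 `count` column, an INSERT run through DataFusion SQL comes back as a single RecordBatch holding the inserted row count. A hedged sketch of reading that value, reusing the illustrative `ducklake.main.events` names from earlier and assuming the writable catalog is already registered on the SessionContext:

use datafusion::arrow::array::{Array, UInt64Array};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn insert_and_count(ctx: &SessionContext) -> Result<u64> {
    // INSERT is planned into a DuckLakeInsertExec; collect() drives it and
    // yields the single "count" batch produced in execute() above.
    let batches = ctx
        .sql("INSERT INTO ducklake.main.events VALUES (2, 'updated')")
        .await?
        .collect()
        .await?;

    let count = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .expect("count column is UInt64")
        .value(0);
    Ok(count)
}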

src/lib.rs

Lines changed: 4 additions & 0 deletions
@@ -62,6 +62,8 @@ pub mod metadata_provider_sqlite;

 // Write support (feature-gated)
 #[cfg(feature = "write")]
+pub mod insert_exec;
+#[cfg(feature = "write")]
 pub mod metadata_writer;
 #[cfg(feature = "write-sqlite")]
 pub mod metadata_writer_sqlite;
@@ -91,6 +93,8 @@ pub use metadata_provider_sqlite::SqliteMetadataProvider;

 // Re-export write types (feature-gated)
 #[cfg(feature = "write")]
+pub use insert_exec::DuckLakeInsertExec;
+#[cfg(feature = "write")]
 pub use metadata_writer::{
     ColumnDef, DataFileInfo, MetadataWriter, WriteMode, WriteResult, WriteSetupResult,
 };
