22 changes: 22 additions & 0 deletions core/src/duckdb/creator.rs
@@ -290,6 +290,28 @@ impl TableManager {
Ok(())
}

/// Creates the internal staging table without PRIMARY KEY constraints.
/// This is used for overwrite operations where we're loading data into a temporary table
/// before swapping with the main table. The incoming data may have duplicates that would
/// violate PRIMARY KEY constraints.
#[tracing::instrument(level = "debug", skip_all)]
pub fn create_table_without_constraints(
&self,
pool: Arc<DuckDbConnectionPool>,
tx: &Transaction<'_>,
) -> super::Result<()> {
let mut db_conn = pool.connect_sync().context(super::DbConnectionPoolSnafu)?;
let duckdb_conn = DuckDB::duckdb_conn(&mut db_conn)?;

let create_stmt = self.get_table_create_statement(duckdb_conn)?;
tracing::debug!("{create_stmt}");

tx.execute(&create_stmt, [])
.context(super::UnableToCreateDuckDBTableSnafu)?;

Ok(())
}

/// Drops indexes from the table, then drops the table itself.
#[tracing::instrument(level = "debug", skip_all)]
pub fn delete_table(&self, tx: &Transaction<'_>) -> super::Result<()> {
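For context on the swap this helper enables: below is a minimal, hypothetical sketch of the staging-table overwrite pattern, using the duckdb crate directly. The table names and schema are illustrative only; the real adapter does this inside a single transaction and also handles index re-creation.

```rust
use duckdb::Connection;

fn main() -> duckdb::Result<()> {
    let conn = Connection::open_in_memory()?;

    // The staging table mirrors the target's columns but omits the PRIMARY KEY,
    // so incoming batches with duplicate keys load without constraint errors.
    conn.execute_batch(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
         CREATE TABLE users_staging (id INTEGER, name TEXT);
         INSERT INTO users_staging VALUES (1, 'alice'), (1, 'alice');",
    )?;

    // After the load succeeds, the staging table is promoted to the main table.
    conn.execute_batch(
        "DROP TABLE users;
         ALTER TABLE users_staging RENAME TO users;",
    )?;

    Ok(())
}
```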
23 changes: 10 additions & 13 deletions core/src/duckdb/write.rs
@@ -325,7 +325,11 @@ impl DataSink for DuckDBDataSink {
while let Some(batch) = data.next().await {
let batch = batch.map_err(check_and_mark_retriable_error)?;

-if let Some(constraints) = self.table_definition.constraints() {
+// Skip constraint validation for Overwrite operations since we're replacing all data
+// and uniqueness constraints don't apply to the incoming data in isolation.
+let batches = if self.overwrite == InsertOp::Overwrite {
+vec![batch]
+} else if let Some(constraints) = self.table_definition.constraints() {
constraints::validate_batch_with_constraints(
vec![batch.clone()],
constraints,
@@ -571,7 +575,7 @@ fn insert_overwrite(
.map_err(to_retriable_data_write_error)?;

new_table
-.create_table(cloned_pool, &tx)
+.create_table_without_constraints(cloned_pool, &tx)
.map_err(to_retriable_data_write_error)?;

let existing_tables = new_table
@@ -618,22 +622,15 @@ fn insert_overwrite(
));
}

+// Note: We skip primary key verification for insert_overwrite because
+// the internal staging table is intentionally created without constraints
+// to allow loading data with potential duplicates.
if !should_apply_indexes {
-// compare indexes and primary keys
-let primary_keys_match = new_table
-.verify_primary_keys_match(last_table, &tx)
-.map_err(to_retriable_data_write_error)?;
+// Only verify that indexes match; primary key verification is skipped here.
let indexes_match = new_table
.verify_indexes_match(last_table, &tx)
.map_err(to_retriable_data_write_error)?;

-if !primary_keys_match {
-return Err(DataFusionError::Execution(
-"Primary keys do not match between the new table and the existing table.\nEnsure primary key configuration is the same as the existing table, or manually migrate the table."
-.to_string(),
-));
-}

if !indexes_match {
return Err(DataFusionError::Execution(
"Indexes do not match between the new table and the existing table.\nEnsure index configuration is the same as the existing table, or manually migrate the table.".to_string(),
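The validation skip above keys off DataFusion's `InsertOp`. As an isolated sketch (the helper name is illustrative, not part of the PR):

```rust
use datafusion::logical_expr::dml::InsertOp;

// Illustrative helper: Overwrite replaces the whole table, so per-batch
// uniqueness validation is skipped; Append batches are still validated.
fn should_validate_constraints(op: InsertOp) -> bool {
    !matches!(op, InsertOp::Overwrite)
}

fn main() {
    assert!(should_validate_constraints(InsertOp::Append));
    assert!(!should_validate_constraints(InsertOp::Overwrite));
}
```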
22 changes: 19 additions & 3 deletions core/src/sql/arrow_sql_gen/statement.rs
@@ -1030,33 +1030,45 @@ impl InsertBuilder {
///
/// Returns an error if any `RecordBatch` fails to convert into a valid postgres insert statement.
pub fn build_postgres(self, on_conflict: Option<OnConflict>) -> Result<String> {
-self.build(PostgresQueryBuilder, on_conflict)
+self.build(PostgresQueryBuilder, on_conflict, false)
}

///
/// # Errors
///
/// Returns an error if any `RecordBatch` fails to convert into a valid sqlite insert statement.
pub fn build_sqlite(self, on_conflict: Option<OnConflict>) -> Result<String> {
-self.build(SqliteQueryBuilder, on_conflict)
+self.build(SqliteQueryBuilder, on_conflict, false)
}

///
/// Uses SQLite's `REPLACE INTO` syntax, which deletes existing rows with conflicting keys
/// before inserting the new rows.
///
/// # Errors
///
/// Returns an error if any `RecordBatch` fails to convert into a valid sqlite REPLACE statement.
pub fn build_sqlite_replace(self) -> Result<String> {
self.build(SqliteQueryBuilder, None, true)
}

///
/// # Errors
///
/// Returns an error if any `RecordBatch` fails to convert into a valid `MySQL` insert statement.
pub fn build_mysql(self, on_conflict: Option<OnConflict>) -> Result<String> {
-self.build(MysqlQueryBuilder, on_conflict)
+self.build(MysqlQueryBuilder, on_conflict, false)
}

/// If `replace` is true, uses `REPLACE INTO` syntax (SQLite/MySQL) instead of `INSERT INTO`.
///
/// # Errors
///
/// Returns an error if any `RecordBatch` fails to convert into a valid insert statement. Upon
/// error, no further `RecordBatch` is processed.
pub fn build<T: GenericBuilder + 'static>(
&self,
query_builder: T,
on_conflict: Option<OnConflict>,
replace: bool,
) -> Result<String> {
let columns: Vec<Alias> = (self.record_batches[0])
.schema()
@@ -1070,6 +1082,10 @@ impl InsertBuilder {
.columns(columns)
.to_owned();

if replace {
insert_stmt.replace();
}

for record_batch in &self.record_batches {
self.construct_insert_stmt(&mut insert_stmt, record_batch, &query_builder)?;
}
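To make the new `replace` flag concrete, here's a hedged sketch of the SQL it yields, using sea-query directly (the `users` table is hypothetical):

```rust
use sea_query::{Alias, Query, SqliteQueryBuilder};

fn main() {
    let sql = Query::insert()
        .replace() // switches the statement from INSERT INTO to REPLACE INTO
        .into_table(Alias::new("users"))
        .columns([Alias::new("id"), Alias::new("name")])
        .values_panic([1.into(), "alice".into()])
        .to_string(SqliteQueryBuilder);

    // REPLACE INTO "users" ("id", "name") VALUES (1, 'alice')
    println!("{sql}");
}
```

Note that `build_postgres` still passes `replace: false`: Postgres has no `REPLACE INTO`, so overwrite semantics there need a different mechanism.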
45 changes: 37 additions & 8 deletions core/src/sqlite.rs
@@ -21,7 +21,7 @@ use datafusion::{
common::Constraints,
datasource::TableProvider,
error::{DataFusionError, Result as DataFusionResult},
-logical_expr::CreateExternalTable,
+logical_expr::{dml::InsertOp, CreateExternalTable},
sql::TableReference,
};
use futures::TryStreamExt;
@@ -740,15 +740,31 @@ impl Sqlite {
transaction: &Transaction<'_>,
batch: RecordBatch,
on_conflict: Option<&OnConflict>,
insert_op: InsertOp,
) -> rusqlite::Result<()> {
let insert_table_builder = InsertBuilder::new(&self.table, vec![batch]);

-let sea_query_on_conflict =
-on_conflict.map(|oc| oc.build_sea_query_on_conflict(&self.schema));
-
-let sql = insert_table_builder
-.build_sqlite(sea_query_on_conflict)
-.map_err(|e| rusqlite::Error::ToSqlConversionFailure(e.into()))?;
+// Build the SQL statement, rejecting unsupported insert operations.
+let sql = match insert_op {
+InsertOp::Overwrite => {
+// Use REPLACE INTO for overwrite mode
+insert_table_builder
+.build_sqlite_replace()
+.map_err(|e| rusqlite::Error::ToSqlConversionFailure(e.into()))?
+}
+InsertOp::Append => {
+let sea_query_on_conflict =
+on_conflict.map(|oc| oc.build_sea_query_on_conflict(&self.schema));
+insert_table_builder
+.build_sqlite(sea_query_on_conflict)
+.map_err(|e| rusqlite::Error::ToSqlConversionFailure(e.into()))?
+}
+_ => {
+return Err(rusqlite::Error::ToSqlConversionFailure(
+format!("Unsupported insert operation: {insert_op:?}").into(),
+));
+}
+};

transaction.execute(&sql, [])?;

@@ -785,6 +801,7 @@ impl Sqlite {
transaction: &Transaction<'_>,
batch: RecordBatch,
on_conflict: Option<&OnConflict>,
insert_op: InsertOp,
) -> rusqlite::Result<()> {
use arrow::array::*;
use arrow::datatypes::DataType;
@@ -793,6 +810,17 @@
return Ok(());
}

// Choose the statement keyword, rejecting unsupported insert operations.
let insert_keyword = match insert_op {
InsertOp::Overwrite => "REPLACE INTO",
InsertOp::Append => "INSERT INTO",
_ => {
return Err(rusqlite::Error::ToSqlConversionFailure(
format!("Unsupported insert operation: {insert_op:?}").into(),
));
}
};

// Build the prepared statement SQL
let schema = batch.schema();
let column_names: Vec<String> = schema
@@ -807,7 +835,8 @@
.collect();

let mut sql = format!(
"INSERT INTO {} ({}) VALUES ({})",
"{} {} ({}) VALUES ({})",
insert_keyword,
self.table.to_quoted_string(),
column_names.join(", "),
placeholders.join(", ")
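As a sanity check on the overwrite semantics used here: in SQLite, `REPLACE INTO` is shorthand for `INSERT OR REPLACE`, which deletes any existing row that would violate a uniqueness constraint and then inserts the new one. A small standalone rusqlite demo (hypothetical table):

```rust
use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
         INSERT INTO users VALUES (1, 'alice');",
    )?;

    // The conflicting row (id = 1) is deleted, then the new row is inserted.
    conn.execute("REPLACE INTO users (id, name) VALUES (?1, ?2)", (1, "bob"))?;

    let name: String =
        conn.query_row("SELECT name FROM users WHERE id = 1", [], |row| row.get(0))?;
    assert_eq!(name, "bob");
    Ok(())
}
```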
37 changes: 28 additions & 9 deletions core/src/sqlite/write.rs
@@ -158,6 +158,13 @@ impl DataSink for SqliteDataSink {

let constraints = self.sqlite.constraints().clone();
let mut data = data;
let upsert_options = self
.on_conflict
.as_ref()
.map_or_else(UpsertOptions::default, |conflict| {
conflict.get_upsert_options()
});
let overwrite_for_task = self.overwrite;
let task = tokio::spawn(async move {
let mut num_rows: u64 = 0;
while let Some(data_batch) = data.next().await {
@@ -166,14 +173,20 @@
DataFusionError::Execution(format!("Unable to convert num_rows() to u64: {e}"))
})?;

-constraints::validate_batch_with_constraints(
-vec![data_batch.clone()],
-&constraints,
-&crate::util::constraints::UpsertOptions::default(),
-)
-.await
-.context(super::ConstraintViolationSnafu)
-.map_err(to_datafusion_error)?;
+// Skip constraint validation for Overwrite operations since we're replacing all data
+// and uniqueness constraints don't apply to the incoming data in isolation.
+let batches = if overwrite_for_task == InsertOp::Overwrite {
+vec![data_batch]
+} else {
+constraints::validate_batch_with_constraints(
+vec![data_batch],
+&constraints,
+&upsert_options,
+)
+.await
+.context(super::ConstraintViolationSnafu)
+.map_err(to_datafusion_error)?
+};

+for data_batch in batches {
batch_tx.send(data_batch).await.map_err(|err| {
DataFusionError::Execution(format!("Error sending data batch: {err}"))
@@ -213,10 +226,16 @@ impl DataSink for SqliteDataSink {
&transaction,
data_batch,
on_conflict.as_ref(),
overwrite,
)?;
} else {
#[allow(deprecated)]
-sqlite.insert_batch(&transaction, data_batch, on_conflict.as_ref())?;
+sqlite.insert_batch(
+&transaction,
+data_batch,
+on_conflict.as_ref(),
+overwrite,
+)?;
}
}
}
14 changes: 14 additions & 0 deletions core/src/util/constraints.rs
@@ -102,6 +102,20 @@ impl TryFrom<&str> for UpsertOptions {
}
}

/// Filters out `Unique` constraints, keeping only `PrimaryKey` constraints.
/// This is useful for overwrite operations where we want to validate primary key
/// uniqueness in the incoming data, but don't need to enforce additional unique constraints
/// since we're replacing all data anyway.
#[must_use]
pub fn filter_unique_constraints(constraints: &Constraints) -> Constraints {
let filtered: Vec<Constraint> = constraints
.iter()
.filter(|c| matches!(c, Constraint::PrimaryKey(_)))
.cloned()
.collect();
Constraints::new_unverified(filtered)
}

/// The goal for this function is to determine if all of the data described in `batches` conforms to the constraints described in `constraints`.
///
/// It does this by creating a memory table from the record batches and then running a query against the table to validate the constraints.
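To illustrate the new helper's behavior, here's a self-contained sketch; the column indexes are hypothetical, and the helper body is copied from the diff above so the example compiles on its own:

```rust
use datafusion::common::{Constraint, Constraints};

// Copied from the new helper above, for a self-contained demo.
fn filter_unique_constraints(constraints: &Constraints) -> Constraints {
    let filtered: Vec<Constraint> = constraints
        .iter()
        .filter(|c| matches!(c, Constraint::PrimaryKey(_)))
        .cloned()
        .collect();
    Constraints::new_unverified(filtered)
}

fn main() {
    // A primary key on column 0 plus a unique constraint on column 1.
    let constraints = Constraints::new_unverified(vec![
        Constraint::PrimaryKey(vec![0]),
        Constraint::Unique(vec![1]),
    ]);

    // Only the primary-key constraint survives filtering.
    let filtered = filter_unique_constraints(&constraints);
    println!("{filtered:?}");
}
```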