diff --git a/core/src/duckdb/creator.rs b/core/src/duckdb/creator.rs
index 6eb7b626..8c4c699e 100644
--- a/core/src/duckdb/creator.rs
+++ b/core/src/duckdb/creator.rs
@@ -290,6 +290,28 @@ impl TableManager {
         Ok(())
     }
 
+    /// Creates the internal staging table without PRIMARY KEY constraints.
+    /// This is used for overwrite operations where we're loading data into a temporary table
+    /// before swapping with the main table. The incoming data may have duplicates that would
+    /// violate PRIMARY KEY constraints.
+    #[tracing::instrument(level = "debug", skip_all)]
+    pub fn create_table_without_constraints(
+        &self,
+        pool: Arc<DuckDbConnectionPool>,
+        tx: &Transaction<'_>,
+    ) -> super::Result<()> {
+        let mut db_conn = pool.connect_sync().context(super::DbConnectionPoolSnafu)?;
+        let duckdb_conn = DuckDB::duckdb_conn(&mut db_conn)?;
+
+        let create_stmt = self.get_table_create_statement(duckdb_conn)?;
+        tracing::debug!("{create_stmt}");
+
+        tx.execute(&create_stmt, [])
+            .context(super::UnableToCreateDuckDBTableSnafu)?;
+
+        Ok(())
+    }
+
     /// Drops indexes from the table, then drops the table itself.
     #[tracing::instrument(level = "debug", skip_all)]
     pub fn delete_table(&self, tx: &Transaction<'_>) -> super::Result<()> {
diff --git a/core/src/duckdb/write.rs b/core/src/duckdb/write.rs
index 5e5f6dcc..60771981 100644
--- a/core/src/duckdb/write.rs
+++ b/core/src/duckdb/write.rs
@@ -325,7 +325,11 @@ impl DataSink for DuckDBDataSink {
         while let Some(batch) = data.next().await {
             let batch = batch.map_err(check_and_mark_retriable_error)?;
 
-            if let Some(constraints) = self.table_definition.constraints() {
+            // Skip constraint validation for Overwrite operations since we're replacing all data
+            // and uniqueness constraints don't apply to the incoming data in isolation.
+            let batches = if self.overwrite == InsertOp::Overwrite {
+                vec![batch]
+            } else if let Some(constraints) = self.table_definition.constraints() {
                 constraints::validate_batch_with_constraints(
                     vec![batch.clone()],
                     constraints,
@@ -571,7 +575,7 @@
         .map_err(to_retriable_data_write_error)?;
 
     new_table
-        .create_table(cloned_pool, &tx)
+        .create_table_without_constraints(cloned_pool, &tx)
         .map_err(to_retriable_data_write_error)?;
 
     let existing_tables = new_table
@@ -618,22 +622,15 @@
         ));
     }
 
+    // Note: We skip primary key verification for insert_overwrite because
+    // the internal staging table is intentionally created without constraints
+    // to allow loading data with potential duplicates.
     if !should_apply_indexes {
-        // compare indexes and primary keys
-        let primary_keys_match = new_table
-            .verify_primary_keys_match(last_table, &tx)
-            .map_err(to_retriable_data_write_error)?;
+        // Only verify indexes match, skip primary key verification
         let indexes_match = new_table
            .verify_indexes_match(last_table, &tx)
            .map_err(to_retriable_data_write_error)?;
 
-        if !primary_keys_match {
-            return Err(DataFusionError::Execution(
-                "Primary keys do not match between the new table and the existing table.\nEnsure primary key configuration is the same as the existing table, or manually migrate the table."
-                    .to_string(),
-            ));
-        }
-
         if !indexes_match {
             return Err(DataFusionError::Execution(
                 "Indexes do not match between the new table and the existing table.\nEnsure index configuration is the same as the existing table, or manually migrate the table.".to_string(),
diff --git a/core/src/sql/arrow_sql_gen/statement.rs b/core/src/sql/arrow_sql_gen/statement.rs
index 6e3fe2f0..7a8486fe 100644
--- a/core/src/sql/arrow_sql_gen/statement.rs
+++ b/core/src/sql/arrow_sql_gen/statement.rs
@@ -1030,7 +1030,7 @@ impl InsertBuilder {
     ///
     /// Returns an error if any `RecordBatch` fails to convert into a valid postgres insert statement.
     pub fn build_postgres(self, on_conflict: Option<OnConflict>) -> Result<String> {
-        self.build(PostgresQueryBuilder, on_conflict)
+        self.build(PostgresQueryBuilder, on_conflict, false)
     }
 
     ///
@@ -1038,7 +1038,16 @@
     /// # Errors
     ///
     /// Returns an error if any `RecordBatch` fails to convert into a valid sqlite insert statement.
     pub fn build_sqlite(self, on_conflict: Option<OnConflict>) -> Result<String> {
-        self.build(SqliteQueryBuilder, on_conflict)
+        self.build(SqliteQueryBuilder, on_conflict, false)
+    }
+
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if any `RecordBatch` fails to convert into a valid sqlite REPLACE statement.
+    /// Uses SQLite's REPLACE INTO syntax which deletes existing rows with conflicting keys.
+    pub fn build_sqlite_replace(self) -> Result<String> {
+        self.build(SqliteQueryBuilder, None, true)
     }
 
     ///
@@ -1046,17 +1055,20 @@
     /// # Errors
     ///
     /// Returns an error if any `RecordBatch` fails to convert into a valid `MySQL` insert statement.
     pub fn build_mysql(self, on_conflict: Option<OnConflict>) -> Result<String> {
-        self.build(MysqlQueryBuilder, on_conflict)
+        self.build(MysqlQueryBuilder, on_conflict, false)
     }
 
     /// # Errors
     ///
     /// Returns an error if any `RecordBatch` fails to convert into a valid insert statement. Upon
     /// error, no further `RecordBatch` is processed.
+    ///
+    /// If `replace` is true, uses REPLACE INTO syntax (SQLite/MySQL) instead of INSERT INTO.
     pub fn build<T: QueryBuilder>(
         &self,
         query_builder: T,
         on_conflict: Option<OnConflict>,
+        replace: bool,
     ) -> Result<String> {
         let columns: Vec = (self.record_batches[0])
             .schema()
@@ -1070,6 +1082,10 @@
             .columns(columns)
             .to_owned();
 
+        if replace {
+            insert_stmt.replace();
+        }
+
         for record_batch in &self.record_batches {
             self.construct_insert_stmt(&mut insert_stmt, record_batch, &query_builder)?;
         }
diff --git a/core/src/sqlite.rs b/core/src/sqlite.rs
index 6656cbe5..9b60f7f2 100644
--- a/core/src/sqlite.rs
+++ b/core/src/sqlite.rs
@@ -21,7 +21,7 @@ use datafusion::{
     common::Constraints,
     datasource::TableProvider,
     error::{DataFusionError, Result as DataFusionResult},
-    logical_expr::CreateExternalTable,
+    logical_expr::{dml::InsertOp, CreateExternalTable},
     sql::TableReference,
 };
 use futures::TryStreamExt;
@@ -740,15 +740,31 @@ impl Sqlite {
         transaction: &Transaction<'_>,
         batch: RecordBatch,
         on_conflict: Option<&OnConflict>,
+        insert_op: InsertOp,
     ) -> rusqlite::Result<()> {
         let insert_table_builder = InsertBuilder::new(&self.table, vec![batch]);
 
-        let sea_query_on_conflict =
-            on_conflict.map(|oc| oc.build_sea_query_on_conflict(&self.schema));
-
-        let sql = insert_table_builder
-            .build_sqlite(sea_query_on_conflict)
-            .map_err(|e| rusqlite::Error::ToSqlConversionFailure(e.into()))?;
+        // Validate supported insert operations
+        let sql = match insert_op {
+            InsertOp::Overwrite => {
+                // Use REPLACE INTO for overwrite mode
+                insert_table_builder
+                    .build_sqlite_replace()
+                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(e.into()))?
+            }
+            InsertOp::Append => {
+                let sea_query_on_conflict =
+                    on_conflict.map(|oc| oc.build_sea_query_on_conflict(&self.schema));
+                insert_table_builder
+                    .build_sqlite(sea_query_on_conflict)
+                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(e.into()))?
+            }
+            _ => {
+                return Err(rusqlite::Error::ToSqlConversionFailure(
+                    format!("Unsupported insert operation: {insert_op:?}").into(),
+                ));
+            }
+        };
 
         transaction.execute(&sql, [])?;
 
@@ -785,6 +801,7 @@
         transaction: &Transaction<'_>,
         batch: RecordBatch,
         on_conflict: Option<&OnConflict>,
+        insert_op: InsertOp,
     ) -> rusqlite::Result<()> {
         use arrow::array::*;
         use arrow::datatypes::DataType;
@@ -793,6 +810,17 @@
         if batch.num_rows() == 0 {
             return Ok(());
         }
 
+        // Validate supported insert operations
+        let insert_keyword = match insert_op {
+            InsertOp::Overwrite => "REPLACE INTO",
+            InsertOp::Append => "INSERT INTO",
+            _ => {
+                return Err(rusqlite::Error::ToSqlConversionFailure(
+                    format!("Unsupported insert operation: {insert_op:?}").into(),
+                ));
+            }
+        };
+
         // Build the prepared statement SQL
         let schema = batch.schema();
         let column_names: Vec<String> = schema
@@ -807,7 +835,8 @@
             .collect();
 
         let mut sql = format!(
-            "INSERT INTO {} ({}) VALUES ({})",
+            "{} {} ({}) VALUES ({})",
+            insert_keyword,
             self.table.to_quoted_string(),
             column_names.join(", "),
             placeholders.join(", ")
diff --git a/core/src/sqlite/write.rs b/core/src/sqlite/write.rs
index e0190837..63760ab1 100644
--- a/core/src/sqlite/write.rs
+++ b/core/src/sqlite/write.rs
@@ -158,6 +158,13 @@ impl DataSink for SqliteDataSink {
         let constraints = self.sqlite.constraints().clone();
         let mut data = data;
 
+        let upsert_options = self
+            .on_conflict
+            .as_ref()
+            .map_or_else(UpsertOptions::default, |conflict| {
+                conflict.get_upsert_options()
+            });
+        let overwrite_for_task = self.overwrite;
         let task = tokio::spawn(async move {
             let mut num_rows: u64 = 0;
             while let Some(data_batch) = data.next().await {
@@ -166,14 +173,20 @@
                 let data_batch = data_batch.map_err(check_and_mark_retriable_error)?;
                 num_rows += u64::try_from(data_batch.num_rows()).map_err(|e| {
                     DataFusionError::Execution(format!("Unable to convert num_rows() to u64: {e}"))
                 })?;
 
-                constraints::validate_batch_with_constraints(
-                    vec![data_batch.clone()],
-                    &constraints,
-                    &crate::util::constraints::UpsertOptions::default(),
-                )
-                .await
-                .context(super::ConstraintViolationSnafu)
-                .map_err(to_datafusion_error)?;
+                // Skip constraint validation for Overwrite operations since we're replacing all data
+                // and uniqueness constraints don't apply to the incoming data in isolation.
+                let batches = if overwrite_for_task == InsertOp::Overwrite {
+                    vec![data_batch]
+                } else {
+                    constraints::validate_batch_with_constraints(
+                        vec![data_batch],
+                        &constraints,
+                        &upsert_options,
+                    )
+                    .await
+                    .context(super::ConstraintViolationSnafu)
+                    .map_err(to_datafusion_error)?
+                };
 
                 batch_tx.send(data_batch).await.map_err(|err| {
                     DataFusionError::Execution(format!("Error sending data batch: {err}"))
                 })?;
@@ -213,10 +226,16 @@
                        &transaction,
                        data_batch,
                        on_conflict.as_ref(),
+                        overwrite,
                    )?;
                } else {
                    #[allow(deprecated)]
-                    sqlite.insert_batch(&transaction, data_batch, on_conflict.as_ref())?;
+                    sqlite.insert_batch(
+                        &transaction,
+                        data_batch,
+                        on_conflict.as_ref(),
+                        overwrite,
+                    )?;
                }
            }
        }
diff --git a/core/src/util/constraints.rs b/core/src/util/constraints.rs
index 68b4fcce..a60b1851 100644
--- a/core/src/util/constraints.rs
+++ b/core/src/util/constraints.rs
@@ -102,6 +102,20 @@ impl TryFrom<&str> for UpsertOptions {
     }
 }
 
+/// Filters out `Unique` constraints, keeping only `PrimaryKey` constraints.
+/// This is useful for overwrite operations where we want to validate primary key
+/// uniqueness in the incoming data, but don't need to enforce additional unique constraints
+/// since we're replacing all data anyway.
+#[must_use]
+pub fn filter_unique_constraints(constraints: &Constraints) -> Constraints {
+    let filtered: Vec<Constraint> = constraints
+        .iter()
+        .filter(|c| matches!(c, Constraint::PrimaryKey(_)))
+        .cloned()
+        .collect();
+    Constraints::new_unverified(filtered)
+}
+
 /// The goal for this function is to determine if all of the data described in `batches` conforms to the constraints described in `constraints`.
 ///
 /// It does this by creating a memory table from the record batches and then running a query against the table to validate the constraints.
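
A minimal usage sketch of the new filter_unique_constraints helper, assuming DataFusion's Constraint/Constraints API as imported in the files above; the test name, use paths, and column indices are illustrative only and are not part of this change.

use datafusion::common::{Constraint, Constraints};

// Hypothetical import path; adjust to wherever the helper lives (core/src/util/constraints.rs).
use crate::util::constraints::filter_unique_constraints;

#[test]
fn filter_unique_constraints_keeps_only_primary_keys() {
    // A primary key on column 0 plus a separate unique constraint on column 2.
    let constraints = Constraints::new_unverified(vec![
        Constraint::PrimaryKey(vec![0]),
        Constraint::Unique(vec![2]),
    ]);

    let filtered = filter_unique_constraints(&constraints);

    // Only the primary-key constraint survives; the unique constraint is dropped,
    // which is what the overwrite path relies on when it skips unique validation.
    let kept: Vec<Constraint> = filtered.iter().cloned().collect();
    assert_eq!(kept.len(), 1);
    assert!(matches!(kept[0], Constraint::PrimaryKey(_)));
}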