diff --git a/crates/core/src/delta_datafusion/find_files.rs b/crates/core/src/delta_datafusion/find_files.rs
index ebfed00f8a..651f8413a7 100644
--- a/crates/core/src/delta_datafusion/find_files.rs
+++ b/crates/core/src/delta_datafusion/find_files.rs
@@ -197,7 +197,10 @@ async fn find_files_scan(
         .log_data()
         .iter()
         .map(|f| f.add_action())
-        .map(|add| (add.path.clone(), add.to_owned()))
+        .map(|add| {
+            let path = add.path.clone();
+            (path, add)
+        })
         .collect();

     let scan_config = DeltaScanConfigBuilder::default()
diff --git a/crates/core/src/kernel/snapshot/iterators/scan_row.rs b/crates/core/src/kernel/snapshot/iterators/scan_row.rs
index d897d49aa5..b172d4cd05 100644
--- a/crates/core/src/kernel/snapshot/iterators/scan_row.rs
+++ b/crates/core/src/kernel/snapshot/iterators/scan_row.rs
@@ -208,7 +208,7 @@ pub(crate) fn parse_partitions(
     let arr = match p {
         PrimitiveType::String => {
             Arc::new(StringArray::from_iter(values.iter().map(|v| match v {
-                Scalar::String(s) => Some(s.clone()),
+                Scalar::String(s) => Some(s.as_str()),
                 Scalar::Null(_) => None,
                 _ => panic!("unexpected scalar type"),
             }))) as ArrayRef
diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs
index 7799b368df..0ec6bdf080 100644
--- a/crates/core/src/kernel/snapshot/mod.rs
+++ b/crates/core/src/kernel/snapshot/mod.rs
@@ -489,7 +489,7 @@ impl EagerSnapshot {
             log_store,
             None,
             current_version,
-            Box::new(self.files.clone().into_iter()),
+            Box::new(std::mem::take(&mut self.files).into_iter()),
             None,
         )
         .try_collect()
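The snapshot hunk above swaps `self.files.clone()` for `std::mem::take`, which moves the vector out of the struct and leaves an empty one behind, avoiding a full copy before the field is rebuilt. A minimal sketch of the pattern, with illustrative names rather than delta-rs types:

```rust
struct Snapshot {
    files: Vec<String>,
}

impl Snapshot {
    fn refresh(&mut self) {
        // `take` moves the Vec out and leaves `Vec::default()` behind;
        // no element is copied, unlike `self.files.clone()`.
        let old_files = std::mem::take(&mut self.files);
        // Stand-in for the real log replay that rebuilds the file list.
        self.files = old_files.into_iter().filter(|f| !f.is_empty()).collect();
    }
}

fn main() {
    let mut s = Snapshot {
        files: vec!["a".to_string(), String::new()],
    };
    s.refresh();
    assert_eq!(s.files, vec!["a".to_string()]);
}
```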
diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs
index d505f0dfaa..b653ee2301 100644
--- a/crates/core/src/logstore/mod.rs
+++ b/crates/core/src/logstore/mod.rs
@@ -48,7 +48,6 @@
 //! ## Configuration
 //!
 use std::collections::HashMap;
-use std::io::{BufRead, BufReader, Cursor};
 use std::sync::{Arc, LazyLock};

 use bytes::Bytes;
@@ -569,20 +568,20 @@ pub async fn get_actions(
     version: i64,
     commit_log_bytes: bytes::Bytes,
 ) -> Result<Vec<Action>, DeltaTableError> {
     debug!("parsing commit with version {version}...");
-    let reader = BufReader::new(Cursor::new(commit_log_bytes));
-
-    let mut actions = Vec::new();
-    for re_line in reader.lines() {
-        let line = re_line?;
-        let lstr = line.as_str();
-        let action = serde_json::from_str(lstr).map_err(|e| DeltaTableError::InvalidJsonLog {
-            json_err: e,
-            line,
-            version,
-        })?;
-        actions.push(action);
-    }
-    Ok(actions)
+
+    serde_json::Deserializer::from_slice(&commit_log_bytes)
+        .into_iter::<Action>()
+        .map(|result| {
+            result.map_err(|e| {
+                let line = format!("Error at line {}, column {}", e.line(), e.column());
+                DeltaTableError::InvalidJsonLog {
+                    json_err: e,
+                    line,
+                    version,
+                }
+            })
+        })
+        .collect()
 }

 // TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
@@ -944,6 +943,37 @@ pub(crate) mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn test_get_actions_non_blocking() -> DeltaResult<()> {
+        let valid_commit = r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"commitInfo":{"timestamp":1234567890}}
+{"add":{"path":"part-00000.parquet","size":1234,"modificationTime":1234567890,"dataChange":true,"partitionValues":{}}}"#;
+
+        let commit_bytes = Bytes::from(valid_commit);
+        let actions = get_actions(0, commit_bytes).await?;
+
+        assert_eq!(actions.len(), 3);
+        assert!(matches!(actions[0], Action::Protocol(_)));
+        assert!(matches!(actions[1], Action::CommitInfo(_)));
+        assert!(matches!(actions[2], Action::Add(_)));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_get_actions_invalid_json_error() {
+        let invalid_commit = r#"{"invalid_json"#;
+        let commit_bytes = Bytes::from(invalid_commit);
+        let result = get_actions(0, commit_bytes).await;
+
+        assert!(result.is_err());
+        if let Err(DeltaTableError::InvalidJsonLog { version, .. }) = result {
+            assert_eq!(version, 0);
+        } else {
+            panic!("Expected InvalidJsonLog error");
+        }
+    }
+
     /// Collect list stream
     pub(crate) async fn flatten_list_stream(
         storage: &object_store::DynObjectStore,
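The rewritten `get_actions` leans on `serde_json`'s streaming deserializer: `Deserializer::from_slice(..).into_iter::<T>()` walks concatenated or newline-delimited JSON values in place, so no per-line `String` is allocated and the loop collects straight into a `Result<Vec<_>, _>`. A self-contained sketch of that API, assuming serde with the `derive` feature, with `Entry` standing in for the crate's `Action` enum:

```rust
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Entry {
    id: u64,
}

// Streams every JSON value out of `bytes` (newline-delimited or simply
// concatenated) without allocating an intermediate String per line.
fn parse_all(bytes: &[u8]) -> Result<Vec<Entry>, serde_json::Error> {
    serde_json::Deserializer::from_slice(bytes)
        .into_iter::<Entry>()
        .collect()
}

fn main() {
    let data = b"{\"id\":1}\n{\"id\":2}";
    let entries = parse_all(data).unwrap();
    assert_eq!(entries.len(), 2);
}
```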
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index 94f6114733..ad20fded56 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -944,11 +944,12 @@ async fn build_compaction_plan(
         let partition_values = file
             .partition_values()
             .map(|v| {
-                v.fields()
-                    .iter()
-                    .zip(v.values().iter())
-                    .map(|(k, v)| (k.name().to_string(), v.clone()))
-                    .collect::<IndexMap<_, _>>()
+                let fields_count = v.fields().len();
+                let mut map = IndexMap::with_capacity(fields_count);
+                for (field, value) in v.fields().iter().zip(v.values().iter()) {
+                    map.insert(field.name().to_string(), value.clone());
+                }
+                map
             })
             .unwrap_or_default();
@@ -964,7 +965,8 @@ async fn build_compaction_plan(
         file.sort_by(|a, b| b.size.cmp(&a.size));
     }

-    let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
+    let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> =
+        HashMap::with_capacity(partition_files.len());

     for (part, (partition, files)) in partition_files {
         let mut merge_bins = vec![MergeBin::new()];
diff --git a/crates/core/src/operations/vacuum.rs b/crates/core/src/operations/vacuum.rs
index c977824a1a..544da5a47e 100644
--- a/crates/core/src/operations/vacuum.rs
+++ b/crates/core/src/operations/vacuum.rs
@@ -261,12 +261,9 @@ impl VacuumBuilder {
             .await?;
         for version in sorted_versions {
             state.update(&self.log_store, Some(version)).await?;
-            let files: Vec<String> = state
-                .file_paths_iter()
-                .map(|path| path.to_string())
-                .collect();
-            debug!("keep version:{version}\n, {files:#?}");
-            keep_files.extend(files);
+            let files_iter = state.file_paths_iter().map(|path| path.to_string());
+            debug!("keep version:{version}");
+            keep_files.extend(files_iter);
         }

         keep_files
diff --git a/crates/core/src/operations/write/execution.rs b/crates/core/src/operations/write/execution.rs
index 0ffb5d2aba..c485242116 100644
--- a/crates/core/src/operations/write/execution.rs
+++ b/crates/core/src/operations/write/execution.rs
@@ -391,10 +391,9 @@ pub(crate) async fn write_execution_plan_v2(
                 let write_start = std::time::Instant::now();

                 // split batch since we unioned upstream the operation write and cdf plan
-                let table_provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(
-                    batch.schema(),
-                    vec![vec![batch.clone()]],
-                )?);
+                let batch_schema = batch.schema();
+                let table_provider: Arc<dyn TableProvider> =
+                    Arc::new(MemTable::try_new(batch_schema, vec![vec![batch]])?);
                 let batch_df = session_context.read_table(table_provider).unwrap();

                 let normal_df = batch_df.clone().filter(col(CDC_COLUMN_NAME).in_list(
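Both the optimize.rs hunk above and the stats.rs hunk further down replace `HashMap::new()` with `with_capacity`, using a known entry count to skip incremental rehash-and-regrow cycles during insertion. A minimal sketch of the pattern with std's `HashMap`:

```rust
use std::collections::HashMap;

fn index_by_name(names: &[&str]) -> HashMap<String, usize> {
    // One allocation sized for the known entry count, instead of
    // repeated growth as entries are inserted.
    let mut map = HashMap::with_capacity(names.len());
    for (i, name) in names.iter().enumerate() {
        map.insert(name.to_string(), i);
    }
    map
}

fn main() {
    let m = index_by_name(&["year", "month", "day"]);
    assert_eq!(m["month"], 1);
}
```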
diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs
index f8677932ca..c0861f536a 100644
--- a/crates/core/src/writer/json.rs
+++ b/crates/core/src/writer/json.rs
@@ -4,7 +4,6 @@ use std::sync::Arc;

 use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use arrow::record_batch::*;
-use bytes::Bytes;
 use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
 use delta_kernel::expressions::Scalar;
 use indexmap::IndexMap;
@@ -122,8 +121,8 @@ impl DataArrowWriter {
             self.partition_values = partition_values;
         }

-        // Copy current buffered bytes so we can recover from failures
-        let buffer_bytes = self.buffer.to_vec();
+        // Save buffer length for rollback without copying data
+        let buffer_len = self.buffer.snapshot_len();

         let record_batch = record_batch_without_partitions(&record_batch, partition_columns)?;
         let result = self.arrow_writer.write(&record_batch);
@@ -135,8 +134,8 @@ impl DataArrowWriter {
             }
             // If a write fails we need to reset the state of the DeltaArrowWriter
             Err(e) => {
-                let new_buffer = ShareableBuffer::from_bytes(buffer_bytes.as_slice());
-                let _ = std::mem::replace(&mut self.buffer, new_buffer.clone());
+                self.buffer.truncate(buffer_len);
+                let new_buffer = self.buffer.clone();
                 let arrow_writer = Self::new_underlying_writer(
                     new_buffer,
                     self.arrow_schema.clone(),
@@ -383,7 +382,7 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {

             let uuid = Uuid::new_v4();
             let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
-            let obj_bytes = Bytes::from(writer.buffer.to_vec());
+            let obj_bytes = writer.buffer.to_bytes();
             let file_size = obj_bytes.len() as i64;
             self.table
                 .object_store()
diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs
index cb82778fc6..3c25194dc8 100644
--- a/crates/core/src/writer/record_batch.rs
+++ b/crates/core/src/writer/record_batch.rs
@@ -10,9 +10,8 @@ use std::{collections::HashMap, sync::Arc};
 use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, UInt32Array};
 use arrow_ord::partition::partition;
 use arrow_row::{RowConverter, SortField};
-use arrow_schema::{ArrowError, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
+use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use arrow_select::take::take;
-use bytes::Bytes;
 use delta_kernel::engine::arrow_conversion::{TryIntoArrow, TryIntoKernel};
 use delta_kernel::expressions::Scalar;
 use delta_kernel::table_properties::DataSkippingNumIndexedCols;
@@ -267,7 +266,7 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
             let prefix = Path::parse(writer.partition_values.hive_partition_path())?;
             let uuid = Uuid::new_v4();
             let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
-            let obj_bytes = Bytes::from(writer.buffer.to_vec());
+            let obj_bytes = writer.buffer.to_bytes();
             let file_size = obj_bytes.len() as i64;
             self.storage
                 .put_with_retries(&path, obj_bytes.into(), 15)
@@ -400,8 +399,8 @@ impl PartitionWriter {
             None
         };

-        // Copy current cursor bytes so we can recover from failures
-        let buffer_bytes = self.buffer.to_vec();
+        // Save buffer length so we can roll back on failure
+        let buffer_len = self.buffer.len();

         let record_batch = merged_batch.as_ref().unwrap_or(record_batch);
         match self.arrow_writer.write(record_batch) {
@@ -411,10 +410,9 @@ impl PartitionWriter {
             }
             // If a write fails we need to reset the state of the PartitionWriter
             Err(e) => {
-                let new_buffer = ShareableBuffer::from_bytes(buffer_bytes.as_slice());
-                let _ = std::mem::replace(&mut self.buffer, new_buffer.clone());
+                self.buffer.truncate(buffer_len);
                 let arrow_writer = ArrowWriter::try_new(
-                    new_buffer,
+                    self.buffer.clone(),
                     self.arrow_schema.clone(),
                     Some(self.writer_properties.clone()),
                 )?;
@@ -463,12 +461,16 @@ pub(crate) fn divide_by_partition_values(

     let partition_ranges = partition(sorted_partition_columns.as_slice())?;

-    for range in partition_ranges.ranges().into_iter() {
-        // get row indices for current partition
-        let idx: UInt32Array = (range.start..range.end)
-            .map(|i| Some(indices.value(i)))
-            .collect();
+    let sorted_data_columns: Vec<ArrayRef> = arrow_schema
+        .fields()
+        .iter()
+        .map(|f| {
+            let col_idx = schema.index_of(f.name())?;
+            Ok(take(values.column(col_idx), &indices, None)?)
+        })
+        .collect::<Result<Vec<_>, DeltaWriterError>>()?;

+    for range in partition_ranges.ranges().into_iter() {
         let partition_key_iter = sorted_partition_columns
             .iter()
             .map(|col| {
@@ -483,12 +485,11 @@ pub(crate) fn divide_by_partition_values(
             .into_iter()
             .zip(partition_key_iter)
             .collect();
-        let batch_data = arrow_schema
-            .fields()
+
+        let batch_data: Vec<ArrayRef> = sorted_data_columns
             .iter()
-            .map(|f| Ok(values.column(schema.index_of(f.name())?).clone()))
-            .map(move |col: Result<_, ArrowError>| take(col?.as_ref(), &idx, None))
-            .collect::<Result<Vec<_>, _>>()?;
+            .map(|col| col.slice(range.start, range.end - range.start))
+            .collect();

         partitions.push(PartitionResult {
             partition_values,
diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs
index 34ffdcdc46..29c85f0ee0 100644
--- a/crates/core/src/writer/stats.rs
+++ b/crates/core/src/writer/stats.rs
@@ -131,9 +131,6 @@ fn stats_from_metadata(
     num_indexed_cols: DataSkippingNumIndexedCols,
     stats_columns: &Option<Vec<impl AsRef<str>>>,
 ) -> Result<Stats, DeltaWriterError> {
-    let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
-    let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
-    let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();
     let dialect = sqlparser::dialect::GenericDialect {};

     let idx_to_iterate = if let Some(stats_cols) = stats_columns {
@@ -177,6 +174,14 @@ fn stats_from_metadata(
         )));
     };

+    let estimated_capacity = idx_to_iterate.len();
+    let mut min_values: HashMap<String, ColumnValueStat> =
+        HashMap::with_capacity(estimated_capacity);
+    let mut max_values: HashMap<String, ColumnValueStat> =
+        HashMap::with_capacity(estimated_capacity);
+    let mut null_count: HashMap<String, ColumnCountStat> =
+        HashMap::with_capacity(estimated_capacity);
+
     for idx in idx_to_iterate {
         let column_descr = schema_descriptor.column(idx);
@@ -549,19 +554,22 @@ fn apply_min_max_for_column(
     match (column_path_parts.len(), column_path_parts.first()) {
         // Base case - we are at the leaf struct level in the path
         (1, _) => {
-            let key = column_descr.name().to_string();
+            let key = column_descr.name();

             if let Some(min) = statistics.min {
                 let min = ColumnValueStat::Value(min.into());
-                min_values.insert(key.clone(), min);
+                min_values.insert(key.to_string(), min);
             }

             if let Some(max) = statistics.max {
                 let max = ColumnValueStat::Value(max.into());
-                max_values.insert(key.clone(), max);
+                max_values.insert(key.to_string(), max);
             }

-            null_counts.insert(key, ColumnCountStat::Value(statistics.null_count as i64));
+            null_counts.insert(
+                key.to_string(),
+                ColumnCountStat::Value(statistics.null_count as i64),
+            );

             Ok(())
         }
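The `divide_by_partition_values` rewrite above inverts the old loop: each data column is gathered into sorted order once with `take`, and every partition then becomes a zero-copy `slice` of the sorted column instead of its own `take` over a freshly built index array. A small sketch of those two arrow primitives, with made-up data rather than the crate's inputs:

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, UInt32Array};
use arrow_select::take::take;

fn main() -> Result<(), arrow_schema::ArrowError> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![10, 30, 20]));
    // Sort permutation, e.g. produced by lexsort_to_indices on partition keys.
    let indices = UInt32Array::from(vec![0, 2, 1]);

    // One `take` per column for the whole batch...
    let sorted = take(col.as_ref(), &indices, None)?;
    // ...then each partition range is a zero-copy view into the sorted column.
    let first_partition = sorted.slice(0, 2);
    assert_eq!(first_partition.len(), 2);
    Ok(())
}
```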
diff --git a/crates/core/src/writer/utils.rs b/crates/core/src/writer/utils.rs
index 2f810db520..01acf4cff7 100644
--- a/crates/core/src/writer/utils.rs
+++ b/crates/core/src/writer/utils.rs
@@ -7,6 +7,7 @@ use std::sync::Arc;
 use arrow_array::RecordBatch;
 use arrow_json::ReaderBuilder;
 use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
+use bytes::{BufMut, Bytes, BytesMut};
 use object_store::path::Path;
 use parking_lot::RwLock;
 use parquet::basic::Compression;
@@ -105,53 +106,71 @@ pub(crate) fn arrow_schema_without_partitions(
 /// allows multiple owners to have access to the same underlying buffer.
 #[derive(Debug, Default, Clone)]
 pub struct ShareableBuffer {
-    buffer: Arc<RwLock<Vec<u8>>>,
+    buffer: Arc<RwLock<BytesMut>>,
 }

 impl ShareableBuffer {
-    /// Consumes this instance and returns the underlying buffer.
+    /// Consumes this instance and returns the underlying buffer as Vec<u8>.
     /// Returns None if there are other references to the instance.
     pub fn into_inner(self) -> Option<Vec<u8>> {
         Arc::try_unwrap(self.buffer)
             .ok()
-            .map(|lock| lock.into_inner())
+            .map(|lock| lock.into_inner().to_vec())
+    }
+
+    /// Returns a zero-copy reference to the buffer as `Bytes`.
+    pub fn to_bytes(&self) -> Bytes {
+        let inner = self.buffer.read();
+        inner.clone().freeze()
     }

     /// Returns a clone of the underlying buffer as a `Vec<u8>`.
     pub fn to_vec(&self) -> Vec<u8> {
-        let inner = self.buffer.read();
-        (*inner).to_vec()
+        self.to_bytes().to_vec()
     }

     /// Returns the number of bytes in the underlying buffer.
     pub fn len(&self) -> usize {
         let inner = self.buffer.read();
-        (*inner).len()
+        inner.len()
     }

     /// Returns true if the underlying buffer is empty.
     pub fn is_empty(&self) -> bool {
         let inner = self.buffer.read();
-        (*inner).is_empty()
+        inner.is_empty()
     }

-    /// Creates a new instance with buffer initialized from the underylying bytes.
+    /// Creates a new instance with buffer initialized from the provided bytes.
     pub fn from_bytes(bytes: &[u8]) -> Self {
         Self {
-            buffer: Arc::new(RwLock::new(bytes.to_vec())),
+            buffer: Arc::new(RwLock::new(BytesMut::from(bytes))),
         }
     }
+
+    /// Creates a snapshot of the current buffer for rollback purposes.
+    /// Returns the current length for efficient restoration.
+    pub fn snapshot_len(&self) -> usize {
+        self.len()
+    }
+
+    /// Truncates the buffer to the specified length.
+    /// If the buffer is shorter than the specified length, this is a no-op.
+    pub fn truncate(&self, len: usize) {
+        let mut inner = self.buffer.write();
+        inner.truncate(len);
+    }
 }

 impl Write for ShareableBuffer {
     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
         let mut inner = self.buffer.write();
-        (*inner).write(buf)
+        inner.put_slice(buf);
+        Ok(buf.len())
     }

     fn flush(&mut self) -> std::io::Result<()> {
-        let mut inner = self.buffer.write();
-        (*inner).flush()
+        Ok(())
     }
 }
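With `ShareableBuffer` now backed by `BytesMut`, the writers above roll back a failed parquet write by remembering a length and truncating to it, instead of cloning the whole buffer up front. A sketch of that rollback idea against plain `bytes` types; `try_append` is a hypothetical helper, not the crate's API:

```rust
use bytes::{BufMut, BytesMut};

fn try_append(buf: &mut BytesMut, chunk: &[u8], fail: bool) -> Result<(), ()> {
    let checkpoint = buf.len(); // cheap: just a length, no byte copy
    buf.put_slice(chunk);
    if fail {
        buf.truncate(checkpoint); // drop the partial write on failure
        return Err(());
    }
    Ok(())
}

fn main() {
    let mut buf = BytesMut::from(&b"header"[..]);
    assert!(try_append(&mut buf, b"-good", false).is_ok());
    assert!(try_append(&mut buf, b"-bad", true).is_err());
    assert_eq!(&buf[..], b"header-good");
}
```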