5 changes: 4 additions & 1 deletion crates/core/src/delta_datafusion/find_files.rs
@@ -197,7 +197,10 @@ async fn find_files_scan(
.log_data()
.iter()
.map(|f| f.add_action())
.map(|add| (add.path.clone(), add.to_owned()))
.map(|add| {
let path = add.path.clone();
(path, add)
})
.collect();

let scan_config = DeltaScanConfigBuilder::default()
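A minimal sketch of the keying change above, with a hypothetical `AddAction` struct standing in for the kernel `Add` action: clone only the path that becomes the map key and move the action itself into the value slot, instead of cloning the key and deep-copying the whole action with `to_owned()`.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the kernel `Add` action referenced in the diff.
struct AddAction {
    path: String,
    size: i64,
}

fn main() {
    let actions = vec![
        AddAction { path: "part-0000.parquet".to_string(), size: 1024 },
        AddAction { path: "part-0001.parquet".to_string(), size: 2048 },
    ];

    // Clone only the key; the action is moved into the map, not copied.
    let by_path: HashMap<String, AddAction> = actions
        .into_iter()
        .map(|add| {
            let path = add.path.clone();
            (path, add)
        })
        .collect();

    assert_eq!(by_path["part-0000.parquet"].size, 1024);
}
```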
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/iterators/scan_row.rs
@@ -208,7 +208,7 @@ pub(crate) fn parse_partitions(
let arr = match p {
PrimitiveType::String => {
Arc::new(StringArray::from_iter(values.iter().map(|v| match v {
Scalar::String(s) => Some(s.clone()),
Scalar::String(s) => Some(s.as_str()),
Scalar::Null(_) => None,
_ => panic!("unexpected scalar type"),
}))) as ArrayRef
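A short sketch of the borrow-instead-of-clone idea above, assuming the `arrow_array` crate: `StringArray` collects from any `Option<impl AsRef<str>>`, so handing it `Option<&str>` skips the per-value `String` clone that would otherwise be made only for the builder to copy the bytes again.

```rust
use arrow_array::{Array, StringArray};

fn main() {
    // Partition values as they might arrive: owned strings plus a null.
    let values = vec![Some("2024-01-01".to_string()), None, Some("2024-01-02".to_string())];

    // Borrow with `as_deref()` (the diff uses `as_str()` on a matched scalar);
    // the array builder copies the bytes once, and no intermediate String is cloned.
    let arr = StringArray::from_iter(values.iter().map(|v| v.as_deref()));

    assert_eq!(arr.len(), 3);
    assert!(arr.is_null(1));
    assert_eq!(arr.value(0), "2024-01-01");
}
```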
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/mod.rs
@@ -489,7 +489,7 @@ impl EagerSnapshot {
log_store,
None,
current_version,
Box::new(self.files.clone().into_iter()),
Box::new(std::mem::take(&mut self.files).into_iter()),
None,
)
.try_collect()
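A minimal sketch of the `std::mem::take` pattern above, with a hypothetical `FileTracker` in place of `EagerSnapshot`: the field is moved out and an empty default left behind, avoiding a deep clone of the file list. This is only appropriate when the caller immediately repopulates the state from the collected result, as the snapshot-update path above appears to do.

```rust
struct FileTracker {
    files: Vec<String>,
}

impl FileTracker {
    /// Hands out the tracked files as an owned iterator without cloning them;
    /// `self.files` is left empty until the caller stores a rebuilt list.
    fn drain_files(&mut self) -> Box<dyn Iterator<Item = String>> {
        Box::new(std::mem::take(&mut self.files).into_iter())
    }
}

fn main() {
    let mut tracker = FileTracker {
        files: vec!["a.parquet".to_string(), "b.parquet".to_string()],
    };

    let drained: Vec<String> = tracker.drain_files().collect();

    assert_eq!(drained.len(), 2);
    assert!(tracker.files.is_empty()); // the old Vec was moved, not copied
}
```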
67 changes: 52 additions & 15 deletions crates/core/src/logstore/mod.rs
@@ -48,7 +48,6 @@
//! ## Configuration
//!
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Cursor};
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
@@ -569,20 +568,27 @@ pub async fn get_actions(
commit_log_bytes: bytes::Bytes,
) -> Result<Vec<Action>, DeltaTableError> {
debug!("parsing commit with version {version}...");
let reader = BufReader::new(Cursor::new(commit_log_bytes));

let mut actions = Vec::new();
for re_line in reader.lines() {
let line = re_line?;
let lstr = line.as_str();
let action = serde_json::from_str(lstr).map_err(|e| DeltaTableError::InvalidJsonLog {
json_err: e,
line,
version,
})?;
actions.push(action);
}
Ok(actions)

tokio::task::spawn_blocking(move || {
let mut actions = Vec::new();
for line_bytes in commit_log_bytes.split(|&b| b == b'\n') {
if line_bytes.is_empty() {
continue;
}
let action = serde_json::from_slice(line_bytes).map_err(|e| {
let line = String::from_utf8_lossy(line_bytes).to_string();
DeltaTableError::InvalidJsonLog {
json_err: e,
line,
version,
}
})?;
actions.push(action);
}
Ok(actions)
})
.await
.map_err(|e| DeltaTableError::Generic(format!("Failed to parse commit log {version}: {e}")))?
}

// TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
@@ -944,6 +950,37 @@ pub(crate) mod tests {
Ok(())
}

#[tokio::test]
async fn test_get_actions_non_blocking() -> DeltaResult<()> {
let valid_commit = r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"commitInfo":{"timestamp":1234567890}}
{"add":{"path":"part-00000.parquet","size":1234,"modificationTime":1234567890,"dataChange":true,"partitionValues":{}}}"#;

let commit_bytes = Bytes::from(valid_commit);
let actions = get_actions(0, commit_bytes).await?;

assert_eq!(actions.len(), 3);
assert!(matches!(actions[0], Action::Protocol(_)));
assert!(matches!(actions[1], Action::CommitInfo(_)));
assert!(matches!(actions[2], Action::Add(_)));

Ok(())
}

#[tokio::test]
async fn test_get_actions_invalid_json_error() {
let invalid_commit = r#"{"invalid_json"#;
let commit_bytes = Bytes::from(invalid_commit);
let result = get_actions(0, commit_bytes).await;

assert!(result.is_err());
if let Err(DeltaTableError::InvalidJsonLog { version, .. }) = result {
assert_eq!(version, 0);
} else {
panic!("Expected InvalidJsonLog error");
}
}

/// Collect list stream
pub(crate) async fn flatten_list_stream(
storage: &object_store::DynObjectStore,
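A self-contained sketch of the parsing approach above, assuming `tokio` (with the `rt-multi-thread` and `macros` features), `bytes`, and `serde_json` as dependencies, and with `serde_json::Value` standing in for the crate's `Action` enum: the commit is split on raw newlines and each line is deserialized straight from the byte slice, so no per-line `String` is allocated, and the loop runs under `spawn_blocking` so a large commit does not tie up the async runtime's worker threads.

```rust
use bytes::Bytes;
use serde_json::Value;

async fn parse_json_lines(bytes: Bytes) -> Result<Vec<Value>, String> {
    tokio::task::spawn_blocking(move || {
        let mut out = Vec::new();
        for line in bytes.split(|&b| b == b'\n') {
            if line.is_empty() {
                continue; // e.g. the empty slice after a trailing newline
            }
            // Parse borrowed bytes directly; only build a String for error reporting.
            let value: Value = serde_json::from_slice(line)
                .map_err(|e| format!("bad line {:?}: {e}", String::from_utf8_lossy(line)))?;
            out.push(value);
        }
        Ok(out)
    })
    .await
    .map_err(|e| format!("parse task panicked: {e}"))?
}

#[tokio::main]
async fn main() {
    let commit = Bytes::from_static(b"{\"protocol\":{\"minReaderVersion\":1}}\n{\"commitInfo\":{\"timestamp\":0}}\n");
    let actions = parse_json_lines(commit).await.unwrap();
    assert_eq!(actions.len(), 2);
}
```

Offloading is a judgment call: for a handful of actions the `spawn_blocking` hop costs more than it saves, but a commit with many thousands of actions is pure CPU work that would otherwise block a runtime worker thread.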
14 changes: 8 additions & 6 deletions crates/core/src/operations/optimize.rs
@@ -944,11 +944,12 @@ async fn build_compaction_plan(
let partition_values = file
.partition_values()
.map(|v| {
v.fields()
.iter()
.zip(v.values().iter())
.map(|(k, v)| (k.name().to_string(), v.clone()))
.collect::<IndexMap<_, _>>()
let fields_count = v.fields().len();
let mut map = IndexMap::with_capacity(fields_count);
for (field, value) in v.fields().iter().zip(v.values().iter()) {
map.insert(field.name().to_string(), value.clone());
}
map
})
.unwrap_or_default();

@@ -964,7 +965,8 @@
file.sort_by(|a, b| b.size.cmp(&a.size));
}

let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> =
HashMap::with_capacity(partition_files.len());
for (part, (partition, files)) in partition_files {
let mut merge_bins = vec![MergeBin::new()];

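A quick sketch of the preallocation idea, shown with `std::collections::HashMap` (the `IndexMap` used above exposes the same `with_capacity` constructor): when the final size is known, sizing the map once avoids the repeated grow-and-rehash steps of building it from an unsized default.

```rust
use std::collections::HashMap;

fn main() {
    let fields = ["year", "month", "day"];
    let values = ["2024", "06", "01"];

    // The number of partition columns is known up front, so allocate once.
    let mut partition_values: HashMap<String, String> = HashMap::with_capacity(fields.len());
    for (field, value) in fields.iter().zip(values.iter()) {
        partition_values.insert(field.to_string(), value.to_string());
    }

    assert_eq!(partition_values.len(), 3);
    assert!(partition_values.capacity() >= fields.len());
}
```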
9 changes: 3 additions & 6 deletions crates/core/src/operations/vacuum.rs
@@ -261,12 +261,9 @@ impl VacuumBuilder {
.await?;
for version in sorted_versions {
state.update(&self.log_store, Some(version)).await?;
let files: Vec<String> = state
.file_paths_iter()
.map(|path| path.to_string())
.collect();
debug!("keep version:{version}\n, {files:#?}");
keep_files.extend(files);
let files_iter = state.file_paths_iter().map(|path| path.to_string());
debug!("keep version:{version}");
keep_files.extend(files_iter);
}

keep_files
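A small sketch of the change above, with a hypothetical per-version listing standing in for `state.file_paths_iter()`: the mapping iterator is fed straight into `extend`, skipping the intermediate `Vec<String>` that was previously collected only so it could be appended (and dumped at debug level).

```rust
fn main() {
    let mut keep_files: Vec<String> = Vec::new();

    // Hypothetical per-version file listings.
    let versions: Vec<Vec<&str>> = vec![
        vec!["part-0000.parquet", "part-0001.parquet"],
        vec!["part-0002.parquet"],
    ];

    for paths in &versions {
        // No temporary Vec: `extend` pulls items from the iterator directly.
        keep_files.extend(paths.iter().map(|p| p.to_string()));
    }

    assert_eq!(keep_files.len(), 3);
}
```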
7 changes: 3 additions & 4 deletions crates/core/src/operations/write/execution.rs
@@ -391,10 +391,9 @@ pub(crate) async fn write_execution_plan_v2(

let write_start = std::time::Instant::now();
// split batch since we unioned upstream the operation write and cdf plan
let table_provider: Arc<dyn TableProvider> = Arc::new(MemTable::try_new(
batch.schema(),
vec![vec![batch.clone()]],
)?);
let batch_schema = batch.schema();
let table_provider: Arc<dyn TableProvider> =
Arc::new(MemTable::try_new(batch_schema, vec![vec![batch]])?);
let batch_df = session_context.read_table(table_provider).unwrap();

let normal_df = batch_df.clone().filter(col(CDC_COLUMN_NAME).in_list(
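A minimal sketch of the ownership change, using plain `arrow_array`/`arrow_schema` types rather than DataFusion's `MemTable`: take the `Arc`'d schema handle first (a cheap reference-count bump), then move the batch into the consumer so its vector of column references is not cloned.

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Grab the Arc'd schema handle before giving the batch away.
    let batch_schema = batch.schema();
    let partitions: Vec<Vec<RecordBatch>> = vec![vec![batch]]; // `batch` is moved here, not cloned

    assert_eq!(batch_schema.fields().len(), 1);
    assert_eq!(partitions[0][0].num_rows(), 3);
    Ok(())
}
```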
11 changes: 5 additions & 6 deletions crates/core/src/writer/json.rs
@@ -4,7 +4,6 @@ use std::sync::Arc;

use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow::record_batch::*;
use bytes::Bytes;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::expressions::Scalar;
use indexmap::IndexMap;
@@ -122,8 +121,8 @@ impl DataArrowWriter {
self.partition_values = partition_values;
}

// Copy current buffered bytes so we can recover from failures
let buffer_bytes = self.buffer.to_vec();
// Save buffer length for rollback without copying data
let buffer_len = self.buffer.snapshot_len();

let record_batch = record_batch_without_partitions(&record_batch, partition_columns)?;
let result = self.arrow_writer.write(&record_batch);
@@ -135,8 +134,8 @@
}
// If a write fails we need to reset the state of the DeltaArrowWriter
Err(e) => {
let new_buffer = ShareableBuffer::from_bytes(buffer_bytes.as_slice());
let _ = std::mem::replace(&mut self.buffer, new_buffer.clone());
self.buffer.truncate(buffer_len);
let new_buffer = self.buffer.clone();
let arrow_writer = Self::new_underlying_writer(
new_buffer,
self.arrow_schema.clone(),
@@ -383,7 +382,7 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
let uuid = Uuid::new_v4();

let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
let obj_bytes = Bytes::from(writer.buffer.to_vec());
let obj_bytes = writer.buffer.to_bytes();
let file_size = obj_bytes.len() as i64;
self.table
.object_store()
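A sketch of the rollback idea with a plain `Vec<u8>` standing in for the crate's `ShareableBuffer` (the `snapshot_len`, `truncate`, and `to_bytes` calls above are that type's API): record the buffer length before a fallible write and truncate back to it on error, rather than copying every buffered byte up front just in case the write fails.

```rust
/// Appends `payload`, rolling the buffer back to its previous length if the
/// (simulated) write fails. Only the checkpoint length is saved, not the bytes.
fn append_record(buffer: &mut Vec<u8>, payload: &[u8], fail: bool) -> Result<(), String> {
    let checkpoint = buffer.len();
    buffer.extend_from_slice(payload);
    if fail {
        buffer.truncate(checkpoint); // drop only what this attempt wrote
        return Err("write failed, buffer restored".to_string());
    }
    Ok(())
}

fn main() {
    let mut buffer = Vec::new();
    append_record(&mut buffer, b"good", false).unwrap();
    assert!(append_record(&mut buffer, b"bad", true).is_err());
    assert_eq!(buffer, b"good".to_vec()); // the failed attempt left no trace
}
```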
37 changes: 19 additions & 18 deletions crates/core/src/writer/record_batch.rs
@@ -10,9 +10,8 @@ use std::{collections::HashMap, sync::Arc};
use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, UInt32Array};
use arrow_ord::partition::partition;
use arrow_row::{RowConverter, SortField};
use arrow_schema::{ArrowError, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow_select::take::take;
use bytes::Bytes;
use delta_kernel::engine::arrow_conversion::{TryIntoArrow, TryIntoKernel};
use delta_kernel::expressions::Scalar;
use delta_kernel::table_properties::DataSkippingNumIndexedCols;
@@ -267,7 +266,7 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
let prefix = Path::parse(writer.partition_values.hive_partition_path())?;
let uuid = Uuid::new_v4();
let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
let obj_bytes = Bytes::from(writer.buffer.to_vec());
let obj_bytes = writer.buffer.to_bytes();
let file_size = obj_bytes.len() as i64;
self.storage
.put_with_retries(&path, obj_bytes.into(), 15)
@@ -400,8 +399,8 @@ impl PartitionWriter {
None
};

// Copy current cursor bytes so we can recover from failures
let buffer_bytes = self.buffer.to_vec();
// Save buffer length so we can rollback on failure
let buffer_len = self.buffer.len();
let record_batch = merged_batch.as_ref().unwrap_or(record_batch);

match self.arrow_writer.write(record_batch) {
@@ -411,10 +410,9 @@
}
// If a write fails we need to reset the state of the PartitionWriter
Err(e) => {
let new_buffer = ShareableBuffer::from_bytes(buffer_bytes.as_slice());
let _ = std::mem::replace(&mut self.buffer, new_buffer.clone());
self.buffer.truncate(buffer_len);
let arrow_writer = ArrowWriter::try_new(
new_buffer,
self.buffer.clone(),
self.arrow_schema.clone(),
Some(self.writer_properties.clone()),
)?;
@@ -463,12 +461,16 @@ pub(crate) fn divide_by_partition_values(

let partition_ranges = partition(sorted_partition_columns.as_slice())?;

for range in partition_ranges.ranges().into_iter() {
// get row indices for current partition
let idx: UInt32Array = (range.start..range.end)
.map(|i| Some(indices.value(i)))
.collect();
let sorted_data_columns: Vec<ArrayRef> = arrow_schema
.fields()
.iter()
.map(|f| {
let col_idx = schema.index_of(f.name())?;
Ok(take(values.column(col_idx), &indices, None)?)
})
.collect::<Result<Vec<_>, DeltaWriterError>>()?;

for range in partition_ranges.ranges().into_iter() {
let partition_key_iter = sorted_partition_columns
.iter()
.map(|col| {
@@ -483,12 +485,11 @@
.into_iter()
.zip(partition_key_iter)
.collect();
let batch_data = arrow_schema
.fields()

let batch_data: Vec<ArrayRef> = sorted_data_columns
.iter()
.map(|f| Ok(values.column(schema.index_of(f.name())?).clone()))
.map(move |col: Result<_, ArrowError>| take(col?.as_ref(), &idx, None))
.collect::<Result<Vec<_>, _>>()?;
.map(|col| col.slice(range.start, range.end - range.start))
.collect();

partitions.push(PartitionResult {
partition_values,
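A sketch of the take-once-then-slice rewrite of `divide_by_partition_values`, assuming the `arrow_array`/`arrow_select` crates: the data columns are reordered a single time with the sorted row indices, after which each partition's rows are handed out as zero-copy slices instead of materializing a fresh `UInt32Array` and re-running `take` for every partition range.

```rust
use std::sync::Arc;
use arrow_array::{Array, ArrayRef, Int32Array, UInt32Array};
use arrow_select::take::take;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One data column plus a sort permutation that groups rows by partition
    // (in the real code the permutation comes from sorting the partition columns).
    let values: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));
    let indices = UInt32Array::from(vec![2_u32, 0, 1, 3]);

    // Reorder the column once for all partitions...
    let sorted = take(values.as_ref(), &indices, None)?;

    // ...then each contiguous partition range is a zero-copy view into `sorted`.
    let first_partition = sorted.slice(0, 2);
    let second_partition = sorted.slice(2, 2);

    assert_eq!(first_partition.len(), 2);
    assert_eq!(second_partition.len(), 2);
    Ok(())
}
```

The trade-off is one up-front reorder of every data column versus a gather per partition; when a batch spans many partitions, the single `take` plus cheap slices avoids repeated index materialization and copying.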
22 changes: 15 additions & 7 deletions crates/core/src/writer/stats.rs
@@ -131,9 +131,6 @@ fn stats_from_metadata(
num_indexed_cols: DataSkippingNumIndexedCols,
stats_columns: &Option<Vec<impl AsRef<str>>>,
) -> Result<Stats, DeltaWriterError> {
let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();
let dialect = sqlparser::dialect::GenericDialect {};

let idx_to_iterate = if let Some(stats_cols) = stats_columns {
@@ -177,6 +174,14 @@
)));
};

let estimated_capacity = idx_to_iterate.len();
let mut min_values: HashMap<String, ColumnValueStat> =
HashMap::with_capacity(estimated_capacity);
let mut max_values: HashMap<String, ColumnValueStat> =
HashMap::with_capacity(estimated_capacity);
let mut null_count: HashMap<String, ColumnCountStat> =
HashMap::with_capacity(estimated_capacity);

for idx in idx_to_iterate {
let column_descr = schema_descriptor.column(idx);

@@ -549,19 +554,22 @@ fn apply_min_max_for_column(
match (column_path_parts.len(), column_path_parts.first()) {
// Base case - we are at the leaf struct level in the path
(1, _) => {
let key = column_descr.name().to_string();
let key = column_descr.name();

if let Some(min) = statistics.min {
let min = ColumnValueStat::Value(min.into());
min_values.insert(key.clone(), min);
min_values.insert(key.to_string(), min);
}

if let Some(max) = statistics.max {
let max = ColumnValueStat::Value(max.into());
max_values.insert(key.clone(), max);
max_values.insert(key.to_string(), max);
}

null_counts.insert(key, ColumnCountStat::Value(statistics.null_count as i64));
null_counts.insert(
key.to_string(),
ColumnCountStat::Value(statistics.null_count as i64),
);

Ok(())
}
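A tiny sketch of the keying change in `apply_min_max_for_column`: keep the column name as a borrowed `&str` and allocate an owned key only when a statistic is actually inserted, instead of building an owned `String` (and cloning it) whether or not min/max are present.

```rust
use std::collections::HashMap;

fn main() {
    let column_name = "value"; // borrowed, e.g. from the parquet column descriptor
    let min: Option<i64> = Some(1);
    let max: Option<i64> = None;

    let mut min_values: HashMap<String, i64> = HashMap::new();
    let mut max_values: HashMap<String, i64> = HashMap::new();

    // Allocate a key only for the inserts that actually happen.
    if let Some(v) = min {
        min_values.insert(column_name.to_string(), v);
    }
    if let Some(v) = max {
        max_values.insert(column_name.to_string(), v);
    }

    assert_eq!(min_values.get("value"), Some(&1));
    assert!(max_values.is_empty());
}
```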