
Commit 613f7d9

get row counts from data files directly
1 parent 71d52ff commit 613f7d9

File tree

crates/integrations/datafusion/src/physical_plan/commit.rs
crates/integrations/datafusion/src/physical_plan/write.rs

2 files changed: +18 -38 lines

crates/integrations/datafusion/src/physical_plan/commit.rs

Lines changed: 5 additions & 20 deletions
@@ -187,7 +187,7 @@ impl ExecutionPlan for IcebergCommitExec {
         // Process the input streams from all partitions and commit the data files
         let stream = futures::stream::once(async move {
             let mut data_files: Vec<DataFile> = Vec::new();
-            let mut total_count: u64 = 0;
+            let mut total_record_count: u64 = 0;
 
             // Execute and collect results from all partitions of the input plan
             let batches = execute_stream_partitioned(input_plan, context)?;
@@ -197,21 +197,6 @@ impl ExecutionPlan for IcebergCommitExec {
             while let Some(batch_result) = batch_stream.as_mut().next().await {
                 let batch = batch_result?;
 
-                let count_array = batch
-                    .column_by_name("count")
-                    .ok_or_else(|| {
-                        DataFusionError::Internal(
-                            "Expected 'count' column in input batch".to_string(),
-                        )
-                    })?
-                    .as_any()
-                    .downcast_ref::<UInt64Array>()
-                    .ok_or_else(|| {
-                        DataFusionError::Internal(
-                            "Expected 'count' column to be UInt64Array".to_string(),
-                        )
-                    })?;
-
                 let files_array = batch
                     .column_by_name("data_files")
                     .ok_or_else(|| {
@@ -230,9 +215,6 @@ impl ExecutionPlan for IcebergCommitExec {
                 // todo remove log
                 println!("files_array to deserialize: {:?}", files_array);
 
-                // Sum all values in the count_array
-                total_count += count_array.iter().flatten().sum::<u64>();
-
                 // Deserialize all data files from the StringArray
                 let batch_files: Vec<DataFile> = files_array
                     .into_iter()
@@ -249,6 +231,9 @@ impl ExecutionPlan for IcebergCommitExec {
                     })
                     .collect::<datafusion::common::Result<_>>()?;
 
+                // add record_counts from the current batch to total record count
+                total_record_count += batch_files.iter().map(|f| f.record_count()).sum::<u64>();
+
                 // Add all deserialized files to our collection
                 data_files.extend(batch_files);
             }
@@ -272,7 +257,7 @@ impl ExecutionPlan for IcebergCommitExec {
             // .await
             // .map_err(to_datafusion_error)?;
 
-            Self::make_count_batch(total_count)
+            Self::make_count_batch(total_record_count)
         })
         .boxed();
 
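Taken together, these hunks stop reading a separate "count" column from the input batches and instead derive the total row count from the record counts the data files already carry. A minimal sketch of the new counting logic, using a hypothetical stand-in struct for iceberg::spec::DataFile (only its record_count() accessor matters here):

// Stand-in for iceberg::spec::DataFile; the real type carries the
// per-file row count that is written into the Iceberg manifest.
struct DataFile {
    record_count: u64,
}

impl DataFile {
    fn record_count(&self) -> u64 {
        self.record_count
    }
}

// The total row count is now derived from the files themselves rather
// than from a separate "count" column in the input batches.
fn total_record_count(batches: &[Vec<DataFile>]) -> u64 {
    batches
        .iter()
        .flat_map(|files| files.iter())
        .map(|f| f.record_count())
        .sum()
}

fn main() {
    let batches = vec![
        vec![DataFile { record_count: 100 }, DataFile { record_count: 42 }],
        vec![DataFile { record_count: 8 }],
    ];
    assert_eq!(total_record_count(&batches), 150);
}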

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 13 additions & 18 deletions
@@ -20,7 +20,7 @@ use std::fmt::{Debug, Formatter};
 use std::str::FromStr;
 use std::sync::Arc;
 
-use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
+use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
 use datafusion::arrow::datatypes::{
     DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
 };
@@ -45,7 +45,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
-use iceberg::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Error, ErrorKind};
 use parquet::file::properties::WriterProperties;
 use uuid::Uuid;
@@ -83,26 +83,22 @@ impl IcebergWriteExec {
         )
     }
 
-    // Create a record batch with count and serialized data files
-    fn make_result_batch(count: u64, data_files: Vec<String>) -> DFResult<RecordBatch> {
-        let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
+    // Create a record batch with serialized data files
+    fn make_result_batch(data_files: Vec<String>) -> DFResult<RecordBatch> {
         let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef;
 
-        RecordBatch::try_from_iter_with_nullable(vec![
-            ("count", count_array, false),
-            ("data_files", files_array, false),
-        ])
-        .map_err(|e| {
-            DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string()))
-        })
+        RecordBatch::try_from_iter_with_nullable(vec![("data_files", files_array, false)]).map_err(
+            |e| DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string())),
+        )
     }
 
     fn make_result_schema() -> ArrowSchemaRef {
         // Define a schema.
-        Arc::new(ArrowSchema::new(vec![
-            Field::new("count", DataType::UInt64, false),
-            Field::new("data_files", DataType::Utf8, false),
-        ]))
+        Arc::new(ArrowSchema::new(vec![Field::new(
+            "data_files",
+            DataType::Utf8,
+            false,
+        )]))
     }
 }
 
@@ -238,7 +234,6 @@ impl ExecutionPlan for IcebergWriteExec {
                 writer.write(batch?).await.map_err(to_datafusion_error)?;
             }
 
-            let count = writer.current_row_num() as u64;
             let data_file_builders = writer.close().await.map_err(to_datafusion_error)?;
 
             // Convert builders to data files and then to JSON strings
@@ -255,7 +250,7 @@ impl ExecutionPlan for IcebergWriteExec {
                 })
                 .collect::<DFResult<Vec<String>>>()?;
 
-            Self::make_result_batch(count, data_files)
+            Self::make_result_batch(data_files)
         })
         .boxed();
 
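With the count column gone, the writer's result batch carries only the serialized data files, and the unused CurrentFileStatus import goes with it. A minimal sketch of the slimmed-down batch construction, written against the arrow-array crate directly rather than DataFusion's re-exports, and returning a plain ArrowError instead of DFResult:

use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringArray};
use arrow_schema::ArrowError;

// Build the single-column result batch: one non-nullable Utf8 value per
// JSON-serialized data file.
fn make_result_batch(data_files: Vec<String>) -> Result<RecordBatch, ArrowError> {
    let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef;
    RecordBatch::try_from_iter_with_nullable(vec![("data_files", files_array, false)])
}

fn main() -> Result<(), ArrowError> {
    // Hypothetical payload; real entries are JSON-encoded DataFile structs.
    let batch = make_result_batch(vec![r#"{"file_path":"demo.parquet"}"#.to_string()])?;
    assert_eq!((batch.num_columns(), batch.num_rows()), (1, 1));
    Ok(())
}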
