Skip to content

Commit 98b1335

Browse files
authored
refactor: remove log_data call sites in find_files (#4026)
# Description Getting rid of more call sites of `log_data`, which are blocking the squashing of our snapshots. While at it, I also did some pruning in the code to better isolate it and make it a little easier to read. Signed-off-by: Robert Pack <robstar.pack@gmail.com>
1 parent 5e645cd commit 98b1335

File tree

2 files changed

+74
-86
lines changed

2 files changed

+74
-86
lines changed

crates/core/src/delta_datafusion/find_files.rs

Lines changed: 72 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -3,21 +3,22 @@ use std::fmt::Debug;
33
use std::sync::Arc;
44

55
use arrow_array::{Array, RecordBatch, StringArray};
6-
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema};
6+
use arrow_schema::{DataType, Field, Schema, SchemaRef};
77
use datafusion::catalog::Session;
88
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
99
use datafusion::datasource::MemTable;
10-
use datafusion::execution::context::{SessionContext, TaskContext};
10+
use datafusion::execution::context::SessionContext;
1111
use datafusion::logical_expr::{Expr, Volatility, col};
1212
use datafusion::physical_plan::ExecutionPlan;
1313
use datafusion::physical_plan::filter::FilterExec;
1414
use datafusion::physical_plan::limit::LocalLimitExec;
15+
use futures::TryStreamExt;
1516
use itertools::Itertools;
1617
use percent_encoding::percent_decode_str;
1718
use tracing::*;
1819

1920
use crate::delta_datafusion::{
20-
DeltaScanBuilder, DeltaScanConfigBuilder, PATH_COLUMN, df_logical_schema, get_path_column,
21+
DataFusionMixins as _, DeltaScanBuilder, DeltaScanConfigBuilder, PATH_COLUMN, get_path_column,
2122
};
2223
use crate::errors::{DeltaResult, DeltaTableError};
2324
use crate::kernel::{Add, EagerSnapshot};
@@ -86,7 +87,11 @@ pub(crate) async fn find_files(
8687
}
8788
None => {
8889
let result = FindFiles {
89-
candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(),
90+
candidates: snapshot
91+
.file_views(&log_store, None)
92+
.map_ok(|f| f.add_action())
93+
.try_collect()
94+
.await?,
9095
partition_scan: true,
9196
};
9297
Span::current().record("partition_scan", result.partition_scan);
@@ -226,32 +231,39 @@ async fn find_files_scan(
226231
session: &dyn Session,
227232
expression: Expr,
228233
) -> DeltaResult<Vec<Add>> {
229-
let candidate_map: HashMap<String, Add> = snapshot
230-
.log_data()
231-
.iter()
232-
.map(|f| f.add_action())
233-
.map(|add| {
234-
let path = add.path.clone();
235-
(path, add)
234+
// let kernel_predicate = to_predicate(&expression).ok().map(Arc::new);
235+
let candidate_map: HashMap<_, _> = snapshot
236+
.file_views(&log_store, None)
237+
.map_ok(|f| {
238+
let add = f.add_action();
239+
(add.path.clone(), add)
236240
})
237-
.collect();
241+
.try_collect()
242+
.await?;
238243

239244
Span::current().record("total_files", candidate_map.len());
240245

241246
let scan_config = DeltaScanConfigBuilder::default()
242247
.with_file_column(true)
243248
.build(snapshot)?;
249+
let file_column_name = scan_config
250+
.file_column_name
251+
.as_ref()
252+
.ok_or(DeltaTableError::Generic(
253+
"File column name must be set in scan config".to_string(),
254+
))?
255+
.clone();
244256

245-
let logical_schema = df_logical_schema(snapshot, &scan_config.file_column_name, None)?;
257+
let logical_schema = df_logical_schema(snapshot, &file_column_name)?;
246258

247259
// Identify which columns we need to project
248-
let mut used_columns = expression
260+
let mut used_columns: Vec<_> = expression
249261
.column_refs()
250262
.into_iter()
251263
.map(|column| logical_schema.index_of(&column.name))
252-
.collect::<Result<Vec<usize>, ArrowError>>()?;
264+
.try_collect()?;
253265
// Add path column
254-
used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);
266+
used_columns.push(logical_schema.index_of(&file_column_name)?);
255267

256268
let scan = DeltaScanBuilder::new(snapshot, log_store, session)
257269
.with_filter(Some(expression.clone()))
@@ -261,25 +273,18 @@ async fn find_files_scan(
261273
.await?;
262274
let scan = Arc::new(scan);
263275

264-
let config = &scan.config;
265276
let input_dfschema = scan.logical_schema.as_ref().to_owned().try_into()?;
266-
267277
let predicate_expr = session
268278
.create_physical_expr(Expr::IsTrue(Box::new(expression.clone())), &input_dfschema)?;
269279

270280
let filter: Arc<dyn ExecutionPlan> =
271281
Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
272282
let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1));
273283

274-
let task_ctx = Arc::new(TaskContext::from(session));
275-
let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;
284+
let path_batches = datafusion::physical_plan::collect(limit, session.task_ctx()).await?;
276285

277-
let result = join_batches_with_add_actions(
278-
path_batches,
279-
candidate_map,
280-
config.file_column_name.as_ref().unwrap(),
281-
true,
282-
)?;
286+
let result =
287+
join_batches_with_add_actions(path_batches, candidate_map, &file_column_name, true)?;
283288

284289
Span::current().record("matching_files", result.len());
285290
Ok(result)
@@ -305,24 +310,20 @@ async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaR
305310
))?
306311
.to_owned(),
307312
);
308-
fields.push(Field::new(PATH_COLUMN, ArrowDataType::Utf8, false));
313+
fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
309314

310315
for field in schema.fields() {
311-
if field.name().starts_with("partition.") {
312-
let name = field.name().strip_prefix("partition.").unwrap();
313-
316+
if let Some(name) = field.name().strip_prefix("partition.") {
314317
arrays.push(batch.column_by_name(field.name()).unwrap().to_owned());
315-
fields.push(Field::new(
316-
name,
317-
field.data_type().to_owned(),
318-
field.is_nullable(),
319-
));
318+
fields.push(field.as_ref().clone().with_name(name));
320319
}
321320
}
322321

323-
let schema = Arc::new(ArrowSchema::new(fields));
324-
let batch = RecordBatch::try_new(schema, arrays)?;
325-
let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
322+
let schema = Arc::new(Schema::new(fields));
323+
let mem_table = MemTable::try_new(
324+
schema.clone(),
325+
vec![vec![RecordBatch::try_new(schema, arrays)?]],
326+
)?;
326327

327328
let ctx = SessionContext::new();
328329
let mut df = ctx.read_table(Arc::new(mem_table))?;
@@ -333,11 +334,39 @@ async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaR
333334

334335
let map = actions
335336
.into_iter()
336-
.map(|action| {
337-
let path = action.path.clone();
338-
(path, action)
339-
})
340-
.collect::<HashMap<String, Add>>();
337+
.map(|action| (action.path.clone(), action))
338+
.collect::<HashMap<_, _>>();
341339

342340
join_batches_with_add_actions(batches, map, PATH_COLUMN, false)
343341
}
342+
343+
/// The logical schema for a Delta table is different from the protocol-level schema since partition
344+
/// columns must appear at the end of the schema. This is to align with how partitions are handled
345+
/// at the physical level
346+
fn df_logical_schema(
347+
snapshot: &EagerSnapshot,
348+
file_column_name: &String,
349+
) -> DeltaResult<SchemaRef> {
350+
let input_schema = snapshot.input_schema();
351+
let table_partition_cols = snapshot.metadata().partition_columns();
352+
353+
let mut fields: Vec<_> = input_schema
354+
.fields()
355+
.iter()
356+
.filter(|f| !table_partition_cols.contains(f.name()))
357+
.cloned()
358+
.collect();
359+
360+
for partition_col in table_partition_cols.iter() {
361+
fields.push(Arc::new(
362+
input_schema
363+
.field_with_name(partition_col)
364+
.unwrap()
365+
.to_owned(),
366+
));
367+
}
368+
369+
fields.push(Arc::new(Field::new(file_column_name, DataType::Utf8, true)));
370+
371+
Ok(Arc::new(Schema::new(fields)))
372+
}

crates/core/src/delta_datafusion/mod.rs

Lines changed: 2 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -31,8 +31,8 @@ use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictio
3131
use arrow_cast::display::array_value_to_string;
3232
use arrow_cast::{CastOptions, cast_with_options};
3333
use arrow_schema::{
34-
DataType as ArrowDataType, Field, Schema as ArrowSchema, SchemaRef,
35-
SchemaRef as ArrowSchemaRef, TimeUnit,
34+
DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
35+
TimeUnit,
3636
};
3737
use datafusion::catalog::{Session, TableProviderFactory};
3838
use datafusion::common::scalar::ScalarValue;
@@ -312,47 +312,6 @@ pub(crate) fn register_store(store: LogStoreRef, env: &RuntimeEnv) {
312312
env.register_object_store(url, store.object_store(None));
313313
}
314314

315-
/// The logical schema for a Deltatable is different from the protocol level schema since partition
316-
/// columns must appear at the end of the schema. This is to align with how partition are handled
317-
/// at the physical level
318-
pub(crate) fn df_logical_schema(
319-
snapshot: &EagerSnapshot,
320-
file_column_name: &Option<String>,
321-
schema: Option<ArrowSchemaRef>,
322-
) -> DeltaResult<SchemaRef> {
323-
let input_schema = match schema {
324-
Some(schema) => schema,
325-
None => snapshot.input_schema(),
326-
};
327-
let table_partition_cols = snapshot.metadata().partition_columns();
328-
329-
let mut fields: Vec<Arc<Field>> = input_schema
330-
.fields()
331-
.iter()
332-
.filter(|f| !table_partition_cols.contains(f.name()))
333-
.cloned()
334-
.collect();
335-
336-
for partition_col in table_partition_cols.iter() {
337-
fields.push(Arc::new(
338-
input_schema
339-
.field_with_name(partition_col)
340-
.unwrap()
341-
.to_owned(),
342-
));
343-
}
344-
345-
if let Some(file_column_name) = file_column_name {
346-
fields.push(Arc::new(Field::new(
347-
file_column_name,
348-
ArrowDataType::Utf8,
349-
true,
350-
)));
351-
}
352-
353-
Ok(Arc::new(ArrowSchema::new(fields)))
354-
}
355-
356315
pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
357316
match t {
358317
ArrowDataType::Null => Ok(ScalarValue::Null),

0 commit comments

Comments (0)