Skip to content

Commit e0077ff

Browse files
ion-elgreco authored and Liam Brannigan committed
refactor: simplify writer, schema evolution and generated columns
Signed-off-by: Ion Koutsouris <[email protected]> Signed-off-by: Liam Brannigan <[email protected]>
1 parent e5c95b7 commit e0077ff

File tree

10 files changed

+714
-836
lines changed

10 files changed

+714
-836
lines changed

crates/core/src/operations/delete.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ use crate::delta_datafusion::{
5353
use crate::errors::DeltaResult;
5454
use crate::kernel::{Action, Add, Remove};
5555
use crate::logstore::LogStoreRef;
56-
use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig};
56+
use crate::operations::write::execution::{write_execution_plan, write_execution_plan_cdc};
57+
use crate::operations::write::WriterStatsConfig;
5758
use crate::operations::CustomExecuteHandler;
5859
use crate::protocol::DeltaOperation;
5960
use crate::table::state::DeltaTableState;

crates/core/src/operations/merge/mod.rs

Lines changed: 5 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ use datafusion_common::tree_node::{Transformed, TreeNode};
5252
use datafusion_common::{Column, DFSchema, ExprSchema, ScalarValue, TableReference};
5353
use datafusion_expr::{col, conditional_expressions::CaseBuilder, lit, when, Expr, JoinType};
5454
use datafusion_expr::{
55-
ExprSchemable, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode,
56-
UNNAMED_TABLE,
55+
Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode, UNNAMED_TABLE,
5756
};
5857

5958
use delta_kernel::schema::{ColumnMetadataKey, StructType};
@@ -78,16 +77,18 @@ use crate::delta_datafusion::{
7877
DeltaSessionConfig, DeltaTableProvider,
7978
};
8079

81-
use crate::kernel::{Action, DataCheck, Metadata, StructTypeExt};
80+
use crate::kernel::{Action, Metadata, StructTypeExt};
8281
use crate::logstore::LogStoreRef;
8382
use crate::operations::cast::merge_schema::{merge_arrow_field, merge_arrow_schema};
8483
use crate::operations::cdc::*;
8584
use crate::operations::merge::barrier::find_node;
8685
use crate::operations::transaction::CommitBuilder;
86+
use crate::operations::write::generated_columns::{
87+
add_generated_columns, add_missing_generated_columns,
88+
};
8789
use crate::operations::write::WriterStatsConfig;
8890
use crate::protocol::{DeltaOperation, MergePredicate};
8991
use crate::table::state::DeltaTableState;
90-
use crate::table::GeneratedColumn;
9192
use crate::{DeltaResult, DeltaTable, DeltaTableError};
9293
use writer::write_execution_plan_v2;
9394

@@ -776,72 +777,6 @@ async fn execute(
776777
None => TableReference::bare(UNNAMED_TABLE),
777778
};
778779

779-
/// Add NULL placeholder columns for generated columns missing from a dataframe and return their names
780-
fn add_missing_generated_columns(
781-
mut df: DataFrame,
782-
generated_cols: &Vec<GeneratedColumn>,
783-
) -> DeltaResult<(DataFrame, Vec<String>)> {
784-
let mut missing_cols = vec![];
785-
for generated_col in generated_cols {
786-
let col_name = generated_col.get_name();
787-
788-
if df
789-
.clone()
790-
.schema()
791-
.field_with_unqualified_name(col_name)
792-
.is_err()
793-
// implies it doesn't exist
794-
{
795-
debug!(
796-
"Adding missing generated column {} in source as placeholder",
797-
col_name
798-
);
799-
// If column doesn't exist, we add a null column, later we will generate the values after
800-
// all the merge is projected.
801-
// Other generated columns that were provided upon the start we only validate during write
802-
missing_cols.push(col_name.to_string());
803-
df = df
804-
.clone()
805-
.with_column(col_name, Expr::Literal(ScalarValue::Null))?;
806-
}
807-
}
808-
Ok((df, missing_cols))
809-
}
810-
811-
/// Add generated column expressions to a dataframe
812-
fn add_generated_columns(
813-
mut df: DataFrame,
814-
generated_cols: &Vec<GeneratedColumn>,
815-
generated_cols_missing_in_source: &[String],
816-
state: &SessionState,
817-
) -> DeltaResult<DataFrame> {
818-
debug!("Generating columns in dataframe");
819-
for generated_col in generated_cols {
820-
// We only validate columns that were missing from the start. We don't update
821-
// generated columns that were provided during runtime
822-
if !generated_cols_missing_in_source.contains(&generated_col.name) {
823-
continue;
824-
}
825-
826-
let generation_expr = state.create_logical_expr(
827-
generated_col.get_generation_expression(),
828-
df.clone().schema(),
829-
)?;
830-
let col_name = generated_col.get_name();
831-
832-
df = df.clone().with_column(
833-
generated_col.get_name(),
834-
when(col(col_name).is_null(), generation_expr)
835-
.otherwise(col(col_name))?
836-
.cast_to(
837-
&arrow_schema::DataType::try_from(&generated_col.data_type)?,
838-
df.schema(),
839-
)?,
840-
)?
841-
}
842-
Ok(df)
843-
}
844-
845780
let generated_col_expressions = snapshot
846781
.schema()
847782
.get_generated_columns()

crates/core/src/operations/update.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use super::{
5151
};
5252
use super::{transaction::PROTOCOL, write::WriterStatsConfig};
5353
use super::{
54-
write::{write_execution_plan, write_execution_plan_cdc},
54+
write::execution::{write_execution_plan, write_execution_plan_cdc},
5555
CustomExecuteHandler, Operation,
5656
};
5757
use crate::delta_datafusion::{find_files, planner::DeltaPlanner, register_store};
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/// Configuration for the writer on how to collect stats
2+
#[derive(Clone)]
3+
pub struct WriterStatsConfig {
4+
/// Number of columns to collect stats for, idx based
5+
pub num_indexed_cols: i32,
6+
/// Optional list of columns for which to collect stats; takes precedence over num_indexed_cols
7+
pub stats_columns: Option<Vec<String>>,
8+
}
9+
10+
impl WriterStatsConfig {
11+
/// Create new writer stats config
12+
pub fn new(num_indexed_cols: i32, stats_columns: Option<Vec<String>>) -> Self {
13+
Self {
14+
num_indexed_cols,
15+
stats_columns,
16+
}
17+
}
18+
}

0 commit comments

Comments
 (0)