Skip to content

Commit ad91db7

Browse files
roeap authored and rtyler committed
feat: consolidate datafusion session setup
Signed-off-by: Robert Pack <[email protected]>
1 parent 09e2a9b commit ad91db7

File tree

15 files changed

+281
-283
lines changed

15 files changed

+281
-283
lines changed

crates/core/src/delta_datafusion/expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use datafusion::sql::sqlparser::parser::Parser;
4949
use datafusion::sql::sqlparser::tokenizer::Tokenizer;
5050
use tracing::log::*;
5151

52-
use super::DeltaParserOptions;
52+
use crate::delta_datafusion::session::DeltaParserOptions;
5353
use crate::{DeltaResult, DeltaTableError};
5454

5555
/// This struct is like Datafusion's MakeArray but ensures that `element` is used rather than `item`

crates/core/src/delta_datafusion/mod.rs

Lines changed: 10 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
use std::fmt::Debug;
2727
use std::sync::Arc;
2828

29-
use arrow_array::types::UInt16Type;
30-
use arrow_array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
29+
use arrow::array::types::UInt16Type;
30+
use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
3131
use arrow_cast::display::array_value_to_string;
3232
use arrow_cast::{cast_with_options, CastOptions};
3333
use arrow_schema::{
@@ -41,7 +41,7 @@ use datafusion::common::{
4141
};
4242
use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
4343
use datafusion::datasource::{MemTable, TableProvider};
44-
use datafusion::execution::context::{SessionConfig, SessionContext};
44+
use datafusion::execution::context::SessionContext;
4545
use datafusion::execution::runtime_env::RuntimeEnv;
4646
use datafusion::execution::FunctionRegistry;
4747
use datafusion::logical_expr::logical_plan::CreateExternalTable;
@@ -50,7 +50,6 @@ use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
5050
use datafusion::physical_optimizer::pruning::PruningPredicate;
5151
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
5252
use datafusion::physical_plan::{ExecutionPlan, Statistics};
53-
use datafusion::sql::planner::ParserOptions;
5453
use datafusion_proto::logical_plan::LogicalExtensionCodec;
5554
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
5655
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
@@ -71,6 +70,7 @@ use crate::table::state::DeltaTableState;
7170
use crate::table::{Constraint, GeneratedColumn};
7271
use crate::{open_table, open_table_with_storage_options, DeltaTable};
7372

73+
pub use self::session::*;
7474
pub(crate) use find_files::*;
7575

7676
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
@@ -83,6 +83,7 @@ pub mod logical;
8383
pub mod physical;
8484
pub mod planner;
8585
mod schema_adapter;
86+
mod session;
8687
mod table_provider;
8788

8889
pub use cdf::scan::DeltaCdfTableProvider;
@@ -125,7 +126,7 @@ pub trait DataFusionMixins {
125126
fn parse_predicate_expression(
126127
&self,
127128
expr: impl AsRef<str>,
128-
session: &impl Session,
129+
session: &dyn Session,
129130
) -> DeltaResult<Expr>;
130131
}
131132

@@ -149,7 +150,7 @@ impl DataFusionMixins for Snapshot {
149150
fn parse_predicate_expression(
150151
&self,
151152
expr: impl AsRef<str>,
152-
session: &impl Session,
153+
session: &dyn Session,
153154
) -> DeltaResult<Expr> {
154155
let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
155156
parse_predicate_expression(&schema, expr, session)
@@ -188,7 +189,7 @@ impl DataFusionMixins for LogDataHandler<'_> {
188189
fn parse_predicate_expression(
189190
&self,
190191
expr: impl AsRef<str>,
191-
session: &impl Session,
192+
session: &dyn Session,
192193
) -> DeltaResult<Expr> {
193194
let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
194195
parse_predicate_expression(&schema, expr, session)
@@ -207,7 +208,7 @@ impl DataFusionMixins for EagerSnapshot {
207208
fn parse_predicate_expression(
208209
&self,
209210
expr: impl AsRef<str>,
210-
session: &impl Session,
211+
session: &dyn Session,
211212
) -> DeltaResult<Expr> {
212213
self.snapshot().parse_predicate_expression(expr, session)
213214
}
@@ -792,67 +793,6 @@ impl TableProviderFactory for DeltaTableFactory {
792793
}
793794
}
794795

795-
/// A wrapper for sql_parser's ParserOptions to capture sane default table defaults
796-
pub struct DeltaParserOptions {
797-
inner: ParserOptions,
798-
}
799-
800-
impl Default for DeltaParserOptions {
801-
fn default() -> Self {
802-
DeltaParserOptions {
803-
inner: ParserOptions {
804-
enable_ident_normalization: false,
805-
..ParserOptions::default()
806-
},
807-
}
808-
}
809-
}
810-
811-
impl From<DeltaParserOptions> for ParserOptions {
812-
fn from(value: DeltaParserOptions) -> Self {
813-
value.inner
814-
}
815-
}
816-
817-
/// A wrapper for Deltafusion's SessionConfig to capture sane default table defaults
818-
pub struct DeltaSessionConfig {
819-
inner: SessionConfig,
820-
}
821-
822-
impl Default for DeltaSessionConfig {
823-
fn default() -> Self {
824-
DeltaSessionConfig {
825-
inner: SessionConfig::default()
826-
.set_bool("datafusion.sql_parser.enable_ident_normalization", false),
827-
}
828-
}
829-
}
830-
831-
impl From<DeltaSessionConfig> for SessionConfig {
832-
fn from(value: DeltaSessionConfig) -> Self {
833-
value.inner
834-
}
835-
}
836-
837-
/// A wrapper for Deltafusion's SessionContext to capture sane default table defaults
838-
pub struct DeltaSessionContext {
839-
inner: SessionContext,
840-
}
841-
842-
impl Default for DeltaSessionContext {
843-
fn default() -> Self {
844-
DeltaSessionContext {
845-
inner: SessionContext::new_with_config(DeltaSessionConfig::default().into()),
846-
}
847-
}
848-
}
849-
850-
impl From<DeltaSessionContext> for SessionContext {
851-
fn from(value: DeltaSessionContext) -> Self {
852-
value.inner
853-
}
854-
}
855-
856796
/// A wrapper for Deltafusion's Column to preserve case-sensitivity during string conversion
857797
pub struct DeltaColumn {
858798
inner: Column,
@@ -914,7 +854,7 @@ mod tests {
914854
use datafusion::logical_expr::lit;
915855
use datafusion::physical_plan::empty::EmptyExec;
916856
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
917-
use datafusion::prelude::col;
857+
use datafusion::prelude::{col, SessionConfig};
918858
use datafusion_proto::physical_plan::AsExecutionPlan;
919859
use datafusion_proto::protobuf;
920860
use delta_kernel::path::{LogPathFileType, ParsedLogPath};
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use datafusion::{
2+
catalog::Session,
3+
common::{exec_datafusion_err, Result as DataFusionResult},
4+
execution::{SessionState, SessionStateBuilder},
5+
prelude::{SessionConfig, SessionContext},
6+
sql::planner::ParserOptions,
7+
};
8+
9+
use crate::delta_datafusion::planner::DeltaPlanner;
10+
11+
/// Create a new [`DeltaSessionContext`] via its `Default` implementation.
///
/// Call `into_inner()` (or `.into()`) on the result to obtain the wrapped
/// DataFusion `SessionContext`.
pub fn create_session() -> DeltaSessionContext {
12+
DeltaSessionContext::default()
13+
}
14+
15+
/// Given a `Session` reference, get the concrete `SessionState` reference.
16+
/// Note: this may stop working in future versions.
///
/// The lookup is a downcast through `Session::as_any`, so it only succeeds
/// when the value behind the trait object is a `SessionState`.
///
/// # Errors
///
/// Returns an error built with `exec_datafusion_err!` ("Failed to downcast
/// Session to SessionState") when the downcast fails.
17+
#[deprecated(
18+
since = "0.29.1",
19+
note = "Stop gap to get rid of all explicit session state references"
20+
)]
21+
pub(crate) fn session_state_from_session(session: &dyn Session) -> DataFusionResult<&SessionState> {
22+
session
23+
.as_any()
24+
.downcast_ref::<SessionState>()
25+
.ok_or_else(|| exec_datafusion_err!("Failed to downcast Session to SessionState"))
26+
}
27+
28+
/// A wrapper around the SQL planner's `ParserOptions` that captures sane defaults for Delta tables.
///
/// Build it with `DeltaParserOptions::default()` and convert it with `.into()`
/// to obtain the underlying `ParserOptions`.
29+
pub struct DeltaParserOptions {
30+
// The wrapped parser options, handed out by the `From` conversion.
inner: ParserOptions,
31+
}
32+
33+
impl Default for DeltaParserOptions {
34+
/// Start from `ParserOptions::default()` and turn off `enable_ident_normalization`.
fn default() -> Self {
35+
DeltaParserOptions {
36+
inner: ParserOptions {
37+
// NOTE(review): presumably disabled so identifiers keep their original
// case instead of being normalized - confirm against the parser docs.
enable_ident_normalization: false,
38+
// Every other option keeps DataFusion's default value.
..ParserOptions::default()
39+
},
40+
}
41+
}
42+
}
43+
44+
impl From<DeltaParserOptions> for ParserOptions {
45+
/// Unwrap the wrapper, returning the inner `ParserOptions` by value.
fn from(value: DeltaParserOptions) -> Self {
46+
value.inner
47+
}
48+
}
49+
50+
/// A wrapper for DataFusion's `SessionConfig` that captures sane defaults for Delta tables.
///
/// Build it with `DeltaSessionConfig::default()` and convert it with `.into()`
/// to obtain the underlying `SessionConfig`.
51+
pub struct DeltaSessionConfig {
52+
// The wrapped session configuration, handed out by the `From` conversion.
inner: SessionConfig,
53+
}
54+
55+
impl Default for DeltaSessionConfig {
56+
/// Start from `SessionConfig::default()` and set
/// `datafusion.sql_parser.enable_ident_normalization` to `false`.
fn default() -> Self {
57+
DeltaSessionConfig {
58+
inner: SessionConfig::default()
59+
// Same setting that `DeltaParserOptions::default()` disables on `ParserOptions`.
.set_bool("datafusion.sql_parser.enable_ident_normalization", false),
60+
}
61+
}
62+
}
63+
64+
impl From<DeltaSessionConfig> for SessionConfig {
65+
/// Unwrap the wrapper, returning the inner `SessionConfig` by value.
fn from(value: DeltaSessionConfig) -> Self {
66+
value.inner
67+
}
68+
}
69+
70+
/// A wrapper for DataFusion's `SessionContext` that captures sane defaults for Delta tables.
///
/// Construct it with `DeltaSessionContext::new()` (or `Default`), then call
/// `into_inner()` or `.into()` to obtain the underlying `SessionContext`.
71+
pub struct DeltaSessionContext {
72+
// The fully configured DataFusion context.
inner: SessionContext,
73+
}
74+
75+
impl DeltaSessionContext {
76+
/// Create a session context configured with `DeltaSessionConfig::default()`
/// and a `DeltaPlanner` installed as the query planner.
pub fn new() -> Self {
77+
// First build a context from the Delta session config.
let ctx = SessionContext::new_with_config(DeltaSessionConfig::default().into());
78+
let planner = DeltaPlanner::new();
79+
// Rebuild the session state from that context's state, swapping in the Delta planner.
let state = SessionStateBuilder::new_from_existing(ctx.state())
80+
.with_query_planner(planner)
81+
.build();
82+
// The wrapped context is created from the rebuilt state; `ctx` is not stored.
let inner = SessionContext::new_with_state(state);
83+
Self { inner }
84+
}
85+
86+
/// Consume the wrapper and return the wrapped `SessionContext`.
pub fn into_inner(self) -> SessionContext {
87+
self.inner
88+
}
89+
}
90+
91+
impl Default for DeltaSessionContext {
92+
/// Equivalent to `DeltaSessionContext::new()`.
fn default() -> Self {
93+
Self::new()
94+
}
95+
}
96+
97+
impl From<DeltaSessionContext> for SessionContext {
98+
/// Unwrap the wrapper, returning the inner `SessionContext` by value
/// (same result as `DeltaSessionContext::into_inner`).
fn from(value: DeltaSessionContext) -> Self {
99+
value.inner
100+
}
101+
}

crates/core/src/logstore/factories.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ fn default_parse_url_opts(
6666
/// Access global registry of object store factories
6767
pub fn object_store_factories() -> ObjectStoreFactoryRegistry {
6868
static REGISTRY: OnceLock<ObjectStoreFactoryRegistry> = OnceLock::new();
69-
let factory = Arc::new(DefaultObjectStoreFactory::default());
7069
REGISTRY
7170
.get_or_init(|| {
71+
let factory = Arc::new(DefaultObjectStoreFactory::default());
7272
let registry = ObjectStoreFactoryRegistry::default();
7373
registry.insert(Url::parse("memory://").unwrap(), factory.clone());
7474
registry.insert(Url::parse("file://").unwrap(), factory);

crates/core/src/operations/constraints.rs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,16 @@ use std::sync::Arc;
44

55
use datafusion::catalog::Session;
66
use datafusion::common::ToDFSchema;
7-
use datafusion::execution::{SendableRecordBatchStream, SessionState};
7+
use datafusion::execution::SendableRecordBatchStream;
88
use datafusion::physical_plan::ExecutionPlan;
9-
use datafusion::prelude::SessionContext;
109
use delta_kernel::table_features::WriterFeature;
1110
use futures::future::BoxFuture;
1211
use futures::StreamExt;
1312

1413
use super::datafusion_utils::into_expr;
1514
use super::{CustomExecuteHandler, Operation};
1615
use crate::delta_datafusion::expr::fmt_expr_to_sql;
17-
use crate::delta_datafusion::{
18-
register_store, DeltaDataChecker, DeltaScanBuilder, DeltaSessionContext,
19-
};
16+
use crate::delta_datafusion::{create_session, register_store, DeltaDataChecker, DeltaScanBuilder};
2017
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
2118
use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner};
2219
use crate::logstore::LogStoreRef;
@@ -132,19 +129,16 @@ impl std::future::IntoFuture for ConstraintBuilder {
132129

133130
let session = this
134131
.session
135-
.and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
136-
.unwrap_or_else(|| {
137-
let session: SessionContext = DeltaSessionContext::default().into();
138-
session.state()
139-
});
132+
.unwrap_or_else(|| Arc::new(create_session().into_inner().state()));
140133
register_store(this.log_store.clone(), session.runtime_env().as_ref());
141134

142-
let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &session)
143-
.build()
144-
.await?;
135+
let scan =
136+
DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), session.as_ref())
137+
.build()
138+
.await?;
145139

146140
let schema = scan.schema().to_dfschema()?;
147-
let expr = into_expr(expr, &schema, &session)?;
141+
let expr = into_expr(expr, &schema, session.as_ref())?;
148142
let expr_str = fmt_expr_to_sql(&expr)?;
149143

150144
// Checker built here with the one time constraint to check.
@@ -156,9 +150,8 @@ impl std::future::IntoFuture for ConstraintBuilder {
156150
for p in 0..plan.properties().output_partitioning().partition_count() {
157151
let inner_plan = plan.clone();
158152
let inner_checker = checker.clone();
159-
let task_ctx = Arc::new((&session).into());
160153
let mut record_stream: SendableRecordBatchStream =
161-
inner_plan.execute(p, task_ctx)?;
154+
inner_plan.execute(p, session.task_ctx())?;
162155
let handle: tokio::task::JoinHandle<DeltaResult<()>> =
163156
tokio::task::spawn(async move {
164157
while let Some(maybe_batch) = record_stream.next().await {

0 commit comments

Comments
 (0)