Skip to content

Commit f17253b

Browse files
committed
Merge branch 'main' into nth_value_func_ref
2 parents 5c4e0f6 + 7ebd993 commit f17253b

File tree

49 files changed

+1149
-456
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1149
-456
lines changed

datafusion-cli/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/table_reference.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::utils::{parse_identifiers_normalized, quote_identifier};
1919
use std::sync::Arc;
2020

2121
/// A fully resolved path to a table of the form "catalog.schema.table"
22-
#[derive(Debug, Clone)]
22+
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
2323
pub struct ResolvedTableReference {
2424
/// The catalog (aka database) containing the table
2525
pub catalog: Arc<str>,

datafusion/core/benches/sql_planner.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ mod data_utils;
2424

2525
use crate::criterion::Criterion;
2626
use arrow::datatypes::{DataType, Field, Fields, Schema};
27+
use criterion::Bencher;
2728
use datafusion::datasource::MemTable;
2829
use datafusion::execution::context::SessionContext;
30+
use datafusion_common::ScalarValue;
2931
use itertools::Itertools;
3032
use std::fs::File;
3133
use std::io::{BufRead, BufReader};
@@ -122,6 +124,29 @@ fn register_clickbench_hits_table() -> SessionContext {
122124
ctx
123125
}
124126

127+
/// Purpose of this benchmark: ensure that placeholder replacement does not get slower,
128+
/// if the query does not contain placeholders at all.
129+
fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Bencher) {
130+
const COLUMNS_NUM: usize = 200;
131+
let mut aggregates = String::new();
132+
for i in 0..COLUMNS_NUM {
133+
if i > 0 {
134+
aggregates.push_str(", ");
135+
}
136+
aggregates.push_str(format!("MAX(a{})", i).as_str());
137+
}
138+
// SELECT MAX(a0), ..., MAX(aN) FROM t1.
139+
let query = format!("SELECT {} FROM t1", aggregates);
140+
let statement = ctx.state().sql_to_statement(&query, "Generic").unwrap();
141+
let rt = Runtime::new().unwrap();
142+
let plan =
143+
rt.block_on(async { ctx.state().statement_to_plan(statement).await.unwrap() });
144+
b.iter(|| {
145+
let plan = plan.clone();
146+
criterion::black_box(plan.with_param_values(vec![ScalarValue::from(1)]).unwrap());
147+
});
148+
}
149+
125150
fn criterion_benchmark(c: &mut Criterion) {
126151
// verify that we can load the clickbench data prior to running the benchmark
127152
if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists()
@@ -388,6 +413,10 @@ fn criterion_benchmark(c: &mut Criterion) {
388413
}
389414
})
390415
});
416+
417+
c.bench_function("with_param_values_many_columns", |b| {
418+
benchmark_with_param_values_many_columns(&ctx, b);
419+
});
391420
}
392421

393422
criterion_group!(benches, criterion_benchmark);

datafusion/core/src/execution/context/mod.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -688,11 +688,11 @@ impl SessionContext {
688688
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
689689
self.set_variable(stmt).await
690690
}
691-
LogicalPlan::Prepare(Prepare {
691+
LogicalPlan::Statement(Statement::Prepare(Prepare {
692692
name,
693693
input,
694694
data_types,
695-
}) => {
695+
})) => {
696696
// The number of parameters must match the specified data types length.
697697
if !data_types.is_empty() {
698698
let param_names = input.get_parameter_names()?;
@@ -712,7 +712,15 @@ impl SessionContext {
712712
self.state.write().store_prepared(name, data_types, input)?;
713713
self.return_empty_dataframe()
714714
}
715-
LogicalPlan::Execute(execute) => self.execute_prepared(execute),
715+
LogicalPlan::Statement(Statement::Execute(execute)) => {
716+
self.execute_prepared(execute)
717+
}
718+
LogicalPlan::Statement(Statement::Deallocate(deallocate)) => {
719+
self.state
720+
.write()
721+
.remove_prepared(deallocate.name.as_str())?;
722+
self.return_empty_dataframe()
723+
}
716724
plan => Ok(DataFrame::new(self.state(), plan)),
717725
}
718726
}
@@ -1773,14 +1781,6 @@ impl<'n, 'a> TreeNodeVisitor<'n> for BadPlanVisitor<'a> {
17731781
LogicalPlan::Statement(stmt) if !self.options.allow_statements => {
17741782
plan_err!("Statement not supported: {}", stmt.name())
17751783
}
1776-
// TODO: Implement PREPARE as a LogicalPlan::Statement
1777-
LogicalPlan::Prepare(_) if !self.options.allow_statements => {
1778-
plan_err!("Statement not supported: PREPARE")
1779-
}
1780-
// TODO: Implement EXECUTE as a LogicalPlan::Statement
1781-
LogicalPlan::Execute(_) if !self.options.allow_statements => {
1782-
plan_err!("Statement not supported: EXECUTE")
1783-
}
17841784
_ => Ok(TreeNodeRecursion::Continue),
17851785
}
17861786
}

datafusion/core/src/execution/session_state.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -540,8 +540,9 @@ impl SessionState {
540540
};
541541

542542
for reference in references {
543-
let resolved = &self.resolve_table_ref(reference);
544-
if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) {
543+
let resolved = self.resolve_table_ref(reference);
544+
if let Entry::Vacant(v) = provider.tables.entry(resolved) {
545+
let resolved = v.key();
545546
if let Ok(schema) = self.schema_for_ref(resolved.clone()) {
546547
if let Some(table) = schema.table(&resolved.table).await? {
547548
v.insert(provider_as_source(table));
@@ -933,6 +934,17 @@ impl SessionState {
933934
pub(crate) fn get_prepared(&self, name: &str) -> Option<Arc<PreparedPlan>> {
934935
self.prepared_plans.get(name).map(Arc::clone)
935936
}
937+
938+
/// Remove the prepared plan with the given name.
939+
pub(crate) fn remove_prepared(
940+
&mut self,
941+
name: &str,
942+
) -> datafusion_common::Result<()> {
943+
match self.prepared_plans.remove(name) {
944+
Some(_) => Ok(()),
945+
None => exec_err!("Prepared statement '{}' does not exist", name),
946+
}
947+
}
936948
}
937949

938950
/// A builder to be used for building [`SessionState`]'s. Defaults will
@@ -1599,7 +1611,7 @@ impl From<SessionState> for SessionStateBuilder {
15991611
/// having a direct dependency on the [`SessionState`] struct (and core crate)
16001612
struct SessionContextProvider<'a> {
16011613
state: &'a SessionState,
1602-
tables: HashMap<String, Arc<dyn TableSource>>,
1614+
tables: HashMap<ResolvedTableReference, Arc<dyn TableSource>>,
16031615
}
16041616

16051617
impl<'a> ContextProvider for SessionContextProvider<'a> {
@@ -1611,7 +1623,7 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
16111623
&self,
16121624
name: TableReference,
16131625
) -> datafusion_common::Result<Arc<dyn TableSource>> {
1614-
let name = self.state.resolve_table_ref(name).to_string();
1626+
let name = self.state.resolve_table_ref(name);
16151627
self.tables
16161628
.get(&name)
16171629
.cloned()

datafusion/core/src/physical_planner.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,15 +1195,6 @@ impl DefaultPhysicalPlanner {
11951195
let name = statement.name();
11961196
return not_impl_err!("Unsupported logical plan: Statement({name})");
11971197
}
1198-
LogicalPlan::Prepare(_) => {
1199-
// There is no default plan for "PREPARE" -- it must be
1200-
// handled at a higher level (so that the appropriate
1201-
// statement can be prepared)
1202-
return not_impl_err!("Unsupported logical plan: Prepare");
1203-
}
1204-
LogicalPlan::Execute(_) => {
1205-
return not_impl_err!("Unsupported logical plan: Execute");
1206-
}
12071198
LogicalPlan::Dml(dml) => {
12081199
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
12091200
return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op);

datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use datafusion_common::HashMap;
5050
use datafusion_physical_expr_common::sort_expr::LexOrdering;
5151
use rand::rngs::StdRng;
5252
use rand::{thread_rng, Rng, SeedableRng};
53+
use std::str;
5354
use tokio::task::JoinSet;
5455

5556
// ========================================================================
@@ -171,6 +172,21 @@ fn baseline_config() -> DatasetGeneratorConfig {
171172
ColumnDescr::new("time32_ms", DataType::Time32(TimeUnit::Millisecond)),
172173
ColumnDescr::new("time64_us", DataType::Time64(TimeUnit::Microsecond)),
173174
ColumnDescr::new("time64_ns", DataType::Time64(TimeUnit::Nanosecond)),
175+
ColumnDescr::new("timestamp_s", DataType::Timestamp(TimeUnit::Second, None)),
176+
ColumnDescr::new(
177+
"timestamp_ms",
178+
DataType::Timestamp(TimeUnit::Millisecond, None),
179+
),
180+
ColumnDescr::new(
181+
"timestamp_us",
182+
DataType::Timestamp(TimeUnit::Microsecond, None),
183+
),
184+
ColumnDescr::new(
185+
"timestamp_ns",
186+
DataType::Timestamp(TimeUnit::Nanosecond, None),
187+
),
188+
ColumnDescr::new("float32", DataType::Float32),
189+
ColumnDescr::new("float64", DataType::Float64),
174190
ColumnDescr::new(
175191
"interval_year_month",
176192
DataType::Interval(IntervalUnit::YearMonth),
@@ -206,10 +222,12 @@ fn baseline_config() -> DatasetGeneratorConfig {
206222
ColumnDescr::new("utf8", DataType::Utf8),
207223
ColumnDescr::new("largeutf8", DataType::LargeUtf8),
208224
ColumnDescr::new("utf8view", DataType::Utf8View),
209-
// todo binary
210225
// low cardinality columns
211226
ColumnDescr::new("u8_low", DataType::UInt8).with_max_num_distinct(10),
212227
ColumnDescr::new("utf8_low", DataType::Utf8).with_max_num_distinct(10),
228+
ColumnDescr::new("binary", DataType::Binary),
229+
ColumnDescr::new("large_binary", DataType::LargeBinary),
230+
ColumnDescr::new("binaryview", DataType::BinaryView),
213231
];
214232

215233
let min_num_rows = 512;

datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs

Lines changed: 117 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
use std::sync::Arc;
1919

2020
use arrow::datatypes::{
21-
ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
22-
Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
23-
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, LargeUtf8Type,
24-
StringViewType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
25-
Time64NanosecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, Utf8Type,
21+
BinaryType, BinaryViewType, ByteArrayType, ByteViewType, Date32Type, Date64Type,
22+
Decimal128Type, Decimal256Type, Float32Type, Float64Type, Int16Type, Int32Type,
23+
Int64Type, Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType,
24+
IntervalYearMonthType, LargeBinaryType, LargeUtf8Type, StringViewType,
25+
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
26+
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
27+
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, Utf8Type,
2628
};
2729
use arrow_array::{ArrayRef, RecordBatch};
2830
use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit};
@@ -35,7 +37,10 @@ use rand::{
3537
thread_rng, Rng, SeedableRng,
3638
};
3739
use test_utils::{
38-
array_gen::{DecimalArrayGenerator, PrimitiveArrayGenerator, StringArrayGenerator},
40+
array_gen::{
41+
BinaryArrayGenerator, DecimalArrayGenerator, PrimitiveArrayGenerator,
42+
StringArrayGenerator,
43+
},
3944
stagger_batch,
4045
};
4146

@@ -71,17 +76,19 @@ pub struct DatasetGeneratorConfig {
7176
}
7277

7378
impl DatasetGeneratorConfig {
74-
/// return a list of all column names
79+
/// Return a list of all column names
7580
pub fn all_columns(&self) -> Vec<&str> {
7681
self.columns.iter().map(|d| d.name.as_str()).collect()
7782
}
7883

79-
/// return a list of column names that are "numeric"
84+
/// Return a list of column names that are "numeric"
8085
pub fn numeric_columns(&self) -> Vec<&str> {
8186
self.columns
8287
.iter()
8388
.filter_map(|d| {
84-
if d.column_type.is_numeric() {
89+
if d.column_type.is_numeric()
90+
&& !matches!(d.column_type, DataType::Float32 | DataType::Float64)
91+
{
8592
Some(d.name.as_str())
8693
} else {
8794
None
@@ -278,6 +285,37 @@ macro_rules! generate_primitive_array {
278285
}};
279286
}
280287

288+
macro_rules! generate_binary_array {
289+
(
290+
$SELF:ident,
291+
$NUM_ROWS:ident,
292+
$MAX_NUM_DISTINCT:expr,
293+
$BATCH_GEN_RNG:ident,
294+
$ARRAY_GEN_RNG:ident,
295+
$ARROW_TYPE:ident
296+
) => {{
297+
let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len());
298+
let null_pct = $SELF.candidate_null_pcts[null_pct_idx];
299+
300+
let max_len = $BATCH_GEN_RNG.gen_range(1..100);
301+
302+
let mut generator = BinaryArrayGenerator {
303+
max_len,
304+
num_binaries: $NUM_ROWS,
305+
num_distinct_binaries: $MAX_NUM_DISTINCT,
306+
null_pct,
307+
rng: $ARRAY_GEN_RNG,
308+
};
309+
310+
match $ARROW_TYPE::DATA_TYPE {
311+
DataType::Binary => generator.gen_data::<i32>(),
312+
DataType::LargeBinary => generator.gen_data::<i64>(),
313+
DataType::BinaryView => generator.gen_binary_view(),
314+
_ => unreachable!(),
315+
}
316+
}};
317+
}
318+
281319
impl RecordBatchGenerator {
282320
fn new(min_rows_nun: usize, max_rows_num: usize, columns: Vec<ColumnDescr>) -> Self {
283321
let candidate_null_pcts = vec![0.0, 0.01, 0.1, 0.5];
@@ -527,6 +565,76 @@ impl RecordBatchGenerator {
527565
IntervalMonthDayNanoType
528566
)
529567
}
568+
DataType::Timestamp(TimeUnit::Second, None) => {
569+
generate_primitive_array!(
570+
self,
571+
num_rows,
572+
max_num_distinct,
573+
batch_gen_rng,
574+
array_gen_rng,
575+
TimestampSecondType
576+
)
577+
}
578+
DataType::Timestamp(TimeUnit::Millisecond, None) => {
579+
generate_primitive_array!(
580+
self,
581+
num_rows,
582+
max_num_distinct,
583+
batch_gen_rng,
584+
array_gen_rng,
585+
TimestampMillisecondType
586+
)
587+
}
588+
DataType::Timestamp(TimeUnit::Microsecond, None) => {
589+
generate_primitive_array!(
590+
self,
591+
num_rows,
592+
max_num_distinct,
593+
batch_gen_rng,
594+
array_gen_rng,
595+
TimestampMicrosecondType
596+
)
597+
}
598+
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
599+
generate_primitive_array!(
600+
self,
601+
num_rows,
602+
max_num_distinct,
603+
batch_gen_rng,
604+
array_gen_rng,
605+
TimestampNanosecondType
606+
)
607+
}
608+
DataType::Binary => {
609+
generate_binary_array!(
610+
self,
611+
num_rows,
612+
max_num_distinct,
613+
batch_gen_rng,
614+
array_gen_rng,
615+
BinaryType
616+
)
617+
}
618+
DataType::LargeBinary => {
619+
generate_binary_array!(
620+
self,
621+
num_rows,
622+
max_num_distinct,
623+
batch_gen_rng,
624+
array_gen_rng,
625+
LargeBinaryType
626+
)
627+
}
628+
DataType::BinaryView => {
629+
generate_binary_array!(
630+
self,
631+
num_rows,
632+
max_num_distinct,
633+
batch_gen_rng,
634+
array_gen_rng,
635+
BinaryViewType
636+
)
637+
}
530638
DataType::Decimal128(precision, scale) => {
531639
generate_decimal_array!(
532640
self,

0 commit comments

Comments
 (0)