Skip to content

Commit 602ab33

Browse files
authored
chore: clean up planner.rs (apache#1650)
1 parent e08ecdc commit 602ab33

File tree

1 file changed

+29
-51
lines changed

1 file changed

+29
-51
lines changed

native/core/src/execution/planner.rs

Lines changed: 29 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ use datafusion::{
4848
functions_aggregate::first_last::{FirstValue, LastValue},
4949
logical_expr::Operator as DataFusionOperator,
5050
physical_expr::{
51-
execution_props::ExecutionProps,
5251
expressions::{
5352
in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr,
5453
Literal as DataFusionLiteral, NotExpr,
@@ -141,36 +140,30 @@ pub const TEST_EXEC_CONTEXT_ID: i64 = -1;
141140
pub struct PhysicalPlanner {
142141
// The execution context id of this planner.
143142
exec_context_id: i64,
144-
execution_props: ExecutionProps,
145143
session_ctx: Arc<SessionContext>,
146144
}
147145

148146
impl Default for PhysicalPlanner {
149147
fn default() -> Self {
150148
let session_ctx = Arc::new(SessionContext::new());
151-
let execution_props = ExecutionProps::new();
152149
Self {
153150
exec_context_id: TEST_EXEC_CONTEXT_ID,
154-
execution_props,
155151
session_ctx,
156152
}
157153
}
158154
}
159155

160156
impl PhysicalPlanner {
161157
pub fn new(session_ctx: Arc<SessionContext>) -> Self {
162-
let execution_props = ExecutionProps::new();
163158
Self {
164159
exec_context_id: TEST_EXEC_CONTEXT_ID,
165-
execution_props,
166160
session_ctx,
167161
}
168162
}
169163

170164
pub fn with_exec_id(self, exec_context_id: i64) -> Self {
171165
Self {
172166
exec_context_id,
173-
execution_props: self.execution_props,
174167
session_ctx: Arc::clone(&self.session_ctx),
175168
}
176169
}
@@ -216,9 +209,7 @@ impl PhysicalPlanner {
216209
.as_any()
217210
.downcast_ref::<DataFusionLiteral>()
218211
.ok_or_else(|| {
219-
ExecutionError::GeneralError(
220-
"Expected literal of partition value".to_string(),
221-
)
212+
GeneralError("Expected literal of partition value".to_string())
222213
})
223214
.map(|literal| literal.value().clone())
224215
})
@@ -331,7 +322,7 @@ impl PhysicalPlanner {
331322
ExprStruct::Bound(bound) => {
332323
let idx = bound.index as usize;
333324
if idx >= input_schema.fields().len() {
334-
return Err(ExecutionError::GeneralError(format!(
325+
return Err(GeneralError(format!(
335326
"Column index {} is out of bound. Schema: {}",
336327
idx, input_schema
337328
)));
@@ -389,10 +380,7 @@ impl PhysicalPlanner {
389380
DataType::Struct(fields) => ScalarStructBuilder::new_null(fields),
390381
DataType::Null => ScalarValue::Null,
391382
dt => {
392-
return Err(ExecutionError::GeneralError(format!(
393-
"{:?} is not supported in Comet",
394-
dt
395-
)))
383+
return Err(GeneralError(format!("{:?} is not supported in Comet", dt)))
396384
}
397385
}
398386
} else {
@@ -404,7 +392,7 @@ impl PhysicalPlanner {
404392
DataType::Int32 => ScalarValue::Int32(Some(*value)),
405393
DataType::Date32 => ScalarValue::Date32(Some(*value)),
406394
dt => {
407-
return Err(ExecutionError::GeneralError(format!(
395+
return Err(GeneralError(format!(
408396
"Expected either 'Int32' or 'Date32' for IntVal, but found {:?}",
409397
dt
410398
)))
@@ -419,7 +407,7 @@ impl PhysicalPlanner {
419407
ScalarValue::TimestampMicrosecond(Some(*value), Some(tz))
420408
}
421409
dt => {
422-
return Err(ExecutionError::GeneralError(format!(
410+
return Err(GeneralError(format!(
423411
"Expected either 'Int64' or 'Timestamp' for LongVal, but found {:?}",
424412
dt
425413
)))
@@ -432,7 +420,7 @@ impl PhysicalPlanner {
432420
Value::DecimalVal(value) => {
433421
let big_integer = BigInt::from_signed_bytes_be(value);
434422
let integer = big_integer.to_i128().ok_or_else(|| {
435-
ExecutionError::GeneralError(format!(
423+
GeneralError(format!(
436424
"Cannot parse {:?} as i128 for Decimal literal",
437425
big_integer
438426
))
@@ -443,7 +431,7 @@ impl PhysicalPlanner {
443431
ScalarValue::Decimal128(Some(integer), p, s)
444432
}
445433
dt => {
446-
return Err(ExecutionError::GeneralError(format!(
434+
return Err(GeneralError(format!(
447435
"Decimal literal's data type should be Decimal128 but got {:?}",
448436
dt
449437
)))
@@ -551,7 +539,7 @@ impl PhysicalPlanner {
551539
ScalarValue::Utf8(Some(pattern)) => {
552540
Ok(Arc::new(RLike::try_new(left, pattern)?))
553541
}
554-
_ => Err(ExecutionError::GeneralError(
542+
_ => Err(GeneralError(
555543
"RLike only supports scalar patterns".to_string(),
556544
)),
557545
}
@@ -690,7 +678,7 @@ impl PhysicalPlanner {
690678
let child: Arc<dyn PhysicalExpr> =
691679
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
692680
let result = create_negate_expr(child, expr.fail_on_error);
693-
result.map_err(|e| ExecutionError::GeneralError(e.to_string()))
681+
result.map_err(|e| GeneralError(e.to_string()))
694682
}
695683
ExprStruct::NormalizeNanAndZero(expr) => {
696684
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
@@ -774,10 +762,7 @@ impl PhysicalPlanner {
774762
expr.legacy_negative_index,
775763
)))
776764
}
777-
expr => Err(ExecutionError::GeneralError(format!(
778-
"Not implemented: {:?}",
779-
expr
780-
))),
765+
expr => Err(GeneralError(format!("Not implemented: {:?}", expr))),
781766
}
782767
}
783768

@@ -803,10 +788,7 @@ impl PhysicalPlanner {
803788
options,
804789
})
805790
}
806-
expr => Err(ExecutionError::GeneralError(format!(
807-
"{:?} isn't a SortOrder",
808-
expr
809-
))),
791+
expr => Err(GeneralError(format!("{:?} isn't a SortOrder", expr))),
810792
}
811793
}
812794

@@ -1130,9 +1112,7 @@ impl PhysicalPlanner {
11301112
.first()
11311113
.and_then(|f| f.partitioned_file.first())
11321114
.map(|f| f.file_path.clone())
1133-
.ok_or(ExecutionError::GeneralError(
1134-
"Failed to locate file".to_string(),
1135-
))?;
1115+
.ok_or(GeneralError("Failed to locate file".to_string()))?;
11361116
let (object_store_url, _) =
11371117
prepare_object_store(self.session_ctx.runtime_env(), one_file)?;
11381118

@@ -1176,9 +1156,7 @@ impl PhysicalPlanner {
11761156
// If it is not test execution context for unit test, we should have at least one
11771157
// input source
11781158
if self.exec_context_id != TEST_EXEC_CONTEXT_ID && inputs.is_empty() {
1179-
return Err(ExecutionError::GeneralError(
1180-
"No input for scan".to_string(),
1181-
));
1159+
return Err(GeneralError("No input for scan".to_string()));
11821160
}
11831161

11841162
// Consumes the first input source for the scan
@@ -1212,7 +1190,7 @@ impl PhysicalPlanner {
12121190
Ok(CompressionCodec::Zstd(writer.compression_level))
12131191
}
12141192
Ok(SparkCompressionCodec::Lz4) => Ok(CompressionCodec::Lz4Frame),
1215-
_ => Err(ExecutionError::GeneralError(format!(
1193+
_ => Err(GeneralError(format!(
12161194
"Unsupported shuffle compression codec: {:?}",
12171195
writer.codec
12181196
))),
@@ -1513,7 +1491,7 @@ impl PhysicalPlanner {
15131491
Ok(JoinType::LeftAnti) => DFJoinType::LeftAnti,
15141492
Ok(JoinType::RightAnti) => DFJoinType::RightAnti,
15151493
Err(_) => {
1516-
return Err(ExecutionError::GeneralError(format!(
1494+
return Err(GeneralError(format!(
15171495
"Unsupported join type: {:?}",
15181496
join_type
15191497
)));
@@ -1842,7 +1820,7 @@ impl PhysicalPlanner {
18421820
func,
18431821
)
18441822
}
1845-
stats_type => Err(ExecutionError::GeneralError(format!(
1823+
stats_type => Err(GeneralError(format!(
18461824
"Unknown StatisticsType {:?} for Variance",
18471825
stats_type
18481826
))),
@@ -1872,7 +1850,7 @@ impl PhysicalPlanner {
18721850

18731851
Self::create_aggr_func_expr("variance_pop", schema, vec![child], func)
18741852
}
1875-
stats_type => Err(ExecutionError::GeneralError(format!(
1853+
stats_type => Err(GeneralError(format!(
18761854
"Unknown StatisticsType {:?} for Variance",
18771855
stats_type
18781856
))),
@@ -1902,7 +1880,7 @@ impl PhysicalPlanner {
19021880

19031881
Self::create_aggr_func_expr("stddev_pop", schema, vec![child], func)
19041882
}
1905-
stats_type => Err(ExecutionError::GeneralError(format!(
1883+
stats_type => Err(GeneralError(format!(
19061884
"Unknown StatisticsType {:?} for stddev",
19071885
stats_type
19081886
))),
@@ -1959,7 +1937,7 @@ impl PhysicalPlanner {
19591937
.collect::<Result<Vec<_>, ExecutionError>>()?;
19601938
}
19611939
other => {
1962-
return Err(ExecutionError::GeneralError(format!(
1940+
return Err(GeneralError(format!(
19631941
"{other:?} not supported for window function"
19641942
)))
19651943
}
@@ -1969,15 +1947,15 @@ impl PhysicalPlanner {
19691947
window_func_name = result.0;
19701948
window_args = result.1;
19711949
} else {
1972-
return Err(ExecutionError::GeneralError(
1950+
return Err(GeneralError(
19731951
"Both func and agg_func are not set".to_string(),
19741952
));
19751953
}
19761954

19771955
let window_func = match self.find_df_window_function(&window_func_name) {
19781956
Some(f) => f,
19791957
_ => {
1980-
return Err(ExecutionError::GeneralError(format!(
1958+
return Err(GeneralError(format!(
19811959
"{window_func_name} not supported for window function"
19821960
)))
19831961
}
@@ -2015,7 +1993,7 @@ impl PhysicalPlanner {
20151993
WindowFrameBound::Preceding(ScalarValue::Int64(None))
20161994
}
20171995
WindowFrameUnits::Groups => {
2018-
return Err(ExecutionError::GeneralError(
1996+
return Err(GeneralError(
20191997
"WindowFrameUnits::Groups is not supported.".to_string(),
20201998
));
20211999
}
@@ -2030,7 +2008,7 @@ impl PhysicalPlanner {
20302008
WindowFrameBound::Preceding(ScalarValue::Int64(Some(offset_value)))
20312009
}
20322010
WindowFrameUnits::Groups => {
2033-
return Err(ExecutionError::GeneralError(
2011+
return Err(GeneralError(
20342012
"WindowFrameUnits::Groups is not supported.".to_string(),
20352013
));
20362014
}
@@ -2042,7 +2020,7 @@ impl PhysicalPlanner {
20422020
WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
20432021
WindowFrameUnits::Range => WindowFrameBound::Preceding(ScalarValue::Int64(None)),
20442022
WindowFrameUnits::Groups => {
2045-
return Err(ExecutionError::GeneralError(
2023+
return Err(GeneralError(
20462024
"WindowFrameUnits::Groups is not supported.".to_string(),
20472025
));
20482026
}
@@ -2063,7 +2041,7 @@ impl PhysicalPlanner {
20632041
WindowFrameBound::Following(ScalarValue::Int64(None))
20642042
}
20652043
WindowFrameUnits::Groups => {
2066-
return Err(ExecutionError::GeneralError(
2044+
return Err(GeneralError(
20672045
"WindowFrameUnits::Groups is not supported.".to_string(),
20682046
));
20692047
}
@@ -2076,7 +2054,7 @@ impl PhysicalPlanner {
20762054
WindowFrameBound::Following(ScalarValue::Int64(Some(offset.offset)))
20772055
}
20782056
WindowFrameUnits::Groups => {
2079-
return Err(ExecutionError::GeneralError(
2057+
return Err(GeneralError(
20802058
"WindowFrameUnits::Groups is not supported.".to_string(),
20812059
));
20822060
}
@@ -2087,7 +2065,7 @@ impl PhysicalPlanner {
20872065
WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(None)),
20882066
WindowFrameUnits::Range => WindowFrameBound::Following(ScalarValue::Int64(None)),
20892067
WindowFrameUnits::Groups => {
2090-
return Err(ExecutionError::GeneralError(
2068+
return Err(GeneralError(
20912069
"WindowFrameUnits::Groups is not supported.".to_string(),
20922070
));
20932071
}
@@ -2143,7 +2121,7 @@ impl PhysicalPlanner {
21432121
};
21442122
Ok(("sum".to_string(), vec![child]))
21452123
}
2146-
other => Err(ExecutionError::GeneralError(format!(
2124+
other => Err(GeneralError(format!(
21472125
"{other:?} not supported for window function"
21482126
))),
21492127
}
@@ -2791,7 +2769,7 @@ mod tests {
27912769
let op = create_filter(op_scan, 0);
27922770
let planner = PhysicalPlanner::default();
27932771

2794-
let (mut _scans, filter_exec) = planner.create_plan(&op, &mut vec![], 1).unwrap();
2772+
let (_scans, filter_exec) = planner.create_plan(&op, &mut vec![], 1).unwrap();
27952773

27962774
assert_eq!("CometFilterExec", filter_exec.native_plan.name());
27972775
assert_eq!(1, filter_exec.children.len());

0 commit comments

Comments
 (0)