Skip to content

Commit 19865b3

Browse files
2010YOUY01martin-g
andauthored
chore: Finish refactor with assert_or_internal_err!() (#18790)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18613 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> #18613 is almost finished, I searched the codebase and refactor all the remaining patterns in this PR. Such assertion macros have been scattered to the codebase, and I have also added some error handling doc in #18762, so later we can follow this pattern and continue adopting those macros. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Martin Grigorov <[email protected]>
1 parent 958a6de commit 19865b3

File tree

14 files changed

+148
-155
lines changed

14 files changed

+148
-155
lines changed

datafusion/catalog-listing/src/helpers.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ use std::mem;
2121
use std::sync::Arc;
2222

2323
use datafusion_catalog::Session;
24-
use datafusion_common::internal_err;
25-
use datafusion_common::{HashMap, Result, ScalarValue};
24+
use datafusion_common::{
25+
assert_or_internal_err, DataFusionError, HashMap, Result, ScalarValue,
26+
};
2627
use datafusion_datasource::ListingTableUrl;
2728
use datafusion_datasource::PartitionedFile;
2829
use datafusion_expr::{lit, utils, BinaryExpr, Operator};
@@ -386,12 +387,11 @@ pub async fn pruned_partition_list<'a>(
386387
.try_filter(|object_meta| futures::future::ready(object_meta.size > 0));
387388

388389
if partition_cols.is_empty() {
389-
if !filters.is_empty() {
390-
return internal_err!(
391-
"Got partition filters for unpartitioned table {}",
392-
table_path
393-
);
394-
}
390+
assert_or_internal_err!(
391+
filters.is_empty(),
392+
"Got partition filters for unpartitioned table {}",
393+
table_path
394+
);
395395

396396
// if no partition col => simply list all the files
397397
Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())

datafusion/common/src/scalar/mod.rs

Lines changed: 45 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use std::mem::{size_of, size_of_val};
3333
use std::str::FromStr;
3434
use std::sync::Arc;
3535

36+
use crate::assert_or_internal_err;
3637
use crate::cast::{
3738
as_binary_array, as_binary_view_array, as_boolean_array, as_date32_array,
3839
as_date64_array, as_decimal128_array, as_decimal256_array, as_decimal32_array,
@@ -78,8 +79,8 @@ use arrow::compute::kernels::numeric::{
7879
use arrow::datatypes::{
7980
i256, validate_decimal_precision_and_scale, ArrowDictionaryKeyType, ArrowNativeType,
8081
ArrowTimestampType, DataType, Date32Type, Decimal128Type, Decimal256Type,
81-
Decimal32Type, Decimal64Type, Field, Float32Type, Int16Type, Int32Type, Int64Type,
82-
Int8Type, IntervalDayTime, IntervalDayTimeType, IntervalMonthDayNano,
82+
Decimal32Type, Decimal64Type, DecimalType, Field, Float32Type, Int16Type, Int32Type,
83+
Int64Type, Int8Type, IntervalDayTime, IntervalDayTimeType, IntervalMonthDayNano,
8384
IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, TimeUnit,
8485
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
8586
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, UnionFields,
@@ -1578,12 +1579,10 @@ impl ScalarValue {
15781579
DataType::Float32 => ScalarValue::Float32(Some(1.0)),
15791580
DataType::Float64 => ScalarValue::Float64(Some(1.0)),
15801581
DataType::Decimal32(precision, scale) => {
1581-
validate_decimal_precision_and_scale::<Decimal32Type>(
1582+
Self::validate_decimal_or_internal_err::<Decimal32Type>(
15821583
*precision, *scale,
15831584
)?;
1584-
if *scale < 0 {
1585-
return _internal_err!("Negative scale is not supported");
1586-
}
1585+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
15871586
match 10_i32.checked_pow(*scale as u32) {
15881587
Some(value) => {
15891588
ScalarValue::Decimal32(Some(value), *precision, *scale)
@@ -1592,12 +1591,10 @@ impl ScalarValue {
15921591
}
15931592
}
15941593
DataType::Decimal64(precision, scale) => {
1595-
validate_decimal_precision_and_scale::<Decimal64Type>(
1594+
Self::validate_decimal_or_internal_err::<Decimal64Type>(
15961595
*precision, *scale,
15971596
)?;
1598-
if *scale < 0 {
1599-
return _internal_err!("Negative scale is not supported");
1600-
}
1597+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
16011598
match i64::from(10).checked_pow(*scale as u32) {
16021599
Some(value) => {
16031600
ScalarValue::Decimal64(Some(value), *precision, *scale)
@@ -1606,12 +1603,10 @@ impl ScalarValue {
16061603
}
16071604
}
16081605
DataType::Decimal128(precision, scale) => {
1609-
validate_decimal_precision_and_scale::<Decimal128Type>(
1606+
Self::validate_decimal_or_internal_err::<Decimal128Type>(
16101607
*precision, *scale,
16111608
)?;
1612-
if *scale < 0 {
1613-
return _internal_err!("Negative scale is not supported");
1614-
}
1609+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
16151610
match i128::from(10).checked_pow(*scale as u32) {
16161611
Some(value) => {
16171612
ScalarValue::Decimal128(Some(value), *precision, *scale)
@@ -1620,12 +1615,10 @@ impl ScalarValue {
16201615
}
16211616
}
16221617
DataType::Decimal256(precision, scale) => {
1623-
validate_decimal_precision_and_scale::<Decimal256Type>(
1618+
Self::validate_decimal_or_internal_err::<Decimal256Type>(
16241619
*precision, *scale,
16251620
)?;
1626-
if *scale < 0 {
1627-
return _internal_err!("Negative scale is not supported");
1628-
}
1621+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
16291622
match i256::from(10).checked_pow(*scale as u32) {
16301623
Some(value) => {
16311624
ScalarValue::Decimal256(Some(value), *precision, *scale)
@@ -1652,12 +1645,10 @@ impl ScalarValue {
16521645
DataType::Float32 => ScalarValue::Float32(Some(-1.0)),
16531646
DataType::Float64 => ScalarValue::Float64(Some(-1.0)),
16541647
DataType::Decimal32(precision, scale) => {
1655-
validate_decimal_precision_and_scale::<Decimal32Type>(
1648+
Self::validate_decimal_or_internal_err::<Decimal32Type>(
16561649
*precision, *scale,
16571650
)?;
1658-
if *scale < 0 {
1659-
return _internal_err!("Negative scale is not supported");
1660-
}
1651+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
16611652
match 10_i32.checked_pow(*scale as u32) {
16621653
Some(value) => {
16631654
ScalarValue::Decimal32(Some(-value), *precision, *scale)
@@ -1666,12 +1657,10 @@ impl ScalarValue {
16661657
}
16671658
}
16681659
DataType::Decimal64(precision, scale) => {
1669-
validate_decimal_precision_and_scale::<Decimal64Type>(
1660+
Self::validate_decimal_or_internal_err::<Decimal64Type>(
16701661
*precision, *scale,
16711662
)?;
1672-
if *scale < 0 {
1673-
return _internal_err!("Negative scale is not supported");
1674-
}
1663+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
16751664
match i64::from(10).checked_pow(*scale as u32) {
16761665
Some(value) => {
16771666
ScalarValue::Decimal64(Some(-value), *precision, *scale)
@@ -1680,12 +1669,10 @@ impl ScalarValue {
16801669
}
16811670
}
16821671
DataType::Decimal128(precision, scale) => {
1683-
validate_decimal_precision_and_scale::<Decimal128Type>(
1672+
Self::validate_decimal_or_internal_err::<Decimal128Type>(
16841673
*precision, *scale,
16851674
)?;
1686-
if *scale < 0 {
1687-
return _internal_err!("Negative scale is not supported");
1688-
}
1675+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
16891676
match i128::from(10).checked_pow(*scale as u32) {
16901677
Some(value) => {
16911678
ScalarValue::Decimal128(Some(-value), *precision, *scale)
@@ -1694,12 +1681,10 @@ impl ScalarValue {
16941681
}
16951682
}
16961683
DataType::Decimal256(precision, scale) => {
1697-
validate_decimal_precision_and_scale::<Decimal256Type>(
1684+
Self::validate_decimal_or_internal_err::<Decimal256Type>(
16981685
*precision, *scale,
16991686
)?;
1700-
if *scale < 0 {
1701-
return _internal_err!("Negative scale is not supported");
1702-
}
1687+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
17031688
match i256::from(10).checked_pow(*scale as u32) {
17041689
Some(value) => {
17051690
ScalarValue::Decimal256(Some(-value), *precision, *scale)
@@ -1729,14 +1714,10 @@ impl ScalarValue {
17291714
DataType::Float32 => ScalarValue::Float32(Some(10.0)),
17301715
DataType::Float64 => ScalarValue::Float64(Some(10.0)),
17311716
DataType::Decimal32(precision, scale) => {
1732-
if let Err(err) = validate_decimal_precision_and_scale::<Decimal32Type>(
1717+
Self::validate_decimal_or_internal_err::<Decimal32Type>(
17331718
*precision, *scale,
1734-
) {
1735-
return _internal_err!("Invalid precision and scale {err}");
1736-
}
1737-
if *scale < 0 {
1738-
return _internal_err!("Negative scale is not supported");
1739-
}
1719+
)?;
1720+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
17401721
match 10_i32.checked_pow((*scale + 1) as u32) {
17411722
Some(value) => {
17421723
ScalarValue::Decimal32(Some(value), *precision, *scale)
@@ -1745,14 +1726,10 @@ impl ScalarValue {
17451726
}
17461727
}
17471728
DataType::Decimal64(precision, scale) => {
1748-
if let Err(err) = validate_decimal_precision_and_scale::<Decimal64Type>(
1729+
Self::validate_decimal_or_internal_err::<Decimal64Type>(
17491730
*precision, *scale,
1750-
) {
1751-
return _internal_err!("Invalid precision and scale {err}");
1752-
}
1753-
if *scale < 0 {
1754-
return _internal_err!("Negative scale is not supported");
1755-
}
1731+
)?;
1732+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
17561733
match i64::from(10).checked_pow((*scale + 1) as u32) {
17571734
Some(value) => {
17581735
ScalarValue::Decimal64(Some(value), *precision, *scale)
@@ -1761,14 +1738,10 @@ impl ScalarValue {
17611738
}
17621739
}
17631740
DataType::Decimal128(precision, scale) => {
1764-
if let Err(err) = validate_decimal_precision_and_scale::<Decimal128Type>(
1741+
Self::validate_decimal_or_internal_err::<Decimal128Type>(
17651742
*precision, *scale,
1766-
) {
1767-
return _internal_err!("Invalid precision and scale {err}");
1768-
}
1769-
if *scale < 0 {
1770-
return _internal_err!("Negative scale is not supported");
1771-
}
1743+
)?;
1744+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
17721745
match i128::from(10).checked_pow((*scale + 1) as u32) {
17731746
Some(value) => {
17741747
ScalarValue::Decimal128(Some(value), *precision, *scale)
@@ -1777,14 +1750,10 @@ impl ScalarValue {
17771750
}
17781751
}
17791752
DataType::Decimal256(precision, scale) => {
1780-
if let Err(err) = validate_decimal_precision_and_scale::<Decimal256Type>(
1753+
Self::validate_decimal_or_internal_err::<Decimal256Type>(
17811754
*precision, *scale,
1782-
) {
1783-
return _internal_err!("Invalid precision and scale {err}");
1784-
}
1785-
if *scale < 0 {
1786-
return _internal_err!("Negative scale is not supported");
1787-
}
1755+
)?;
1756+
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
17881757
match i256::from(10).checked_pow((*scale + 1) as u32) {
17891758
Some(value) => {
17901759
ScalarValue::Decimal256(Some(value), *precision, *scale)
@@ -4354,6 +4323,20 @@ impl ScalarValue {
43544323
_ => None,
43554324
}
43564325
}
4326+
4327+
/// A thin wrapper on Arrow's validation that throws internal error if validation
4328+
/// fails.
4329+
fn validate_decimal_or_internal_err<T: DecimalType>(
4330+
precision: u8,
4331+
scale: i8,
4332+
) -> Result<()> {
4333+
validate_decimal_precision_and_scale::<T>(precision, scale).map_err(|err| {
4334+
_internal_datafusion_err!(
4335+
"Decimal precision/scale invariant violated \
4336+
(precision={precision}, scale={scale}): {err}"
4337+
)
4338+
})
4339+
}
43574340
}
43584341

43594342
/// Compacts the data of an `ArrayData` into a new `ArrayData`.

datafusion/common/src/utils/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ pub mod memory;
2222
pub mod proxy;
2323
pub mod string_utils;
2424

25-
use crate::error::{_exec_datafusion_err, _internal_datafusion_err, _internal_err};
26-
use crate::{Result, ScalarValue};
25+
use crate::assert_or_internal_err;
26+
use crate::error::{_exec_datafusion_err, _internal_datafusion_err};
27+
use crate::{DataFusionError, Result, ScalarValue};
2728
use arrow::array::{
2829
cast::AsArray, Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray,
2930
OffsetSizeTrait,
@@ -519,9 +520,7 @@ pub fn arrays_into_list_array(
519520
arr: impl IntoIterator<Item = ArrayRef>,
520521
) -> Result<ListArray> {
521522
let arr = arr.into_iter().collect::<Vec<_>>();
522-
if arr.is_empty() {
523-
return _internal_err!("Cannot wrap empty array into list array");
524-
}
523+
assert_or_internal_err!(!arr.is_empty(), "Cannot wrap empty array into list array");
525524

526525
let lens = arr.iter().map(|x| x.len()).collect::<Vec<_>>();
527526
// Assume data type is consistent

datafusion/core/src/physical_planner.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1757,11 +1757,11 @@ fn qualify_join_schema_sides(
17571757
let join_fields = join_schema.fields();
17581758

17591759
// Validate lengths
1760-
if join_fields.len() != left_fields.len() + right_fields.len() {
1761-
return internal_err!(
1762-
"Join schema field count must match left and right field count."
1763-
);
1764-
}
1760+
assert_eq_or_internal_err!(
1761+
join_fields.len(),
1762+
left_fields.len() + right_fields.len(),
1763+
"Join schema field count must match left and right field count."
1764+
);
17651765

17661766
// Validate field names match
17671767
for (i, (field, expected)) in join_fields

datafusion/core/tests/parquet/external_access_plan.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,10 @@ async fn bad_selection() {
257257
.await
258258
.unwrap_err();
259259
let err_string = err.to_string();
260-
assert_contains!(&err_string, "Internal error: Invalid ParquetAccessPlan Selection. Row group 0 has 5 rows but selection only specifies 4 rows");
260+
assert_contains!(
261+
&err_string,
262+
"Row group 0 has 5 rows but selection only specifies 4 rows."
263+
);
261264
}
262265

263266
/// Return a RowSelection of 1 rows from a row group of 5 rows

datafusion/core/tests/user_defined/user_defined_plan.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ use arrow::{
7070
use datafusion::execution::session_state::SessionStateBuilder;
7171
use datafusion::{
7272
common::cast::as_int64_array,
73-
common::{arrow_datafusion_err, internal_err, DFSchemaRef},
73+
common::{arrow_datafusion_err, DFSchemaRef},
7474
error::{DataFusionError, Result},
7575
execution::{
7676
context::{QueryPlanner, SessionState, TaskContext},
@@ -91,7 +91,7 @@ use datafusion::{
9191
};
9292
use datafusion_common::config::ConfigOptions;
9393
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
94-
use datafusion_common::ScalarValue;
94+
use datafusion_common::{assert_eq_or_internal_err, assert_or_internal_err, ScalarValue};
9595
use datafusion_expr::{FetchType, InvariantLevel, Projection, SortExpr};
9696
use datafusion_optimizer::optimizer::ApplyOrder;
9797
use datafusion_optimizer::AnalyzerRule;
@@ -585,9 +585,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
585585
kind,
586586
}) = self.invariant_mock.clone()
587587
{
588-
if should_fail_invariant && check == kind {
589-
return internal_err!("node fails check, such as improper inputs");
590-
}
588+
assert_or_internal_err!(
589+
!(should_fail_invariant && check == kind),
590+
"node fails check, such as improper inputs"
591+
);
591592
}
592593
Ok(())
593594
}
@@ -733,9 +734,11 @@ impl ExecutionPlan for TopKExec {
733734
partition: usize,
734735
context: Arc<TaskContext>,
735736
) -> Result<SendableRecordBatchStream> {
736-
if 0 != partition {
737-
return internal_err!("TopKExec invalid partition {partition}");
738-
}
737+
assert_eq_or_internal_err!(
738+
partition,
739+
0,
740+
"TopKExec invalid partition {partition}"
741+
);
739742

740743
Ok(Box::pin(TopKReader {
741744
input: self.input.execute(partition, context)?,

0 commit comments

Comments
 (0)