Skip to content

Commit 48ff058

Browse files
committed
chore: Improve error handling
Signed-off-by: Alex Qyoun-ae <[email protected]>
1 parent 8e60e17 commit 48ff058

File tree

2 files changed

+32
-7
lines changed

2 files changed

+32
-7
lines changed

datafusion/common/src/error.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,7 @@ impl Display for DataFusionError {
                 write!(f, "This feature is not implemented: {}", desc)
             }
             DataFusionError::Internal(ref desc) => {
-                write!(f, "Internal error: {}. This was likely caused by a bug in DataFusion's \
-                    code and we would welcome that you file an bug report in our issue tracker", desc)
+                write!(f, "Internal error: {}", desc)
             }
             DataFusionError::Plan(ref desc) => {
                 write!(f, "Error during planning: {}", desc)

datafusion/core/src/physical_plan/repartition.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ use crate::error::{DataFusionError, Result};
 use crate::physical_plan::hash_utils::create_hashes;
 use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
 use arrow::record_batch::RecordBatch;
-use arrow::{array::Array, error::Result as ArrowResult};
+use arrow::{
+    array::Array,
+    error::{ArrowError, Result as ArrowResult},
+};
 use arrow::{compute::take, datatypes::SchemaRef};
 use tokio_stream::wrappers::UnboundedReceiverStream;

@@ -369,9 +372,8 @@ impl RepartitionExec {
                         .columns()
                         .iter()
                         .map(|c| {
-                            take(c.as_ref(), &indices, None).map_err(|e| {
-                                DataFusionError::Execution(e.to_string())
-                            })
+                            take(c.as_ref(), &indices, None)
+                                .map_err(DataFusionError::ArrowError)
                         })
                         .collect::<Result<Vec<Arc<dyn Array>>>>()?;
                     let output_batch =
@@ -426,9 +428,33 @@ impl RepartitionExec {
             }
             // Error from running input task
             Ok(Err(e)) => {
+                // try to unwrap nested errors
+                let mut err = &e;
+                let mut message = None;
+                // limit the number of unwraps to avoid potential infinite/deep loops
+                for _ in 0..100 {
+                    if let DataFusionError::External(ext_err) = err {
+                        message = Some(ext_err.to_string());
+                        break;
+                    }
+                    let DataFusionError::ArrowError(arrow_err) = err else {
+                        message = Some(err.to_string());
+                        break;
+                    };
+                    let ArrowError::ExternalError(ext_err) = arrow_err else {
+                        message = Some(arrow_err.to_string());
+                        break;
+                    };
+                    let Some(df_err) = ext_err.downcast_ref::<DataFusionError>() else {
+                        message = Some(ext_err.to_string());
+                        break;
+                    };
+                    err = df_err;
+                }
+                let message = message.unwrap_or_else(|| err.to_string());
                 for (_, tx) in txs {
                     // wrap it because need to send error to all output partitions
-                    let err = DataFusionError::Execution(e.to_string());
+                    let err = DataFusionError::Execution(message.clone());
                     let err = Err(err.into());
                     tx.send(Some(err)).ok();
                 }

0 commit comments

Comments
 (0)