Skip to content

Commit 5279d6f

Browse files
committed
chore: Improve error handling
Signed-off-by: Alex Qyoun-ae <[email protected]>
1 parent 8e60e17 commit 5279d6f

File tree

2 files changed

+30
-7
lines changed

2 files changed

+30
-7
lines changed

datafusion/common/src/error.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,7 @@ impl Display for DataFusionError {
153153
write!(f, "This feature is not implemented: {}", desc)
154154
}
155155
DataFusionError::Internal(ref desc) => {
156-
write!(f, "Internal error: {}. This was likely caused by a bug in DataFusion's \
157-
code and we would welcome that you file an bug report in our issue tracker", desc)
156+
write!(f, "Internal error: {}", desc)
158157
}
159158
DataFusionError::Plan(ref desc) => {
160159
write!(f, "Error during planning: {}", desc)

datafusion/core/src/physical_plan/repartition.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ use crate::error::{DataFusionError, Result};
2727
use crate::physical_plan::hash_utils::create_hashes;
2828
use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
2929
use arrow::record_batch::RecordBatch;
30-
use arrow::{array::Array, error::Result as ArrowResult};
30+
use arrow::{
31+
array::Array,
32+
error::{ArrowError, Result as ArrowResult},
33+
};
3134
use arrow::{compute::take, datatypes::SchemaRef};
3235
use tokio_stream::wrappers::UnboundedReceiverStream;
3336

@@ -369,9 +372,8 @@ impl RepartitionExec {
369372
.columns()
370373
.iter()
371374
.map(|c| {
372-
take(c.as_ref(), &indices, None).map_err(|e| {
373-
DataFusionError::Execution(e.to_string())
374-
})
375+
take(c.as_ref(), &indices, None)
376+
.map_err(DataFusionError::ArrowError)
375377
})
376378
.collect::<Result<Vec<Arc<dyn Array>>>>()?;
377379
let output_batch =
@@ -426,9 +428,31 @@ impl RepartitionExec {
426428
}
427429
// Error from running input task
428430
Ok(Err(e)) => {
431+
// try to unwrap nested errors
432+
let mut err = &e;
433+
let message;
434+
loop {
435+
if let DataFusionError::External(ext_err) = err {
436+
message = ext_err.to_string();
437+
break;
438+
}
439+
let DataFusionError::ArrowError(arrow_err) = err else {
440+
message = err.to_string();
441+
break;
442+
};
443+
let ArrowError::ExternalError(ext_err) = arrow_err else {
444+
message = arrow_err.to_string();
445+
break;
446+
};
447+
let Some(df_err) = ext_err.downcast_ref::<DataFusionError>() else {
448+
message = ext_err.to_string();
449+
break;
450+
};
451+
err = df_err;
452+
}
429453
for (_, tx) in txs {
430454
// wrap it because need to send error to all output partitions
431-
let err = DataFusionError::Execution(e.to_string());
455+
let err = DataFusionError::Execution(message.clone());
432456
let err = Err(err.into());
433457
tx.send(Some(err)).ok();
434458
}

0 commit comments

Comments
 (0)