Skip to content

Commit 49891aa

Browse files
committed
Add an error propagation mechanism to the Arrow Flight endpoint and the Arrow Flight reader
1 parent e7e95c3 commit 49891aa

File tree

2 files changed

+22
-14
lines changed

2 files changed

+22
-14
lines changed

src/flight_service/do_get.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::composed_extension_codec::ComposedPhysicalExtensionCodec;
2+
use crate::errors::datafusion_error_to_tonic_status;
23
use crate::flight_service::service::ArrowFlightEndpoint;
34
use crate::plan::ArrowFlightReadExecProtoCodec;
45
use crate::stage_delegation::{ActorContext, StageContext};
@@ -140,22 +141,24 @@ impl ArrowFlightEndpoint {
140141
let stream_partitioner = self
141142
.partitioner_registry
142143
.get_or_create_stream_partitioner(stage_id, actor_idx, plan, partitioning)
143-
.map_err(|err| {
144-
Status::internal(format!("Could not create stream partitioner: {err}"))
145-
})?;
144+
.map_err(|err| datafusion_error_to_tonic_status(&err))?;
146145

147146
let stream = stream_partitioner
148147
.execute(caller_actor_idx, state.task_ctx())
149-
.map_err(|err| Status::internal(format!("Cannot get stream partition: {err}")))?;
148+
.map_err(|err| datafusion_error_to_tonic_status(&err))?;
150149

151-
// TODO: error propagation
152150
let flight_data_stream = FlightDataEncoderBuilder::new()
153151
.with_schema(stream_partitioner.schema())
154-
.build(stream.map_err(|err| FlightError::ExternalError(Box::new(err))));
155-
156-
Ok(Response::new(Box::pin(flight_data_stream.map_err(|err| {
157-
Status::internal(format!("Error during flight stream: {err}"))
158-
}))))
152+
.build(stream.map_err(|err| {
153+
FlightError::Tonic(Box::new(datafusion_error_to_tonic_status(&err)))
154+
}));
155+
156+
Ok(Response::new(Box::pin(flight_data_stream.map_err(
157+
|err| match err {
158+
FlightError::Tonic(status) => *status,
159+
_ => Status::internal(format!("Error during flight stream: {err}")),
160+
},
161+
))))
159162
}
160163
}
161164

src/plan/arrow_flight_read.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::channel_manager::{ArrowFlightChannel, ChannelManager};
22
use crate::composed_extension_codec::ComposedPhysicalExtensionCodec;
3+
use crate::errors::tonic_status_to_datafusion_error;
34
use crate::flight_service::{DoGet, DoPut};
45
use crate::plan::arrow_flight_read_proto::ArrowFlightReadExecProtoCodec;
56
use crate::stage_delegation::{ActorContext, StageContext, StageDelegation};
@@ -189,14 +190,18 @@ impl ExecutionPlan for ArrowFlightReadExec {
189190
let stream = client
190191
.do_get(ticket.into_request())
191192
.await
192-
.map_err(|err| DataFusionError::External(Box::new(err)))?
193+
.map_err(|err| tonic_status_to_datafusion_error(&err).unwrap_or_else(|| {
194+
DataFusionError::External(Box::new(err))
195+
}))?
193196
.into_inner()
194197
.map_err(|err| FlightError::Tonic(Box::new(err)));
195198

196199
Ok(FlightRecordBatchStream::new_from_flight_data(stream)
197-
// TODO: propagate the error from the service to here, probably serializing it
198-
// somehow.
199-
.map_err(|err| DataFusionError::External(Box::new(err))))
200+
.map_err(|err| match err {
201+
FlightError::Tonic(status) => tonic_status_to_datafusion_error(&status)
202+
.unwrap_or_else(|| DataFusionError::External(Box::new(status))),
203+
err => DataFusionError::External(Box::new(err))
204+
}))
200205
}.try_flatten_stream();
201206

202207
Ok(Box::pin(RecordBatchStreamAdapter::new(

0 commit comments

Comments
 (0)