|
1 | 1 | use crate::composed_extension_codec::ComposedPhysicalExtensionCodec; |
| 2 | +use crate::errors::datafusion_error_to_tonic_status; |
2 | 3 | use crate::flight_service::service::ArrowFlightEndpoint; |
3 | 4 | use crate::plan::ArrowFlightReadExecProtoCodec; |
4 | 5 | use crate::stage_delegation::{ActorContext, StageContext}; |
@@ -140,22 +141,24 @@ impl ArrowFlightEndpoint { |
140 | 141 | let stream_partitioner = self |
141 | 142 | .partitioner_registry |
142 | 143 | .get_or_create_stream_partitioner(stage_id, actor_idx, plan, partitioning) |
143 | | - .map_err(|err| { |
144 | | - Status::internal(format!("Could not create stream partitioner: {err}")) |
145 | | - })?; |
| 144 | + .map_err(|err| datafusion_error_to_tonic_status(&err))?; |
146 | 145 |
|
147 | 146 | let stream = stream_partitioner |
148 | 147 | .execute(caller_actor_idx, state.task_ctx()) |
149 | | - .map_err(|err| Status::internal(format!("Cannot get stream partition: {err}")))?; |
| 148 | + .map_err(|err| datafusion_error_to_tonic_status(&err))?; |
150 | 149 |
|
151 | | - // TODO: error propagation |
152 | 150 | let flight_data_stream = FlightDataEncoderBuilder::new() |
153 | 151 | .with_schema(stream_partitioner.schema()) |
154 | | - .build(stream.map_err(|err| FlightError::ExternalError(Box::new(err)))); |
155 | | - |
156 | | - Ok(Response::new(Box::pin(flight_data_stream.map_err(|err| { |
157 | | - Status::internal(format!("Error during flight stream: {err}")) |
158 | | - })))) |
| 152 | + .build(stream.map_err(|err| { |
| 153 | + FlightError::Tonic(Box::new(datafusion_error_to_tonic_status(&err))) |
| 154 | + })); |
| 155 | + |
| 156 | + Ok(Response::new(Box::pin(flight_data_stream.map_err( |
| 157 | + |err| match err { |
| 158 | + FlightError::Tonic(status) => *status, |
| 159 | + _ => Status::internal(format!("Error during flight stream: {err}")), |
| 160 | + }, |
| 161 | + )))) |
159 | 162 | } |
160 | 163 | } |
161 | 164 |
|
|
0 commit comments