|
20 | 20 | use datafusion::arrow::ipc::reader::StreamReader; |
21 | 21 | use std::convert::TryFrom; |
22 | 22 | use std::fs::File; |
| 23 | +use std::path::Path; |
23 | 24 | use std::pin::Pin; |
24 | 25 | use tokio_util::io::ReaderStream; |
25 | 26 |
|
26 | 27 | use arrow_flight::encode::FlightDataEncoderBuilder; |
27 | 28 | use arrow_flight::error::FlightError; |
28 | 29 | use ballista_core::error::BallistaError; |
| 30 | +use ballista_core::execution_plans::sort_shuffle::{ |
| 31 | + get_index_path, is_sort_shuffle_output, stream_sort_shuffle_partition, |
| 32 | +}; |
29 | 33 | use ballista_core::serde::decode_protobuf; |
30 | 34 | use ballista_core::serde::scheduler::Action as BallistaAction; |
31 | 35 | use datafusion::arrow::ipc::CompressionType; |
@@ -95,8 +99,42 @@ impl FlightService for BallistaFlightService { |
95 | 99 | decode_protobuf(&ticket.ticket).map_err(|e| from_ballista_err(&e))?; |
96 | 100 |
|
97 | 101 | match &action { |
98 | | - BallistaAction::FetchPartition { path, .. } => { |
99 | | - debug!("FetchPartition reading {path}"); |
| 102 | + BallistaAction::FetchPartition { |
| 103 | + path, partition_id, .. |
| 104 | + } => { |
| 105 | + debug!("FetchPartition reading partition {partition_id} from {path}"); |
| 106 | + let data_path = Path::new(path); |
| 107 | + |
| 108 | + // Check if this is a sort-based shuffle output |
| 109 | + if is_sort_shuffle_output(data_path) { |
| 110 | + debug!("Detected sort-based shuffle format for {path}"); |
| 111 | + let index_path = get_index_path(data_path); |
| 112 | + let stream = stream_sort_shuffle_partition( |
| 113 | + data_path, |
| 114 | + &index_path, |
| 115 | + *partition_id, |
| 116 | + ) |
| 117 | + .map_err(|e| from_ballista_err(&e))?; |
| 118 | + |
| 119 | + let schema = stream.schema(); |
| 120 | + // Map DataFusionError to FlightError |
| 121 | + let stream = stream.map_err(|e| FlightError::from(ArrowError::from(e))); |
| 122 | + |
| 123 | + let write_options: IpcWriteOptions = IpcWriteOptions::default() |
| 124 | + .try_with_compression(Some(CompressionType::LZ4_FRAME)) |
| 125 | + .map_err(|e| from_arrow_err(&e))?; |
| 126 | + let flight_data_stream = FlightDataEncoderBuilder::new() |
| 127 | + .with_schema(schema) |
| 128 | + .with_options(write_options) |
| 129 | + .build(stream) |
| 130 | + .map_err(|err| Status::from_error(Box::new(err))); |
| 131 | + |
| 132 | + return Ok(Response::new( |
| 133 | + Box::pin(flight_data_stream) as Self::DoGetStream |
| 134 | + )); |
| 135 | + } |
| 136 | + |
| 137 | + // Standard hash-based shuffle - read the entire file |
100 | 138 | let file = File::open(path) |
101 | 139 | .map_err(|e| { |
102 | 140 | BallistaError::General(format!( |
|
0 commit comments