
Commit c78719c

Merge pull request #39 from datafusion-contrib/run-cargo-fmt
Run `cargo fmt`
2 parents c8c91e6 + 5c1e1ba commit c78719c

17 files changed, +384 −335 lines
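
The diffs below are the result of running rustfmt across the crate, so they reorder imports and rewrap lines without changing behavior. If useful, the same formatting can be reproduced and checked locally with the standard cargo commands (a sketch; adjust to the repository's toolchain and CI setup):

cargo fmt --all            # rewrite every crate in the workspace in place
cargo fmt --all -- --check # exit non-zero if any file is not rustfmt-clean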

src/codec.rs

Lines changed: 13 additions & 24 deletions
@@ -2,17 +2,15 @@ use std::sync::Arc;
 
 use arrow::datatypes::Schema;
 use datafusion::{
-    common::{Result, internal_datafusion_err, internal_err},
+    common::{internal_datafusion_err, internal_err, Result},
     datasource::source::DataSourceExec,
     execution::FunctionRegistry,
-    physical_plan::{ExecutionPlan, displayable},
+    physical_plan::{displayable, ExecutionPlan},
 };
 use datafusion_proto::{
     physical_plan::{
-        DefaultPhysicalExtensionCodec,
-        PhysicalExtensionCodec,
-        from_proto::parse_protobuf_partitioning,
-        to_proto::serialize_partitioning,
+        from_proto::parse_protobuf_partitioning, to_proto::serialize_partitioning,
+        DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
     },
     protobuf,
 };
@@ -23,11 +21,8 @@ use crate::{
     logging::trace,
     max_rows::MaxRowsExec,
     protobuf::{
-        DfRayExecNode,
-        DfRayStageReaderExecNode,
-        MaxRowsExecNode,
+        df_ray_exec_node::Payload, DfRayExecNode, DfRayStageReaderExecNode, MaxRowsExecNode,
         PartitionIsolatorExecNode,
-        df_ray_exec_node::Payload,
     },
     stage_reader::DFRayStageReaderExec,
 };
@@ -95,16 +90,12 @@ impl PhysicalExtensionCodec for DFRayCodec {
                     )))
                 }
             }
-            Payload::NumpangExec(_) => {
-                Err(internal_datafusion_err!(
-                    "NumpangExec not supported in open source version"
-                ))
-            }
-            Payload::ContextExec(_) => {
-                Err(internal_datafusion_err!(
-                    "ContextExec not supported in open source version"
-                ))
-            }
+            Payload::NumpangExec(_) => Err(internal_datafusion_err!(
+                "NumpangExec not supported in open source version"
+            )),
+            Payload::ContextExec(_) => Err(internal_datafusion_err!(
+                "ContextExec not supported in open source version"
+            )),
            }
        } else {
            internal_err!("cannot decode proto extension in dfray codec")
@@ -169,16 +160,14 @@ mod test {
 
     use arrow::datatypes::DataType;
     use datafusion::{
-        physical_plan::{Partitioning, displayable},
+        physical_plan::{displayable, Partitioning},
         prelude::SessionContext,
     };
     use datafusion_proto::physical_plan::AsExecutionPlan;
 
     use super::*;
     use crate::{
-        isolator::PartitionIsolatorExec,
-        max_rows::MaxRowsExec,
-        stage_reader::DFRayStageReaderExec,
+        isolator::PartitionIsolatorExec, max_rows::MaxRowsExec, stage_reader::DFRayStageReaderExec,
     };
 
     fn create_test_schema() -> Arc<arrow::datatypes::Schema> {

src/explain.rs

Lines changed: 27 additions & 29 deletions
@@ -15,27 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{
-    any::Any,
-    fmt::Formatter,
-    sync::Arc,
-};
+use std::{any::Any, fmt::Formatter, sync::Arc};
 
-use arrow::{
-    array::StringArray,
-    datatypes::SchemaRef,
-    record_batch::RecordBatch,
-};
+use arrow::{array::StringArray, datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion::{
     execution::TaskContext,
+    physical_expr::EquivalenceProperties,
     physical_plan::{
+        displayable,
         execution_plan::{Boundedness, EmissionType},
         memory::MemoryStream,
-        ExecutionPlan, Partitioning,
-        PlanProperties, DisplayAs, DisplayFormatType,
-        SendableRecordBatchStream, displayable,
+        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+        SendableRecordBatchStream,
     },
-    physical_expr::EquivalenceProperties,
 };
 
 /// Custom distributed EXPLAIN execution plan that also returns distributed plan and stages
@@ -96,7 +88,10 @@ impl DistributedExplainExec {
         let mut result = String::new();
         for (i, stage) in stages.iter().enumerate() {
             result.push_str(&format!("Stage {}:\n", stage.stage_id));
-            result.push_str(&format!(" Partition Groups: {:?}\n", stage.partition_groups));
+            result.push_str(&format!(
+                " Partition Groups: {:?}\n",
+                stage.partition_groups
+            ));
             result.push_str(&format!(" Full Partitions: {}\n", stage.full_partitions));
             result.push_str(" Plan:\n");
             let plan_display = format!("{}", displayable(stage.plan.as_ref()).indent(true));
@@ -150,13 +145,13 @@ impl ExecutionPlan for DistributedExplainExec {
         _context: Arc<TaskContext>,
     ) -> datafusion::error::Result<SendableRecordBatchStream> {
         let schema = self.schema.clone();
-        
+
         // Create the result data with our 4 plan types
         let plan_types = StringArray::from(vec![
             "logical_plan",
             "physical_plan",
             "distributed_plan",
-            "distributed_stages"
+            "distributed_stages",
         ]);
         let plans = StringArray::from(vec![
             self.logical_plan.as_str(),
@@ -165,14 +160,13 @@ impl ExecutionPlan for DistributedExplainExec {
             self.distributed_stages.as_str(),
         ]);
 
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(plan_types), Arc::new(plans)],
-        ).map_err(|e| datafusion::error::DataFusionError::ArrowError(e, None))?;
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(plan_types), Arc::new(plans)])
+                .map_err(|e| datafusion::error::DataFusionError::ArrowError(e, None))?;
 
         // Use MemoryStream which is designed for DataFusion execution plans
         let stream = MemoryStream::try_new(vec![batch], schema, None)?;
-        
+
         Ok(Box::pin(stream))
     }
 
@@ -182,16 +176,20 @@ impl ExecutionPlan for DistributedExplainExec {
 }
 
 /// Check if this is an EXPLAIN query (but not EXPLAIN ANALYZE)
-/// 
+///
 /// This function distinguishes between:
 /// - EXPLAIN queries (returns true) - show plan information only
 /// - EXPLAIN ANALYZE queries (returns false) - execute and show runtime stats
 /// - Regular queries (returns false) - normal query execution
 pub fn is_explain_query(query: &str) -> bool {
     let query_upper = query.trim().to_uppercase();
     // Must start with "EXPLAIN" followed by whitespace or end of string
-    let is_explain = query_upper.starts_with("EXPLAIN") &&
-        (query_upper.len() == 7 || query_upper.chars().nth(7).is_some_and(|c| c.is_whitespace()));
+    let is_explain = query_upper.starts_with("EXPLAIN")
+        && (query_upper.len() == 7
+            || query_upper
+                .chars()
+                .nth(7)
+                .is_some_and(|c| c.is_whitespace()));
     let is_explain_analyze = query_upper.starts_with("EXPLAIN ANALYZE");
     is_explain && !is_explain_analyze
 }
@@ -223,7 +221,7 @@ mod tests {
         // Test edge cases
         assert!(!is_explain_query(""));
         assert!(!is_explain_query(" "));
-        assert!(!is_explain_query("EXPLAINSELECT"));  // No space
+        assert!(!is_explain_query("EXPLAINSELECT")); // No space
         assert!(is_explain_query("EXPLAIN")); // Just EXPLAIN
     }
 }
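
Aside from import reordering and line rewrapping, the only logic-bearing function touched in this file is `is_explain_query`, which rustfmt merely rewrapped. For reference, a minimal standalone sketch of the same prefix check; the helper name `starts_with_keyword` is illustrative and not part of the repository:

/// Illustrative restatement of the check above: the query must begin with the
/// keyword and the next character (if any) must be whitespace, so that
/// "EXPLAINSELECT" is rejected while "EXPLAIN" and "EXPLAIN SELECT 1" pass.
fn starts_with_keyword(query: &str, keyword: &str) -> bool {
    let upper = query.trim().to_uppercase();
    upper.starts_with(keyword)
        && (upper.len() == keyword.len()
            || upper
                .chars()
                .nth(keyword.len())
                .is_some_and(|c| c.is_whitespace()))
}

fn main() {
    assert!(starts_with_keyword("explain select 1", "EXPLAIN"));
    assert!(starts_with_keyword("EXPLAIN", "EXPLAIN"));
    assert!(!starts_with_keyword("EXPLAINSELECT", "EXPLAIN"));
    // "EXPLAIN ANALYZE ..." still matches this check; is_explain_query then
    // filters it out with a separate starts_with("EXPLAIN ANALYZE") test.
    assert!(starts_with_keyword("EXPLAIN ANALYZE SELECT 1", "EXPLAIN"));
}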

src/flight.rs

Lines changed: 4 additions & 16 deletions
@@ -18,21 +18,9 @@
 use std::sync::Arc;
 
 use arrow_flight::{
-    Action,
-    ActionType,
-    Criteria,
-    Empty,
-    FlightData,
-    FlightDescriptor,
-    FlightInfo,
-    HandshakeRequest,
-    HandshakeResponse,
-    PollInfo,
-    PutResult,
-    SchemaResult,
-    Ticket,
-    flight_service_server::FlightService,
-    sql::server::FlightSqlService,
+    flight_service_server::FlightService, sql::server::FlightSqlService, Action, ActionType,
+    Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
+    PollInfo, PutResult, SchemaResult, Ticket,
 };
 use futures::stream::BoxStream;
 use tonic::{Request, Response, Status, Streaming};
@@ -46,7 +34,7 @@ pub type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status
 pub trait FlightHandler: Send + Sync {
     async fn do_get(&self, request: Request<Ticket>) -> Result<Response<DoGetStream>, Status>;
     async fn do_action(&self, request: Request<Action>)
-        -> Result<Response<DoActionStream>, Status>;
+    -> Result<Response<DoActionStream>, Status>;
 }
 
 pub struct FlightServ {
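
For orientation, the `FlightHandler` trait reformatted above exposes only the two async methods shown. A minimal, hypothetical implementor could look like the sketch below; `NoopHandler` is not part of the repository, and if the trait is actually declared with the `async_trait` macro rather than native async-fn-in-trait, the impl would need the matching attribute:

// Hypothetical handler used only to illustrate the trait surface; it rejects
// every call with a tonic `Status::unimplemented` error.
struct NoopHandler;

impl FlightHandler for NoopHandler {
    async fn do_get(&self, _request: Request<Ticket>) -> Result<Response<DoGetStream>, Status> {
        Err(Status::unimplemented("do_get"))
    }

    async fn do_action(
        &self,
        _request: Request<Action>,
    ) -> Result<Response<DoActionStream>, Status> {
        Err(Status::unimplemented("do_action"))
    }
}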
