Skip to content

Commit 2680f42

Browse files
committed
Format
1 parent 5808d24 commit 2680f42

File tree

4 files changed

+56
-35
lines changed

4 files changed

+56
-35
lines changed

src/explain.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,9 @@ impl ExecutionPlan for DistributedExplainExec {
160160
self.distributed_stages.as_str(),
161161
]);
162162

163-
let batch = RecordBatch::try_new(
164-
schema.clone(),
165-
vec![Arc::new(plan_types), Arc::new(plans)],
166-
).map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
163+
let batch =
164+
RecordBatch::try_new(schema.clone(), vec![Arc::new(plan_types), Arc::new(plans)])
165+
.map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
167166

168167
// Use MemoryStream which is designed for DataFusion execution plans
169168
let stream = MemoryStream::try_new(vec![batch], schema, None)?;

src/flight_handlers.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,10 @@ impl FlightRequestHandler {
136136
Ok(Response::new(flight_info))
137137
}
138138

139-
pub async fn handle_substrait_info_request(&self, substrait_plan: datafusion_substrait::substrait::proto::Plan) -> Result<Response<FlightInfo>, Status> {
139+
pub async fn handle_substrait_info_request(
140+
&self,
141+
substrait_plan: datafusion_substrait::substrait::proto::Plan,
142+
) -> Result<Response<FlightInfo>, Status> {
140143
let query_plan = self
141144
.planner
142145
.prepare_substrait_query(substrait_plan)
@@ -150,7 +153,7 @@ impl FlightRequestHandler {
150153
query_plan.worker_addresses,
151154
query_plan.final_stage_id,
152155
query_plan.schema,
153-
None // Regular queries don't have explain data
156+
None, // Regular queries don't have explain data
154157
)?;
155158

156159
trace!("get_flight_info_statement done");

src/proxy_service.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use std::sync::Arc;
1919

2020
use anyhow::{anyhow, Context};
2121
use arrow_flight::{
22-
flight_service_server::FlightServiceServer, sql::TicketStatementQuery, FlightDescriptor, FlightInfo, Ticket
22+
flight_service_server::FlightServiceServer, sql::TicketStatementQuery, FlightDescriptor,
23+
FlightInfo, Ticket,
2324
};
2425
use datafusion_substrait::substrait;
2526
use parking_lot::Mutex;
@@ -106,14 +107,14 @@ impl FlightSqlHandler for DfRayProxyHandler {
106107
_request: Request<FlightDescriptor>,
107108
) -> Result<Response<FlightInfo>, Status> {
108109
let plan = match &substrait.plan {
109-
Some(substrait_plan) => {
110-
substrait::proto::Plan::decode(substrait_plan.plan.as_ref())
111-
.map_err(|e| Status::invalid_argument(format!("Invalid Substrait plan: {e}")))?
112-
}
110+
Some(substrait_plan) => substrait::proto::Plan::decode(substrait_plan.plan.as_ref())
111+
.map_err(|e| Status::invalid_argument(format!("Invalid Substrait plan: {e}")))?,
113112
None => return Err(Status::invalid_argument("Missing Substrait plan")),
114113
};
115114

116-
self.flight_handler.handle_substrait_info_request(plan).await
115+
self.flight_handler
116+
.handle_substrait_info_request(plan)
117+
.await
117118
}
118119
}
119120

@@ -230,7 +231,8 @@ mod tests {
230231
use super::*;
231232
// Test-specific imports
232233
use arrow_flight::{
233-
sql::{CommandStatementQuery, TicketStatementQuery}, FlightDescriptor, Ticket
234+
sql::{CommandStatementQuery, TicketStatementQuery},
235+
FlightDescriptor, Ticket,
234236
};
235237
use prost::Message;
236238
use tonic::Request;
@@ -361,5 +363,4 @@ mod tests {
361363
// TODO: Add tests for regular (non-explain) queries
362364
// We might need to create integration or end-to-end test infrastructure for this because
363365
// they need workers
364-
365366
}

src/query_planner.rs

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use arrow::datatypes::SchemaRef;
55
use datafusion::{
66
logical_expr::LogicalPlan, physical_plan::ExecutionPlan, prelude::SessionContext,
77
};
8-
use datafusion_substrait::logical_plan::{consumer::from_substrait_plan};
8+
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
99

1010
use crate::{
1111
explain::{is_explain_query, DistributedExplainExec},
@@ -75,8 +75,7 @@ impl QueryPlanner {
7575
let worker_addrs = get_worker_addresses()?;
7676

7777
// The last stage produces the data returned to the client.
78-
let final_stage = &base_result.distributed_stages
79-
[base_result.distributed_stages.len() - 1];
78+
let final_stage = &base_result.distributed_stages[base_result.distributed_stages.len() - 1];
8079
let schema = Arc::clone(&final_stage.plan.schema());
8180
let final_stage_id = final_stage.stage_id;
8281

@@ -112,7 +111,8 @@ impl QueryPlanner {
112111
let physical_plan = physical_planning(&logical_plan, &ctx).await?;
113112

114113
// divide the physical plan into chunks (stages) that we can distribute to workers later in dispatch_query_plan
115-
let (distributed_plan, distributed_stages) = execution_planning(physical_plan.clone(), 8192, Some(2)).await?;
114+
let (distributed_plan, distributed_stages) =
115+
execution_planning(physical_plan.clone(), 8192, Some(2)).await?;
116116

117117
Ok(QueryPlanBase {
118118
query_id,
@@ -131,8 +131,13 @@ impl QueryPlanner {
131131
}
132132

133133
/// Prepare a distributed query (Substrait entry point)
134-
pub async fn prepare_substrait_query(&self, substrait_plan: datafusion_substrait::substrait::proto::Plan) -> Result<QueryPlan> {
135-
let base_result = self.prepare_substrait_query_base(substrait_plan, "SUBSTRAIT").await?;
134+
pub async fn prepare_substrait_query(
135+
&self,
136+
substrait_plan: datafusion_substrait::substrait::proto::Plan,
137+
) -> Result<QueryPlan> {
138+
let base_result = self
139+
.prepare_substrait_query_base(substrait_plan, "SUBSTRAIT")
140+
.await?;
136141
self.dispatch_query_plan(base_result).await
137142
}
138143

@@ -141,20 +146,25 @@ impl QueryPlanner {
141146
substrait_plan: datafusion_substrait::substrait::proto::Plan,
142147
query_type: &str,
143148
) -> Result<QueryPlanBase> {
144-
debug!("prepare_substrait_query_base: {} Substrait = {:#?}", query_type, substrait_plan);
145-
149+
debug!(
150+
"prepare_substrait_query_base: {} Substrait = {:#?}",
151+
query_type, substrait_plan
152+
);
153+
146154
let query_id = uuid::Uuid::new_v4().to_string();
147155
let ctx = get_ctx().map_err(|e| anyhow!("Could not create context: {e}"))?;
148156

149157
let logical_plan = from_substrait_plan(&ctx.state(), &substrait_plan)
150158
.await
151159
.map_err(|e| anyhow!("Failed to convert DataFusion Logical Plan: {e}"))?;
152160

153-
154-
let physical_plan = physical_planning(&logical_plan, &ctx).await.map_err(|e| anyhow!("Failed to convert DataFusion Physical Plan: {e}"))?;
161+
let physical_plan = physical_planning(&logical_plan, &ctx)
162+
.await
163+
.map_err(|e| anyhow!("Failed to convert DataFusion Physical Plan: {e}"))?;
155164

156165
// divide the physical plan into chunks (stages) that we can distribute to workers later in dispatch_query_plan
157-
let (distributed_plan, distributed_stages) = execution_planning(physical_plan.clone(), 8192, Some(2)).await?;
166+
let (distributed_plan, distributed_stages) =
167+
execution_planning(physical_plan.clone(), 8192, Some(2)).await?;
158168

159169
Ok(QueryPlanBase {
160170
query_id,
@@ -246,18 +256,19 @@ impl QueryPlanner {
246256
#[cfg(test)]
247257
mod tests {
248258
use super::*;
249-
use std::{fs::{File}, path::Path};
250259
use std::io::BufReader;
251-
260+
use std::{fs::File, path::Path};
252261

253262
// //////////////////////////////////////////////////////////////
254263
// Test helper functions
255264
// //////////////////////////////////////////////////////////////
256265

257266
/// Set up mock worker environment for testing
258267
fn setup_mock_worker_env() {
259-
let mock_addrs = [("mock_worker_1".to_string(), "localhost:9001".to_string()),
260-
("mock_worker_2".to_string(), "localhost:9002".to_string())];
268+
let mock_addrs = [
269+
("mock_worker_1".to_string(), "localhost:9001".to_string()),
270+
("mock_worker_2".to_string(), "localhost:9002".to_string()),
271+
];
261272
let mock_env_value = mock_addrs
262273
.iter()
263274
.map(|(name, addr)| format!("{}/{}", name, addr))
@@ -303,20 +314,27 @@ mod tests {
303314
let planner = QueryPlanner::new();
304315

305316
// Read the JSON plan and deserialize it into a Substrait `Plan` message
306-
let plan = serde_json::from_reader::<_, datafusion_substrait::substrait::proto::Plan>(BufReader::new(
307-
File::open(Path::new("testdata/substrait/select_one.substrait.json")).expect("file not found"),
308-
)).expect("failed to parse json");
309-
317+
let plan = serde_json::from_reader::<_, datafusion_substrait::substrait::proto::Plan>(
318+
BufReader::new(
319+
File::open(Path::new("testdata/substrait/select_one.substrait.json"))
320+
.expect("file not found"),
321+
),
322+
)
323+
.expect("failed to parse json");
324+
310325
let result = planner.prepare_substrait_query_base(plan, "TEST").await;
311-
326+
312327
if result.is_ok() {
313328
let query_plan_base = result.unwrap();
314329
// verify all fields have values
315330
assert!(!query_plan_base.query_id.is_empty());
316331
assert!(!query_plan_base.distributed_stages.is_empty());
317332
assert!(!query_plan_base.physical_plan.schema().fields().is_empty());
318333
// logical plan of select 1 on empty relation
319-
assert_eq!(query_plan_base.logical_plan.to_string(), "Projection: Int64(1) AS test_col\n Values: (Int64(0))");
334+
assert_eq!(
335+
query_plan_base.logical_plan.to_string(),
336+
"Projection: Int64(1) AS test_col\n Values: (Int64(0))"
337+
);
320338
// physical plan of select 1 on empty relation is ProjectionExec
321339
assert_eq!(query_plan_base.physical_plan.name(), "ProjectionExec");
322340
} else {

0 commit comments

Comments
 (0)