Skip to content

Commit 07e26de

Browse files
authored
Merge pull request #52 from datafusion-contrib/ntran/cu
Fix compile and clippy warnings
2 parents 6a48d5f + 77c74bf commit 07e26de

File tree

4 files changed

+27
-19
lines changed

4 files changed

+27
-19
lines changed

src/planning.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ pub async fn distribute_stages(
460460

461461
// all stages to workers
462462
let (task_datas, final_addrs) =
463-
assign_to_workers(query_id, &stages, workers.values().collect(), codec)?;
463+
assign_to_workers(query_id, stages, workers.values().collect(), codec)?;
464464

465465
// we retry this a few times to ensure that the workers are ready
466466
// and can accept the stages

src/proxy_service.rs

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,25 @@ impl DDProxyHandler {
7979
}
8080
}
8181

82-
pub fn create_flight_info_response(&self, query_plan: QueryPlan) -> Result<FlightInfo, Status> {
82+
pub fn create_flight_info_response(
83+
&self,
84+
query_plan: QueryPlan,
85+
) -> Result<FlightInfo, Box<Status>> {
8386
let mut flight_info = FlightInfo::new()
8487
.try_with_schema(&query_plan.schema)
85-
.map_err(|e| Status::internal(format!("Could not create flight info {e:?}")))?;
88+
.map_err(|e| {
89+
Box::new(Status::internal(format!(
90+
"Could not create flight info {e:?}"
91+
)))
92+
})?;
8693

8794
let ticket_data = TicketStatementData {
8895
query_id: query_plan.query_id,
8996
stage_id: query_plan.final_stage_id,
9097
stage_addrs: Some(query_plan.worker_addresses.into()),
91-
schema: Some(
92-
query_plan
93-
.schema
94-
.try_into()
95-
.map_err(|e| Status::internal(format!("Could not convert schema {e:?}")))?,
96-
),
98+
schema: Some(query_plan.schema.try_into().map_err(|e| {
99+
Box::new(Status::internal(format!("Could not convert schema {e:?}")))
100+
})?),
97101
};
98102

99103
let ticket = Ticket::new(
@@ -153,18 +157,18 @@ impl DDProxyHandler {
153157
&self,
154158
addrs: &Addrs,
155159
expected_stage_id: u64,
156-
) -> Result<(), Status> {
160+
) -> Result<(), Box<Status>> {
157161
if addrs.len() != 1 {
158-
return Err(Status::internal(format!(
162+
return Err(Box::new(Status::internal(format!(
159163
"Expected exactly one stage in addrs, got {}",
160164
addrs.len()
161-
)));
165+
))));
162166
}
163167
if !addrs.contains_key(&expected_stage_id) {
164-
return Err(Status::internal(format!(
168+
return Err(Box::new(Status::internal(format!(
165169
"No addresses found for stage_id {} in addrs",
166170
expected_stage_id
167-
)));
171+
))));
168172
}
169173
Ok(())
170174
}
@@ -257,7 +261,8 @@ impl FlightSqlHandler for DDProxyHandler {
257261
trace!("calculated addrs: {:?}", addrs);
258262

259263
// Validate that addrs contains exactly one stage
260-
self.validate_single_stage_addrs(&addrs, tsd.stage_id)?;
264+
self.validate_single_stage_addrs(&addrs, tsd.stage_id)
265+
.map_err(|e| *e)?;
261266

262267
let stage_partition_addrs = addrs.get(&tsd.stage_id).ok_or_else(|| {
263268
Status::internal(format!(
@@ -357,6 +362,7 @@ impl DDProxyService {
357362
handler: self.handler.clone(),
358363
};
359364

365+
#[allow(clippy::result_large_err)]
360366
fn intercept(req: Request<()>) -> Result<Request<()>, Status> {
361367
println!("Intercepting request: {:?}", req);
362368
debug!("Intercepting request: {:?}", req);

src/query_planner.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use datafusion::{
1010

1111
use datafusion_proto::physical_plan::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec};
1212
use datafusion_substrait::{logical_plan::consumer::from_substrait_plan, substrait::proto::Plan};
13-
use insta::assert_snapshot;
1413
use tokio_stream::StreamExt;
1514

1615
use crate::{
@@ -288,10 +287,12 @@ impl QueryPlanner {
288287
}
289288
}
290289

291-
pub mod tests {
290+
#[cfg(test)]
291+
mod tests {
292292
use super::*;
293293
use arrow::datatypes::{DataType, Field, Schema};
294294
use datafusion::physical_plan::displayable;
295+
use insta::assert_snapshot;
295296
use std::io::BufReader;
296297
use std::{fs::File, path::Path};
297298

src/worker_service.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ struct DDWorkerHandler {
9999
done: Arc<Mutex<bool>>,
100100

101101
/// Optional customizer for our context and proto serde
102+
#[allow(dead_code)]
102103
pub customizer: Option<Arc<dyn Customizer>>,
103104

104105
codec: Arc<dyn PhysicalExtensionCodec>,
@@ -165,8 +166,7 @@ impl DDWorkerHandler {
165166
customizer
166167
.clone()
167168
.map(|c| c as Arc<dyn PhysicalExtensionCodec>)
168-
.or(Some(Arc::new(DefaultPhysicalExtensionCodec {})))
169-
.unwrap(),
169+
.unwrap_or(Arc::new(DefaultPhysicalExtensionCodec {})),
170170
));
171171

172172
Self {
@@ -502,6 +502,7 @@ impl DDWorkerHandler {
502502

503503
Ok(Box::pin(out_stream))
504504
}
505+
#[allow(clippy::result_large_err)]
505506
fn do_action_get_host(&self) -> Result<Response<crate::flight::DoActionStream>, Status> {
506507
let addr = self.addr.clone();
507508
let name = self.name.clone();

0 commit comments

Comments
 (0)