2 changes: 1 addition & 1 deletion src/planning.rs
@@ -460,7 +460,7 @@ pub async fn distribute_stages(

// all stages to workers
let (task_datas, final_addrs) =
assign_to_workers(query_id, &stages, workers.values().collect(), codec)?;
assign_to_workers(query_id, stages, workers.values().collect(), codec)?;
Collaborator Author:

@LiaCastaneda changed this parameter to `&[DDStage]` in https://github.com/datafusion-contrib/datafusion-distributed/pull/48/files, so it does not need another `&` here.
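A minimal, self-contained sketch of why the extra borrow is unnecessary (the `DDStage` struct and `assign_to_workers` signature below are stand-ins, not the real definitions): once the parameter is a slice, either an owned `Vec` behind one `&` or an existing slice reference can be passed directly.

```rust
struct DDStage;

// Hypothetical signature, assuming the function now takes a slice as described above.
fn assign_to_workers(stages: &[DDStage]) -> usize {
    stages.len()
}

fn main() {
    let owned: Vec<DDStage> = vec![DDStage, DDStage];
    let borrowed: &[DDStage] = &owned; // one `&` is enough: Vec<T> derefs to [T]

    assert_eq!(assign_to_workers(&owned), 2);   // &Vec<DDStage> coerces to &[DDStage]
    assert_eq!(assign_to_workers(borrowed), 2); // already a slice reference, no extra `&`
}
```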


// we retry this a few times to ensure that the workers are ready
// and can accept the stages
34 changes: 20 additions & 14 deletions src/proxy_service.rs
@@ -79,21 +79,25 @@ impl DDProxyHandler {
}
}

pub fn create_flight_info_response(&self, query_plan: QueryPlan) -> Result<FlightInfo, Status> {
pub fn create_flight_info_response(
&self,
query_plan: QueryPlan,
) -> Result<FlightInfo, Box<Status>> {
Collaborator Author:

Cargo clippy complains that this `Status` is over 176 bytes, so we box it to put it on the heap instead.
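As a free-standing sketch of the pattern (the helper function and message below are made up; only `tonic::Status` is the real type): boxing the error keeps the `Err` variant at pointer size, which is what silences `clippy::result_large_err` for large error types.

```rust
use tonic::Status;

// Hypothetical helper demonstrating the boxed-error pattern used in this diff.
fn build_flight_info(valid: bool) -> Result<String, Box<Status>> {
    if valid {
        Ok("flight info".to_string())
    } else {
        // The Status itself lives on the heap; the Result only carries a Box pointer.
        Err(Box::new(Status::internal("Could not create flight info")))
    }
}

fn main() {
    if let Err(status) = build_flight_info(false) {
        eprintln!("error: {}", status.message());
    }
}
```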

let mut flight_info = FlightInfo::new()
.try_with_schema(&query_plan.schema)
.map_err(|e| Status::internal(format!("Could not create flight info {e:?}")))?;
.map_err(|e| {
Box::new(Status::internal(format!(
"Could not create flight info {e:?}"
)))
})?;

let ticket_data = TicketStatementData {
query_id: query_plan.query_id,
stage_id: query_plan.final_stage_id,
stage_addrs: Some(query_plan.worker_addresses.into()),
schema: Some(
query_plan
.schema
.try_into()
.map_err(|e| Status::internal(format!("Could not convert schema {e:?}")))?,
),
schema: Some(query_plan.schema.try_into().map_err(|e| {
Box::new(Status::internal(format!("Could not convert schema {e:?}")))
})?),
};

let ticket = Ticket::new(
@@ -153,18 +157,18 @@ impl DDProxyHandler {
&self,
addrs: &Addrs,
expected_stage_id: u64,
) -> Result<(), Status> {
) -> Result<(), Box<Status>> {
if addrs.len() != 1 {
return Err(Status::internal(format!(
return Err(Box::new(Status::internal(format!(
"Expected exactly one stage in addrs, got {}",
addrs.len()
)));
))));
}
if !addrs.contains_key(&expected_stage_id) {
return Err(Status::internal(format!(
return Err(Box::new(Status::internal(format!(
"No addresses found for stage_id {} in addrs",
expected_stage_id
)));
))));
}
Ok(())
}
@@ -257,7 +261,8 @@ impl FlightSqlHandler for DDProxyHandler {
trace!("calculated addrs: {:?}", addrs);

// Validate that addrs contains exactly one stage
self.validate_single_stage_addrs(&addrs, tsd.stage_id)?;
self.validate_single_stage_addrs(&addrs, tsd.stage_id)
.map_err(|e| *e)?;

let stage_partition_addrs = addrs.get(&tsd.stage_id).ok_or_else(|| {
Status::internal(format!(
@@ -357,6 +362,7 @@ impl DDProxyService {
handler: self.handler.clone(),
};

#[allow(clippy::result_large_err)]
Collaborator Author:

Since this function is passed to the external `FlightServiceServer::with_interceptor`, which uses `Status`, we cannot change it to `Box<Status>`. So we add the clippy allow here to silence the complaint.
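A self-contained sketch of that constraint (the interceptor body is just a placeholder): tonic interceptors must return `Result<Request<()>, Status>`, so the lint is allowed locally on the function instead of boxing the error.

```rust
use tonic::{Request, Status};

// The interceptor signature requires returning `Status` by value, so we
// silence clippy's result_large_err lint for this function only.
#[allow(clippy::result_large_err)]
fn intercept(req: Request<()>) -> Result<Request<()>, Status> {
    // A real interceptor might inspect metadata and reject the request here.
    Ok(req)
}

fn main() {
    assert!(intercept(Request::new(())).is_ok());
}
```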

fn intercept(req: Request<()>) -> Result<Request<()>, Status> {
println!("Intercepting request: {:?}", req);
debug!("Intercepting request: {:?}", req);
5 changes: 3 additions & 2 deletions src/query_planner.rs
@@ -10,7 +10,6 @@ use datafusion::{

use datafusion_proto::physical_plan::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec};
use datafusion_substrait::{logical_plan::consumer::from_substrait_plan, substrait::proto::Plan};
use insta::assert_snapshot;
use tokio_stream::StreamExt;

use crate::{
@@ -288,10 +287,12 @@ impl QueryPlanner {
}
}

pub mod tests {
#[cfg(test)]
Collaborator Author:

Add this to avoid compile warnings about imports that are only used by the tests.
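A small, generic illustration of the pattern (the `double` function is made up): gating the module behind `#[cfg(test)]` keeps test-only imports such as `insta::assert_snapshot` out of regular builds, so they no longer trigger unused-import warnings.

```rust
pub fn double(x: i32) -> i32 {
    x * 2
}

// Compiled only when running `cargo test`; imports inside this module
// do not affect normal builds.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn doubles_values() {
        assert_eq!(double(21), 42);
    }
}
```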

mod tests {
use super::*;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::physical_plan::displayable;
use insta::assert_snapshot;
use std::io::BufReader;
use std::{fs::File, path::Path};

5 changes: 3 additions & 2 deletions src/worker_service.rs
@@ -99,6 +99,7 @@ struct DDWorkerHandler {
done: Arc<Mutex<bool>>,

/// Optional customizer for our context and proto serde
#[allow(dead_code)]
Collaborator Author:

I know we use the customizer elsewhere, but we get a complaint here because it is not used in this handler, so I added the dead_code allow to avoid it.
Also, I wonder whether a unit or integration test would help avoid this warning @LiaCastaneda

Collaborator @LiaCastaneda (Jul 23, 2025):

Maybe I missed something, but do we need to keep the customizer field in DDWorkerHandler? I don't see it being used in DDWorkerHandler except for creating the PhysicalExtensionCodec, so in theory any customization will be applied automatically when we call try_decode on DDCodec 🤔

Collaborator Author:

It is used in `execute_plan_and_build_stream`:

    if let Some(ref c) = self.customizer {
        ...

I think we need this to talk to different data sources.
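A minimal, hypothetical sketch of the field-level allow discussed in this thread (the `Customizer` trait and `Handler` struct below are stand-ins for the real types): `#[allow(dead_code)]` on the single field silences the lint without suppressing dead-code checks for the rest of the struct.

```rust
use std::sync::Arc;

// Stand-in for the real Customizer trait in this crate.
trait Customizer: Send + Sync {}

struct Handler {
    // This field is never read in this sketch, so rustc's dead_code lint
    // would warn without the field-level allow.
    #[allow(dead_code)]
    customizer: Option<Arc<dyn Customizer>>,
    name: String,
}

fn main() {
    let handler = Handler { customizer: None, name: "worker-1".to_string() };
    println!("handler: {}", handler.name);
}
```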

pub customizer: Option<Arc<dyn Customizer>>,

codec: Arc<dyn PhysicalExtensionCodec>,
@@ -165,8 +166,7 @@ impl DDWorkerHandler {
customizer
.clone()
.map(|c| c as Arc<dyn PhysicalExtensionCodec>)
.or(Some(Arc::new(DefaultPhysicalExtensionCodec {})))
.unwrap(),
.unwrap_or(Arc::new(DefaultPhysicalExtensionCodec {})),
));

Self {
@@ -502,6 +502,7 @@ impl DDWorkerHandler {

Ok(Box::pin(out_stream))
}
#[allow(clippy::result_large_err)]
Collaborator Author:

Ditto. This function implements the FlightHandler trait, which returns Status.

fn do_action_get_host(&self) -> Result<Response<crate::flight::DoActionStream>, Status> {
let addr = self.addr.clone();
let name = self.name.clone();