
Commit e0db0e3

clean up python tpch client a bit
1 parent 2a7be3a commit e0db0e3

4 files changed, +74 −46 lines changed

scripts/launch_python_arrowflightsql_client.sh

Lines changed: 1 addition & 1 deletion

@@ -185,7 +185,7 @@ def run_sql(sql_query):
             rich_table.add_row(*row_data)
 
         console = Console()
-        console.print(rich_table)
+        console.print(rich_table, markup=False)
 
     except Exception as e:
         print(f"Error executing SQL query: {str(e)}")

src/analyze.rs

Lines changed: 5 additions & 5 deletions

@@ -115,7 +115,7 @@ pub struct DistributedAnalyzeRootExec {
 impl DistributedAnalyzeRootExec {
     pub fn new(input: Arc<dyn ExecutionPlan>, verbose: bool, show_statistics: bool) -> Self {
         let field_a = Field::new("Task", DataType::Utf8, false);
-        let field_b = Field::new("Plan", DataType::Utf8, false);
+        let field_b = Field::new("Plan with Metrics", DataType::Utf8, false);
         let schema = Arc::new(Schema::new(vec![field_a, field_b]));
 
         let properties = PlanProperties::new(
@@ -226,7 +226,7 @@ impl ExecutionPlan for DistributedAnalyzeRootExec {
             .clone();
 
         // we want to gather all partitions
-        let coalesce = CoalescePartitionsExec::new(self.input.clone());
+        let coalesce = Arc::new(CoalescePartitionsExec::new(self.input.clone()));
 
         let mut input_stream = coalesce.execute(partition, context)?;
 
@@ -272,10 +272,10 @@ impl ExecutionPlan for DistributedAnalyzeRootExec {
 
         tasks.sort_by_key(|t| (t.stage_id, t.partition_group.clone()));
 
+        trace!("sorted tasks: {:?}", tasks);
+
         let mut task_builder = StringBuilder::with_capacity(1, 1024);
         let mut plan_builder = StringBuilder::with_capacity(1, 1024);
-        task_builder.append_value("Task");
-        plan_builder.append_value("Plan with Metrics");
 
         for task_output in tasks.iter() {
             task_builder.append_value(format!(
@@ -285,7 +285,7 @@ impl ExecutionPlan for DistributedAnalyzeRootExec {
                 task_output
                     .host
                     .as_ref()
-                    .map(|h| h.to_string())
+                    .map(|h| format!("{} {}", h.name, h.addr))
                     .unwrap_or("Unknown".to_string())
             ));
             plan_builder.append_value(&task_output.plan);
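
Note on the dropped header rows: with Arrow, the column titles "Task" and "Plan with Metrics" belong in the schema's field names, so appending them via append_value would repeat them as a data row. A minimal sketch of the resulting shape using stock arrow-rs; the function name and the (task, plan) row type are illustrative, not from this repo:

use std::sync::Arc;

use arrow::array::{ArrayRef, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// Build the two-column ANALYZE output batch; column names come from the
// schema, so no header row is appended as data.
fn analyze_batch(rows: &[(String, String)]) -> Result<RecordBatch, arrow::error::ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("Task", DataType::Utf8, false),
        Field::new("Plan with Metrics", DataType::Utf8, false),
    ]));

    let mut task_builder = StringBuilder::new();
    let mut plan_builder = StringBuilder::new();
    for (task, plan) in rows {
        task_builder.append_value(task); // one task label per row
        plan_builder.append_value(plan); // the annotated plan text for that task
    }

    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(task_builder.finish()) as ArrayRef,
            Arc::new(plan_builder.finish()) as ArrayRef,
        ],
    )
}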

src/query_planner.rs

Lines changed: 61 additions & 31 deletions

@@ -1,12 +1,15 @@
 use std::sync::Arc;
 
 use anyhow::anyhow;
-use arrow::datatypes::SchemaRef;
+use arrow::{compute::concat_batches, datatypes::SchemaRef};
 use datafusion::{
-    logical_expr::LogicalPlan, physical_plan::ExecutionPlan, prelude::SessionContext,
+    logical_expr::LogicalPlan,
+    physical_plan::{coalesce_partitions::CoalescePartitionsExec, ExecutionPlan},
+    prelude::SessionContext,
 };
 
 use datafusion_substrait::{logical_plan::consumer::from_substrait_plan, substrait::proto::Plan};
+use tokio_stream::StreamExt;
 
 use crate::{
     explain::build_explain_batch,
@@ -77,6 +80,7 @@ impl QueryPlanner {
         match logical_plan {
             p @ LogicalPlan::Explain(_) => self.prepare_explain(p, ctx).await,
             // add other logical plans for local execution here following the pattern for explain
+            p @ LogicalPlan::DescribeTable(_) => self.prepare_local(p, ctx).await,
             p => self.prepare_query(p, ctx).await,
         }
     }
@@ -89,6 +93,7 @@ impl QueryPlanner {
         match logical_plan {
             p @ LogicalPlan::Explain(_) => self.prepare_explain(p, ctx).await,
             // add other logical plans for local execution here following the pattern for explain
+            p @ LogicalPlan::DescribeTable(_) => self.prepare_local(p, ctx).await,
             p => self.prepare_query(p, ctx).await,
         }
     }
@@ -103,44 +108,30 @@ impl QueryPlanner {
         self.send_it(logical_plan, physical_plan, ctx).await
     }
 
-    async fn send_it(
+    async fn prepare_local(
         &self,
         logical_plan: LogicalPlan,
-        physical_plan: Arc<dyn ExecutionPlan>,
         ctx: SessionContext,
     ) -> Result<QueryPlan> {
-        let query_id = uuid::Uuid::new_v4().to_string();
-
-        // divide the physical plan into chunks (tasks) that we can distribute to workers
-        let (distributed_plan, distributed_stages) =
-            execution_planning(physical_plan.clone(), 8192, Some(2)).await?;
+        let physical_plan = physical_planning(&logical_plan, &ctx).await?;
 
-        let worker_addrs = get_worker_addresses()?;
+        // execute it locally
+        let mut stream =
+            Arc::new(CoalescePartitionsExec::new(physical_plan)).execute(0, ctx.task_ctx())?;
+        let mut batches = vec![];
 
-        // gather some information we need to send back such that
-        // we can send a ticket to the client
-        let final_stage = &distributed_stages[distributed_stages.len() - 1];
-        let schema = Arc::clone(&final_stage.plan.schema());
-        let final_stage_id = final_stage.stage_id;
+        while let Some(batch) = stream.next().await {
+            batches.push(batch?);
+        }
 
-        // distribute the stages to workers, further dividing them up
-        // into chunks of partitions (partition_groups)
-        let (final_workers, tasks) =
-            distribute_stages(&query_id, distributed_stages, worker_addrs).await?;
+        if batches.is_empty() {
+            return Err(anyhow!("No data returned from local execution").into());
+        }
 
-        let qp = QueryPlan {
-            query_id,
-            session_context: ctx,
-            worker_addresses: final_workers,
-            final_stage_id,
-            schema,
-            logical_plan,
-            physical_plan,
-            distributed_plan,
-            distributed_tasks: tasks,
-        };
+        let combined_batch = concat_batches(&batches[0].schema(), &batches)?;
+        let physical_plan = Arc::new(RecordBatchExec::new(combined_batch));
 
-        Ok(qp)
+        self.send_it(logical_plan, physical_plan, ctx).await
     }
 
     async fn prepare_explain(
@@ -172,4 +163,43 @@ impl QueryPlanner {
         )
         .await
     }
+    async fn send_it(
+        &self,
+        logical_plan: LogicalPlan,
+        physical_plan: Arc<dyn ExecutionPlan>,
+        ctx: SessionContext,
+    ) -> Result<QueryPlan> {
+        let query_id = uuid::Uuid::new_v4().to_string();
+
+        // divide the physical plan into chunks (tasks) that we can distribute to workers
+        let (distributed_plan, distributed_stages) =
+            execution_planning(physical_plan.clone(), 8192, Some(2)).await?;
+
+        let worker_addrs = get_worker_addresses()?;
+
+        // gather some information we need to send back such that
+        // we can send a ticket to the client
+        let final_stage = &distributed_stages[distributed_stages.len() - 1];
+        let schema = Arc::clone(&final_stage.plan.schema());
+        let final_stage_id = final_stage.stage_id;
+
+        // distribute the stages to workers, further dividing them up
+        // into chunks of partitions (partition_groups)
+        let (final_workers, tasks) =
+            distribute_stages(&query_id, distributed_stages, worker_addrs).await?;
+
+        let qp = QueryPlan {
+            query_id,
+            session_context: ctx,
+            worker_addresses: final_workers,
+            final_stage_id,
+            schema,
+            logical_plan,
+            physical_plan,
+            distributed_plan,
+            distributed_tasks: tasks,
+        };
+
+        Ok(qp)
+    }
 }
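
The new prepare_local path boils down to a reusable pattern: coalesce all partitions into one stream, drain it, concatenate the batches, and re-wrap the result as a trivial leaf ExecutionPlan. A self-contained sketch against stock DataFusion, with MemoryExec standing in for this crate's RecordBatchExec (whose definition is outside this diff); the function name is illustrative:

use std::sync::Arc;

use arrow::compute::concat_batches;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::{
    coalesce_partitions::CoalescePartitionsExec, memory::MemoryExec, ExecutionPlan,
};
use datafusion::prelude::SessionContext;
use tokio_stream::StreamExt;

async fn execute_to_single_batch_plan(
    plan: Arc<dyn ExecutionPlan>,
    ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Merge every partition into one stream so a single execute(0, ..) drains them all.
    let mut stream = Arc::new(CoalescePartitionsExec::new(plan)).execute(0, ctx.task_ctx())?;

    let mut batches = vec![];
    while let Some(batch) = stream.next().await {
        batches.push(batch?); // propagate any execution error
    }
    if batches.is_empty() {
        return Err(DataFusionError::Execution(
            "no data returned from local execution".to_string(),
        ));
    }

    // Fold everything into one batch so it can be handed to a trivial leaf plan.
    let schema = batches[0].schema();
    let combined = concat_batches(&schema, &batches)?;
    Ok(Arc::new(MemoryExec::try_new(&[vec![combined]], schema, None)?))
}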

tests/common/mod.rs

Lines changed: 7 additions & 9 deletions

@@ -220,7 +220,7 @@ except Exception as e:
     print(f"Error executing distributed query: {{str(e)}}", file=sys.stderr)
     sys.exit(1)
 "#,
-            self.get_proxy_address().split(':').last().unwrap()
+            self.get_proxy_address().split(':').next_back().unwrap()
         );
 
         let script_path = Self::write_temp_file(
@@ -271,15 +271,13 @@ except Exception as e:
         for port in &ports {
             // Find and kill processes using lsof
             if let Ok(output) = Command::new("lsof")
-                .args(&["-ti", &format!(":{}", port)])
+                .args(["-ti", &format!(":{}", port)])
                 .output()
             {
                 let stdout = String::from_utf8_lossy(&output.stdout);
                 for line in stdout.lines() {
                     if let Ok(pid) = line.trim().parse::<u32>() {
-                        let _ = Command::new("kill")
-                            .args(&["-9", &pid.to_string()])
-                            .output();
+                        let _ = Command::new("kill").args(["-9", &pid.to_string()]).output();
                         println!("  🔥 Killed process {} on port {}", pid, port);
                     }
                 }
@@ -362,7 +360,7 @@ except Exception as e:
 
         // Check if packages are already installed
         let check_cmd = Command::new("python3")
-            .args(&["-c", "import adbc_driver_manager; import adbc_driver_flightsql; import duckdb; import pyarrow; print('OK')"])
+            .args(["-c", "import adbc_driver_manager; import adbc_driver_flightsql; import duckdb; import pyarrow; print('OK')"])
            .output();
 
         if let Ok(output) = check_cmd {
@@ -449,7 +447,7 @@ except Exception as e:
             let worker = Self::spawn_process(
                 binary_path_str,
                 &["--mode", "worker", "--port", &port.to_string()],
-                &[("DFRAY_TABLES", &tpch_tables), ("DFRAY_VIEWS", &tpch_views)],
+                &[("DFRAY_TABLES", &tpch_tables), ("DFRAY_VIEWS", tpch_views)],
                &format!("start worker {}", i + 1),
             )?;
             self.worker_processes.push(worker);
@@ -473,7 +471,7 @@ except Exception as e:
             &[
                 ("DFRAY_WORKER_ADDRESSES", &worker_addresses),
                 ("DFRAY_TABLES", &tpch_tables),
-                ("DFRAY_VIEWS", &tpch_views),
+                ("DFRAY_VIEWS", tpch_views),
             ],
             "start proxy",
         )?;
@@ -819,7 +817,7 @@ pub fn batches_to_sorted_strings(
     let mut in_table = false;
    let mut header_found = false;
 
-    for (_line_idx, line) in lines.iter().enumerate() {
+    for line in lines.iter() {
         let trimmed = line.trim();
 
         // Detect table boundaries (+---+)
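
Most of these edits are clippy-style cleanups: Command::args accepts any IntoIterator whose items are AsRef<OsStr>, and arrays iterate by value since Rust 2021, so the & on the array literals is needless; likewise next_back() replaces last() on the double-ended split(':') iterator. A standalone sketch of the port-cleanup step in that style; kill_port is a hypothetical helper name, and it assumes a Unix host with lsof on the PATH:

use std::process::Command;

/// Kill whatever is listening on `port` (hypothetical test-harness helper).
fn kill_port(port: u16) {
    // `lsof -ti :PORT` prints one PID per line for listeners on the port.
    if let Ok(output) = Command::new("lsof")
        .args(["-ti", &format!(":{}", port)]) // arrays iterate by value: no `&` needed
        .output()
    {
        for line in String::from_utf8_lossy(&output.stdout).lines() {
            if let Ok(pid) = line.trim().parse::<u32>() {
                let _ = Command::new("kill").args(["-9", &pid.to_string()]).output();
                println!("  🔥 Killed process {} on port {}", pid, port);
            }
        }
    }
}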
