Skip to content

Commit 7ffad58

Browse files
asdf
1 parent a638e6d commit 7ffad58

File tree

9 files changed

+2713
-64
lines changed

9 files changed

+2713
-64
lines changed

src/execution_plans/distributed.rs

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,26 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
33
use crate::execution_plans::common::require_one_child;
4-
use crate::protobuf::DistributedCodec;
4+
use crate::protobuf::{DistributedCodec, StageKey};
5+
use crate::stage::DisplayCtx;
56
use crate::{ExecutionTask, Stage};
7+
use bytes::Bytes;
68
use datafusion::common::exec_err;
79
use datafusion::common::tree_node::{Transformed, TreeNode};
10+
use datafusion::error::DataFusionError;
811
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
912
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
1013
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
1114
use rand::Rng;
1215
use std::any::Any;
13-
use std::fmt::Formatter;
16+
use std::f32::consts::E;
17+
use std::fmt::{format, Formatter};
1418
use std::sync::Arc;
19+
use std::sync::Mutex;
1520
use url::Url;
21+
use std::sync::OnceLock;
1622

17-
/// [ExecutionPlan] that executes the inner plan in distributed mode.
23+
/// [ExecutionPan] that executes the inner plan in distributed mode.
1824
/// Before executing it, two modifications are lazily performed on the plan:
1925
/// 1. Assigns worker URLs to all the stages. A random set of URLs are sampled from the
2026
/// channel resolver and assigned to each task in each stage.
@@ -23,11 +29,34 @@ use url::Url;
2329
#[derive(Debug, Clone)]
2430
pub struct DistributedExec {
2531
pub plan: Arc<dyn ExecutionPlan>,
32+
pub prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
33+
pub display_ctx: Option<DisplayCtx>,
2634
}
2735

2836
impl DistributedExec {
2937
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
30-
Self { plan }
38+
Self { plan, prepared_plan: Arc::new(Mutex::new(None)), display_ctx: None }
39+
}
40+
41+
/// Returns a special stage key used to identify the root "stage" of the distributed plan.
42+
/// TODO: reconcile this with display_plan_graphviz
43+
pub(crate) fn to_stage_key(&self) -> StageKey {
44+
StageKey {
45+
query_id: Bytes::new(),
46+
stage_id: 0 as u64,
47+
task_number: 0,
48+
}
49+
}
50+
51+
pub(crate) fn with_display_ctx(&self, display_ctx: DisplayCtx) -> Self {
52+
Self { display_ctx: Some(display_ctx), ..self.clone() }
53+
}
54+
55+
pub(crate) fn pepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
56+
self.prepared_plan.lock()
57+
.map_err(|e| DataFusionError::Internal(format!("Failed to lock prepared plan: {}", e)))?
58+
.clone()
59+
.ok_or(DataFusionError::Internal("No prepared plan found. Was .execute() called?".to_string()))
3160
}
3261

3362
fn prepare_plan(
@@ -99,6 +128,8 @@ impl ExecutionPlan for DistributedExec {
99128
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
100129
Ok(Arc::new(DistributedExec {
101130
plan: require_one_child(&children)?,
131+
prepared_plan: self.prepared_plan.clone(),
132+
display_ctx: self.display_ctx.clone(),
102133
}))
103134
}
104135

@@ -120,7 +151,17 @@ impl ExecutionPlan for DistributedExec {
120151
let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
121152
let codec = DistributedCodec::new_combined_with_user(context.session_config());
122153

123-
let plan = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
154+
let plan = {
155+
let mut guard = self.prepared_plan.lock().map_err(|e| DataFusionError::Internal(format!("Failed to lock prepared plan: {}", e)))?;
156+
match guard.clone() {
157+
Some(plan) => plan,
158+
None => {
159+
let prepared = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
160+
*guard = Some(prepared.clone());
161+
prepared
162+
}
163+
}
164+
};
124165
plan.execute(partition, context)
125166
}
126167
}

src/execution_plans/metrics.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ impl MetricsWrapperExec {
2727
children: None,
2828
}
2929
}
30+
31+
pub fn inner(&self) -> &Arc<dyn ExecutionPlan> {
32+
&self.inner
33+
}
3034
}
3135

3236
/// MetricsWrapperExec is invisible during display.

src/explain.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use crate::stage::{DisplayCtx};
2+
use crate::metrics::MetricsCollectorResult;
3+
use crate::metrics::TaskMetricsCollector;
4+
use crate::metrics::proto::MetricsSetProto;
5+
use crate::metrics::proto::df_metrics_set_to_proto;
6+
use crate::{display_plan_ascii, NetworkBoundaryExt};
7+
use datafusion::common::tree_node::Transformed;
8+
use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
9+
use datafusion::error::DataFusionError;
10+
use datafusion::physical_plan::ExecutionPlan;
11+
use datafusion::physical_plan::display::DisplayableExecutionPlan;
12+
use std::sync::Arc;
13+
use crate::execution_plans::DistributedExec;
14+
use bytes::Bytes;
15+
use crate::protobuf::StageKey;
16+
17+
18+
pub fn explain_analyze(executed: Arc<dyn ExecutionPlan>) -> Result<String, DataFusionError> {
19+
// Check if the plan is distributed by looking for a root DistributedExec.
20+
match executed.as_any().downcast_ref::<DistributedExec>() {
21+
None => Ok(display_plan_ascii(executed.as_ref())),
22+
Some(dist_exec) => {
23+
// If the plan was distributed, collect metrics from the coordinating stage exec.
24+
let MetricsCollectorResult {
25+
task_metrics,
26+
mut input_task_metrics,
27+
} = TaskMetricsCollector::new().collect(
28+
dist_exec.pepared_plan()?
29+
)?;
30+
println!("task_metrics: {}", task_metrics.len());
31+
input_task_metrics.insert(
32+
dist_exec.to_stage_key(),
33+
task_metrics
34+
.into_iter()
35+
.map(|metrics| df_metrics_set_to_proto(&metrics))
36+
.collect::<Result<Vec<MetricsSetProto>, DataFusionError>>()?,
37+
);
38+
39+
let keys = input_task_metrics.iter().map(|(k, v)| (k, v.len())).collect::<Vec<_>>();
40+
println!("items: {:#?}", keys);
41+
42+
43+
let display_ctx = DisplayCtx::new(input_task_metrics);
44+
Ok(display_plan_ascii(&dist_exec.with_display_ctx(display_ctx)))
45+
}
46+
}
47+
}

src/flight_service/do_get.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,9 @@ impl ArrowFlightEndpoint {
143143
let stream = map_last_stream(stream, move |last| {
144144
if num_partitions_remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
145145
task_data_entries.remove(key.clone());
146+
return last.and_then(|el| collect_and_create_metrics_flight_data(key, plan, el));
146147
}
147-
last.and_then(|el| collect_and_create_metrics_flight_data(key, plan, el))
148+
last
148149
});
149150

150151
Ok(Response::new(Box::pin(stream.map_err(|err| match err {

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod execution_plans;
99
mod flight_service;
1010
mod metrics;
1111
mod stage;
12+
mod explain;
1213

1314
mod protobuf;
1415
#[cfg(any(feature = "integration", test))]
@@ -28,3 +29,4 @@ pub use flight_service::{
2829
pub use stage::{
2930
DistributedTaskContext, ExecutionTask, Stage, display_plan_ascii, display_plan_graphviz,
3031
};
32+
pub use explain::explain_analyze;

src/metrics/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ pub(crate) mod proto;
33
mod task_metrics_collector;
44
mod task_metrics_rewriter;
55
pub(crate) use metrics_collecting_stream::MetricsCollectingStream;
6-
pub(crate) use task_metrics_collector::TaskMetricsCollector;
6+
pub(crate) use task_metrics_collector::{MetricsCollectorResult, TaskMetricsCollector};
7+
pub(crate) use task_metrics_rewriter::TaskMetricsRewriter;

0 commit comments

Comments
 (0)