Skip to content

Commit e892096

Browse files
committed
Propagate collected metrics optionally
1 parent 4867a18 commit e892096

File tree

13 files changed

+189
-51
lines changed

13 files changed

+189
-51
lines changed

src/channel_resolver_ext.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::DistributedConfig;
2+
use crate::config_extension_ext::set_distributed_option_extension;
23
use arrow_flight::flight_service_client::FlightServiceClient;
34
use async_trait::async_trait;
45
use datafusion::common::exec_err;
@@ -17,10 +18,10 @@ pub(crate) fn set_distributed_channel_resolver(
1718
if let Some(distributed_cfg) = opts.extensions.get_mut::<DistributedConfig>() {
1819
distributed_cfg.__private_channel_resolver = channel_resolver_ext;
1920
} else {
20-
opts.extensions.insert(DistributedConfig {
21+
set_distributed_option_extension(cfg, DistributedConfig {
2122
__private_channel_resolver: channel_resolver_ext,
2223
..Default::default()
23-
});
24+
}).expect("Calling set_distributed_option_extension with a default DistributedConfig should never fail");
2425
}
2526
}
2627

src/config_extension_ext.rs

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@ pub(crate) fn set_distributed_option_extension<T: ConfigExtension + Default>(
1919
let mut meta = HeaderMap::new();
2020

2121
for entry in t.entries() {
22+
// assume that fields starting with "__" are private, and are not supposed to be sent
23+
// over the wire. This accounts for the fact that we need to send our DistributedConfig
24+
// options without setting the __private_task_estimator and __private_channel_resolver.
25+
// Ideally those two fields should not even be there on the first place, but until
26+
// https://github.com/apache/datafusion/pull/18739 we need to put them there.
27+
if entry.key.starts_with("__") {
28+
continue;
29+
}
2230
if let Some(value) = entry.value {
2331
meta.insert(
2432
HeaderName::from_str(&format!(
@@ -44,29 +52,51 @@ pub(crate) fn set_distributed_option_extension<T: ConfigExtension + Default>(
4452
Ok(())
4553
}
4654

47-
pub(crate) fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
48-
cfg: &mut SessionConfig,
55+
pub(crate) fn set_distributed_option_extension_from_headers<'a, T: ConfigExtension + Default>(
56+
cfg: &'a mut SessionConfig,
4957
headers: &HeaderMap,
50-
) -> Result<(), DataFusionError> {
51-
let mut result = T::default();
52-
let mut found_some = false;
58+
) -> Result<&'a T, DataFusionError> {
59+
enum MutOrOwned<'a, T> {
60+
Mut(&'a mut T),
61+
Owned(T),
62+
}
63+
64+
impl<'a, T> MutOrOwned<'a, T> {
65+
fn as_mut(&mut self) -> &mut T {
66+
match self {
67+
MutOrOwned::Mut(v) => v,
68+
MutOrOwned::Owned(v) => v,
69+
}
70+
}
71+
}
72+
73+
// If the config extension existed before, we want to modify instead of adding a new one from
74+
// scratch. If not, we'll start from scratch with a new one.
75+
let mut result = match cfg.options_mut().extensions.get_mut::<T>() {
76+
Some(v) => MutOrOwned::Mut(v),
77+
None => MutOrOwned::Owned(T::default()),
78+
};
79+
5380
for (k, v) in headers.iter() {
5481
let key = k.as_str().trim_start_matches(FLIGHT_METADATA_CONFIG_PREFIX);
5582
let prefix = format!("{}.", T::PREFIX);
5683
if key.starts_with(&prefix) {
57-
found_some = true;
58-
result.set(
84+
result.as_mut().set(
5985
key.trim_start_matches(&prefix),
6086
v.to_str()
6187
.map_err(|err| internal_datafusion_err!("Cannot parse header value: {err}"))?,
6288
)?;
6389
}
6490
}
65-
if !found_some {
66-
return Ok(());
91+
92+
// Only insert the extension if it is not already there. If this is otherwise MutOrOwned::Mut it
93+
// means that the extension was already there, and we already modified it.
94+
if let MutOrOwned::Owned(v) = result {
95+
cfg.options_mut().extensions.insert(v);
6796
}
68-
cfg.options_mut().extensions.insert(result);
69-
Ok(())
97+
cfg.options().extensions.get().ok_or_else(|| {
98+
internal_datafusion_err!("ProgrammingError: a config option extension was just inserted, but it was not immediately retrievable")
99+
})
70100
}
71101

72102
#[derive(Clone, Debug, Default)]
@@ -190,8 +220,15 @@ mod tests {
190220
&Default::default(),
191221
)?;
192222

193-
let extension = config.options().extensions.get::<CustomExtension>();
194-
assert!(extension.is_none());
223+
let extension = config
224+
.options()
225+
.extensions
226+
.get::<CustomExtension>()
227+
.unwrap();
228+
let default = CustomExtension::default();
229+
assert_eq!(extension.foo, default.foo);
230+
assert_eq!(extension.bar, default.bar);
231+
assert_eq!(extension.baz, default.baz);
195232

196233
Ok(())
197234
}
@@ -207,8 +244,15 @@ mod tests {
207244

208245
set_distributed_option_extension_from_headers::<CustomExtension>(&mut config, &header_map)?;
209246

210-
let extension = config.options().extensions.get::<CustomExtension>();
211-
assert!(extension.is_none());
247+
let extension = config
248+
.options()
249+
.extensions
250+
.get::<CustomExtension>()
251+
.unwrap();
252+
let default = CustomExtension::default();
253+
assert_eq!(extension.foo, default.foo);
254+
assert_eq!(extension.bar, default.bar);
255+
assert_eq!(extension.baz, default.baz);
212256

213257
Ok(())
214258
}

src/distributed_ext.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,13 @@ pub trait DistributedExt: Sized {
358358
&mut self,
359359
factor: f64,
360360
) -> Result<(), DataFusionError>;
361+
362+
/// Enables metrics collection across network boundaries so that all the metrics gather in
363+
/// each node are accessible from the head stage that started running the query.
364+
fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;
365+
366+
/// Same as [DistributedExt::with_distributed_metrics_collection] but with an in-place mutation.
367+
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
361368
}
362369

363370
impl DistributedExt for SessionConfig {
@@ -372,7 +379,8 @@ impl DistributedExt for SessionConfig {
372379
&mut self,
373380
headers: &HeaderMap,
374381
) -> Result<(), DataFusionError> {
375-
set_distributed_option_extension_from_headers::<T>(self, headers)
382+
set_distributed_option_extension_from_headers::<T>(self, headers)?;
383+
Ok(())
376384
}
377385

378386
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T) {
@@ -415,6 +423,12 @@ impl DistributedExt for SessionConfig {
415423
Ok(())
416424
}
417425

426+
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError> {
427+
let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
428+
d_cfg.collect_metrics = enabled;
429+
Ok(())
430+
}
431+
418432
delegate! {
419433
to self {
420434
#[call(set_distributed_option_extension)]
@@ -448,6 +462,10 @@ impl DistributedExt for SessionConfig {
448462
#[call(set_distributed_cardinality_effect_task_scale_factor)]
449463
#[expr($?;Ok(self))]
450464
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
465+
466+
#[call(set_distributed_metrics_collection)]
467+
#[expr($?;Ok(self))]
468+
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
451469
}
452470
}
453471
}
@@ -494,6 +512,11 @@ impl DistributedExt for SessionStateBuilder {
494512
#[call(set_distributed_cardinality_effect_task_scale_factor)]
495513
#[expr($?;Ok(self))]
496514
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
515+
516+
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
517+
#[call(set_distributed_metrics_collection)]
518+
#[expr($?;Ok(self))]
519+
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
497520
}
498521
}
499522
}
@@ -540,6 +563,11 @@ impl DistributedExt for SessionState {
540563
#[call(set_distributed_cardinality_effect_task_scale_factor)]
541564
#[expr($?;Ok(self))]
542565
fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
566+
567+
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
568+
#[call(set_distributed_metrics_collection)]
569+
#[expr($?;Ok(self))]
570+
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
543571
}
544572
}
545573
}
@@ -586,6 +614,11 @@ impl DistributedExt for SessionContext {
586614
#[call(set_distributed_cardinality_effect_task_scale_factor)]
587615
#[expr($?;Ok(self))]
588616
fn with_distributed_cardinality_effect_task_scale_factor(self, factor: f64) -> Result<Self, DataFusionError>;
617+
618+
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
619+
#[call(set_distributed_metrics_collection)]
620+
#[expr($?;Ok(self))]
621+
fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;
589622
}
590623
}
591624
}

src/distributed_planner/distributed_config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ extensions_options! {
2424
/// - If a node reduces the cardinality of the data, this factor will decrease.
2525
/// - In any other situation, this factor is left intact.
2626
pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
27+
/// Propagate collected metrics from all nodes in the plan across network boundaries
28+
/// so that they can be reconstructed on the head node of the plan.
29+
pub collect_metrics: bool, default = false
2730
/// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
2831
/// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
2932
pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()

src/distributed_planner/task_estimator.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::config_extension_ext::set_distributed_option_extension;
12
use crate::{ChannelResolver, DistributedConfig, PartitionIsolatorExec};
23
use datafusion::catalog::memory::DataSourceExec;
34
use datafusion::config::ConfigOptions;
@@ -103,10 +104,10 @@ pub(crate) fn set_distributed_task_estimator(
103104
} else {
104105
let mut estimators = CombinedTaskEstimator::default();
105106
estimators.user_provided.push(Arc::new(estimator));
106-
opts.extensions.insert(DistributedConfig {
107+
set_distributed_option_extension(cfg, DistributedConfig {
107108
__private_task_estimator: estimators,
108109
..Default::default()
109-
});
110+
}).expect("Calling set_distributed_option_extension with a default DistributedConfig should never fail");
110111
}
111112
}
112113

src/execution_plans/common.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use crate::DistributedConfig;
12
use datafusion::common::{DataFusionError, plan_err};
23
use datafusion::physical_expr::Partitioning;
34
use datafusion::physical_plan::{ExecutionPlan, PlanProperties};
5+
use http::HeaderMap;
46
use std::borrow::Borrow;
57
use std::sync::Arc;
68

@@ -40,3 +42,16 @@ pub(super) fn scale_partitioning(
4042
Partitioning::UnknownPartitioning(p) => Partitioning::UnknownPartitioning(f(*p)),
4143
}
4244
}
45+
46+
/// Manual propagation of the [DistributedConfig] fields relevant for execution. Can be removed
47+
/// after <link to issue> is fixed, as this will become automatic.
48+
pub(super) fn manually_propagate_distributed_config(
49+
mut headers: HeaderMap,
50+
d_cfg: &DistributedConfig,
51+
) -> HeaderMap {
52+
headers.insert(
53+
"distributed.collect_metrics",
54+
d_cfg.collect_metrics.to_string().parse().unwrap(),
55+
);
56+
headers
57+
}

src/execution_plans/network_coalesce.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::config_extension_ext::ContextGrpcMetadata;
33
use crate::distributed_planner::{InputStageInfo, NetworkBoundary, limit_tasks_err};
4-
use crate::execution_plans::common::{require_one_child, scale_partitioning_props};
4+
use crate::execution_plans::common::{
5+
manually_propagate_distributed_config, require_one_child, scale_partitioning_props,
6+
};
57
use crate::flight_service::DoGet;
68
use crate::metrics::MetricsCollectingStream;
79
use crate::metrics::proto::MetricsSetProto;
810
use crate::protobuf::{StageKey, map_flight_to_datafusion_error, map_status_to_datafusion_error};
911
use crate::stage::{MaybeEncodedPlan, Stage};
10-
use crate::{ChannelResolver, DistributedTaskContext};
12+
use crate::{ChannelResolver, DistributedConfig, DistributedTaskContext};
1113
use arrow_flight::Ticket;
1214
use arrow_flight::decode::FlightRecordBatchStream;
1315
use arrow_flight::error::FlightError;
@@ -18,7 +20,7 @@ use datafusion::error::DataFusionError;
1820
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
1921
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2022
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
21-
use futures::{TryFutureExt, TryStreamExt};
23+
use futures::{StreamExt, TryFutureExt, TryStreamExt};
2224
use http::Extensions;
2325
use prost::Message;
2426
use std::any::Any;
@@ -258,6 +260,9 @@ impl ExecutionPlan for NetworkCoalesceExec {
258260
// get the channel manager and current stage from our context
259261
let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
260262

263+
let d_cfg = DistributedConfig::from_config_options(context.session_config().options())?;
264+
let retrieve_metrics = d_cfg.collect_metrics;
265+
261266
let input_stage = &self_ready.input_stage;
262267
let encoded_input_plan = input_stage.plan.encoded()?;
263268

@@ -273,8 +278,10 @@ impl ExecutionPlan for NetworkCoalesceExec {
273278
let target_task = partition / partitions_per_task;
274279
let target_partition = partition % partitions_per_task;
275280

281+
// TODO: this propagation should be automatic <link to issue>
282+
let context_headers = manually_propagate_distributed_config(context_headers, d_cfg);
276283
let ticket = Request::from_parts(
277-
MetadataMap::from_headers(context_headers.clone()),
284+
MetadataMap::from_headers(context_headers),
278285
Extensions::default(),
279286
Ticket {
280287
ticket: DoGet {
@@ -311,13 +318,14 @@ impl ExecutionPlan for NetworkCoalesceExec {
311318
.into_inner()
312319
.map_err(|err| FlightError::Tonic(Box::new(err)));
313320

314-
let metrics_collecting_stream =
315-
MetricsCollectingStream::new(stream, metrics_collection_capture);
321+
let stream = if retrieve_metrics {
322+
MetricsCollectingStream::new(stream, metrics_collection_capture).boxed()
323+
} else {
324+
stream.boxed()
325+
};
316326

317-
Ok(
318-
FlightRecordBatchStream::new_from_flight_data(metrics_collecting_stream)
319-
.map_err(map_flight_to_datafusion_error),
320-
)
327+
Ok(FlightRecordBatchStream::new_from_flight_data(stream)
328+
.map_err(map_flight_to_datafusion_error))
321329
}
322330
.try_flatten_stream();
323331

0 commit comments

Comments
 (0)