Skip to content

Commit 3735991

Browse files
committed
feat: improve executor loggers
1 parent faa05af commit 3735991

File tree

5 files changed

+36
-8
lines changed

5 files changed

+36
-8
lines changed

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use crate::error::BallistaError;
4848
use datafusion::execution::context::TaskContext;
4949
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
5050
use itertools::Itertools;
51-
use log::{error, info};
51+
use log::{debug, error};
5252
use rand::prelude::SliceRandom;
5353
use rand::thread_rng;
5454
use tokio::sync::{mpsc, Semaphore};
@@ -144,7 +144,7 @@ impl ExecutionPlan for ShuffleReaderExec {
144144
context: Arc<TaskContext>,
145145
) -> Result<SendableRecordBatchStream> {
146146
let task_id = context.task_id().unwrap_or_else(|| partition.to_string());
147-
info!("ShuffleReaderExec::execute({})", task_id);
147+
debug!("ShuffleReaderExec::execute({})", task_id);
148148

149149
// TODO: make the maximum size configurable, or make it depend on global memory control
150150
let max_request_num = 50usize;
@@ -292,7 +292,7 @@ fn send_fetch_partitions(
292292
.into_iter()
293293
.partition(check_is_local_location);
294294

295-
info!(
295+
debug!(
296296
"local shuffle file counts:{}, remote shuffle file count:{}.",
297297
local_locations.len(),
298298
remote_locations.len()

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use datafusion::arrow::ipc::CompressionType;
2525

2626
use datafusion::arrow::ipc::writer::StreamWriter;
2727
use std::any::Any;
28+
use std::fmt::Debug;
2829
use std::fs;
2930
use std::fs::File;
3031
use std::future::Future;
@@ -50,8 +51,8 @@ use datafusion::physical_plan::metrics::{
5051
};
5152

5253
use datafusion::physical_plan::{
53-
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
54-
SendableRecordBatchStream, Statistics,
54+
displayable, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
55+
PlanProperties, SendableRecordBatchStream, Statistics,
5556
};
5657
use futures::{StreamExt, TryFutureExt, TryStreamExt};
5758

@@ -80,9 +81,23 @@ pub struct ShuffleWriterExec {
8081
shuffle_output_partitioning: Option<Partitioning>,
8182
/// Execution metrics
8283
metrics: ExecutionPlanMetricsSet,
84+
/// Plan properties
8385
properties: PlanProperties,
8486
}
8587

88+
impl std::fmt::Display for ShuffleWriterExec {
89+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
90+
let printable_plan = displayable(self.plan.as_ref())
91+
.set_show_statistics(true)
92+
.indent(false);
93+
write!(
94+
f,
95+
"ShuffleWriterExec: job={} stage={} work_dir={} partitioning={:?} plan: \n {}",
96+
self.job_id, self.stage_id, self.work_dir, self.shuffle_output_partitioning, printable_plan
97+
)
98+
}
99+
}
100+
86101
pub struct WriteTracker {
87102
pub num_batches: usize,
88103
pub num_rows: usize,

ballista/core/src/extension.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,13 @@ impl SessionConfigHelperExt for SessionConfig {
286286
self.options()
287287
.entries()
288288
.iter()
289+
// TODO: revisit this log once this option is removed
290+
//
291+
// filtering this key as it's creating a lot of warning logs
292+
// at the executor side.
293+
.filter(|c| {
294+
c.key != "datafusion.sql_parser.enable_options_value_normalization"
295+
})
289296
.map(|datafusion::config::ConfigEntry { key, value, .. }| {
290297
log::trace!("sending configuration key: `{}`, value`{:?}`", key, value);
291298
KeyValuePair {

ballista/executor/src/execution_engine.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use datafusion::error::{DataFusionError, Result};
2323
use datafusion::execution::context::TaskContext;
2424
use datafusion::physical_plan::metrics::MetricsSet;
2525
use datafusion::physical_plan::ExecutionPlan;
26-
use std::fmt::Debug;
26+
use std::fmt::{Debug, Display};
2727
use std::sync::Arc;
2828

2929
/// Execution engine extension point
@@ -42,7 +42,7 @@ pub trait ExecutionEngine: Sync + Send {
4242
/// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
4343
/// will use the ShuffleReaderExec to read these results.
4444
#[async_trait]
45-
pub trait QueryStageExecutor: Sync + Send + Debug {
45+
pub trait QueryStageExecutor: Sync + Send + Debug + Display {
4646
async fn execute_query_stage(
4747
&self,
4848
input_partition: usize,
@@ -95,6 +95,12 @@ impl DefaultQueryStageExec {
9595
}
9696
}
9797

98+
impl Display for DefaultQueryStageExec {
99+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100+
write!(f, "DefaultQueryStageExec:\n{}", self.shuffle_writer)
101+
}
102+
}
103+
98104
#[async_trait]
99105
impl QueryStageExecutor for DefaultQueryStageExec {
100106
async fn execute_query_stage(

ballista/executor/src/metrics/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl ExecutorMetricsCollector for LoggingMetricsCollector {
4949
plan: Arc<dyn QueryStageExecutor>,
5050
) {
5151
info!(
52-
"=== [{}/{}/{}] Physical plan with metrics ===\n{:?}\n",
52+
"=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
5353
job_id, stage_id, partition, plan
5454
);
5555
}

0 commit comments

Comments
 (0)