Skip to content

Commit 9dabc1b

Browse files
address pr comments
1 parent 456cafd commit 9dabc1b

File tree

5 files changed

+12
-22
lines changed

5 files changed

+12
-22
lines changed

src/execution_plans/metrics.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ pub struct TaskMetricsCollector {
3838
}
3939

4040
/// MetricsCollectorResult is the result of collecting metrics from a task.
41-
#[allow(dead_code)]
4241
pub struct MetricsCollectorResult {
4342
// metrics is a collection of metrics for a task ordered using a pre-order traversal of the task's plan.
4443
pub(super) task_metrics: Vec<MetricsSet>,
@@ -108,7 +107,6 @@ impl TreeNodeRewriter for TaskMetricsCollector {
108107
}
109108

110109
impl TaskMetricsCollector {
111-
#[allow(dead_code)]
112110
pub fn new() -> Self {
113111
Self {
114112
task_metrics: Vec::new(),
@@ -120,7 +118,6 @@ impl TaskMetricsCollector {
120118
/// Returns
121119
/// - a vec representing the metrics for the current task (ordered using a pre-order traversal)
122120
/// - a map representing the metrics for some subset of child tasks collected from NetworkShuffleExec leaves
123-
#[allow(dead_code)]
124121
pub fn collect(
125122
mut self,
126123
plan: Arc<dyn ExecutionPlan>,
@@ -305,10 +302,10 @@ pub fn collect_and_create_metrics_flight_data(
305302
df_metrics_set_to_proto(metrics)
306303
.map_err(|err| FlightError::ProtocolError(err.to_string()))
307304
})
308-
.collect::<Result<Vec<MetricsSetProto>, FlightError>>()?;
305+
.collect::<Result<Vec<_>, _>>()?;
309306
result
310307
.child_task_metrics
311-
.insert(stage_key.clone(), proto_task_metrics.clone());
308+
.insert(stage_key, proto_task_metrics);
312309

313310
// Serialize the metrics for all tasks.
314311
let mut task_metrics_set = vec![];

src/execution_plans/metrics_collecting_stream.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ impl<S> MetricsCollectingStream<S>
2727
where
2828
S: Stream<Item = Result<FlightData, FlightError>> + Send,
2929
{
30-
#[allow(dead_code)]
3130
pub fn new(
3231
stream: S,
3332
metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,

src/flight_service/do_get.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ impl ArrowFlightEndpoint {
144144
let task_data_capture = self.task_data_entries.clone();
145145
Ok(flight_stream_from_record_batch_stream(
146146
key.clone(),
147-
stage,
148147
stage_data.clone(),
149148
move || {
150149
task_data_capture.remove(key.clone());
@@ -158,11 +157,10 @@ fn missing(field: &'static str) -> impl FnOnce() -> Status {
158157
move || Status::invalid_argument(format!("Missing field '{field}'"))
159158
}
160159

161-
// Creates a tonic response from a stream of record batches. Handles
162-
// - RecordBatch to flight conversion partition tracking, stage eviction, and trailing metrics.
160+
/// Creates a tonic response from a stream of record batches. Handles
161+
/// - RecordBatch to flight conversion partition tracking, stage eviction, and trailing metrics.
163162
fn flight_stream_from_record_batch_stream(
164163
stage_key: StageKey,
165-
stage: Arc<StageExec>,
166164
stage_data: TaskData,
167165
evict_stage: impl FnOnce() + Send + 'static,
168166
stream: SendableRecordBatchStream,
@@ -183,12 +181,14 @@ fn flight_stream_from_record_batch_stream(
183181
{
184182
evict_stage();
185183

186-
let metrics_stream = collect_and_create_metrics_flight_data(stage_key, stage)
187-
.map_err(|err| {
188-
Status::internal(format!(
189-
"error collecting metrics in arrow flight endpoint: {err}"
190-
))
191-
})?;
184+
let metrics_stream =
185+
collect_and_create_metrics_flight_data(stage_key, stage_data.stage).map_err(
186+
|err| {
187+
Status::internal(format!(
188+
"error collecting metrics in arrow flight endpoint: {err}"
189+
))
190+
},
191+
)?;
192192

193193
return Ok(Some(metrics_stream));
194194
}

src/flight_service/trailing_flight_data_stream.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ where
2727
T: Stream<Item = Result<FlightData, FlightError>> + Send,
2828
F: FnOnce() -> Result<Option<T>, FlightError>,
2929
{
30-
// TODO: remove
31-
#[allow(dead_code)]
3230
pub fn new(on_complete: F, inner: S) -> Self {
3331
Self {
3432
inner,

src/metrics/proto.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ pub struct ProtoLabel {
137137
/// df_metrics_set_to_proto converts a [datafusion::physical_plan::metrics::MetricsSet] to a [MetricsSetProto].
138138
/// Custom metrics are filtered out, but any other errors are returned.
139139
/// TODO(#140): Support custom metrics.
140-
#[allow(dead_code)]
141140
pub fn df_metrics_set_to_proto(
142141
metrics_set: &MetricsSet,
143142
) -> Result<MetricsSetProto, DataFusionError> {
@@ -164,7 +163,6 @@ pub fn df_metrics_set_to_proto(
164163
}
165164

166165
/// metrics_set_proto_to_df converts a [MetricsSetProto] to a [datafusion::physical_plan::metrics::MetricsSet].
167-
#[allow(dead_code)]
168166
pub fn metrics_set_proto_to_df(
169167
metrics_set_proto: &MetricsSetProto,
170168
) -> Result<MetricsSet, DataFusionError> {
@@ -178,12 +176,10 @@ pub fn metrics_set_proto_to_df(
178176
}
179177

180178
/// Custom metrics are not supported in proto conversion.
181-
#[allow(dead_code)]
182179
const CUSTOM_METRICS_NOT_SUPPORTED: &str =
183180
"custom metrics are not supported in metrics proto conversion";
184181

185182
/// df_metric_to_proto converts a `datafusion::physical_plan::metrics::Metric` to a `MetricProto`. It does not consume the Arc<Metric>.
186-
#[allow(dead_code)]
187183
pub fn df_metric_to_proto(metric: Arc<Metric>) -> Result<MetricProto, DataFusionError> {
188184
let partition = metric.partition().map(|p| p as u64);
189185
let labels = metric

0 commit comments

Comments
 (0)