Skip to content

Commit 0200ce0

Browse files
address pr comments
1 parent ec800de commit 0200ce0

File tree

5 files changed

+12
-22
lines changed

5 files changed

+12
-22
lines changed

src/execution_plans/metrics.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ pub struct TaskMetricsCollector {
3838
}
3939

4040
/// MetricsCollectorResult is the result of collecting metrics from a task.
41-
#[allow(dead_code)]
4241
pub struct MetricsCollectorResult {
4342
// metrics is a collection of metrics for a task ordered using a pre-order traversal of the task's plan.
4443
pub(super) task_metrics: Vec<MetricsSet>,
@@ -108,7 +107,6 @@ impl TreeNodeRewriter for TaskMetricsCollector {
108107
}
109108

110109
impl TaskMetricsCollector {
111-
#[allow(dead_code)]
112110
pub fn new() -> Self {
113111
Self {
114112
task_metrics: Vec::new(),
@@ -120,7 +118,6 @@ impl TaskMetricsCollector {
120118
/// Returns
121119
/// - a vec representing the metrics for the current task (ordered using a pre-order traversal)
122120
/// - a map representing the metrics for some subset of child tasks collected from NetworkShuffleExec leaves
123-
#[allow(dead_code)]
124121
pub fn collect(
125122
mut self,
126123
plan: Arc<dyn ExecutionPlan>,
@@ -305,10 +302,10 @@ pub fn collect_and_create_metrics_flight_data(
305302
df_metrics_set_to_proto(metrics)
306303
.map_err(|err| FlightError::ProtocolError(err.to_string()))
307304
})
308-
.collect::<Result<Vec<MetricsSetProto>, FlightError>>()?;
305+
.collect::<Result<Vec<_>, _>>()?;
309306
result
310307
.child_task_metrics
311-
.insert(stage_key.clone(), proto_task_metrics.clone());
308+
.insert(stage_key, proto_task_metrics);
312309

313310
// Serialize the metrics for all tasks.
314311
let mut task_metrics_set = vec![];

src/execution_plans/metrics_collecting_stream.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ impl<S> MetricsCollectingStream<S>
2727
where
2828
S: Stream<Item = Result<FlightData, FlightError>> + Send,
2929
{
30-
#[allow(dead_code)]
3130
pub fn new(
3231
stream: S,
3332
metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,

src/flight_service/do_get.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ impl ArrowFlightEndpoint {
126126
let task_data_capture = self.task_data_entries.clone();
127127
Ok(flight_stream_from_record_batch_stream(
128128
key.clone(),
129-
stage,
130129
stage_data.clone(),
131130
move || {
132131
task_data_capture.remove(key.clone());
@@ -140,11 +139,10 @@ fn missing(field: &'static str) -> impl FnOnce() -> Status {
140139
move || Status::invalid_argument(format!("Missing field '{field}'"))
141140
}
142141

143-
// Creates a tonic response from a stream of record batches. Handles
144-
// - RecordBatch to flight conversion partition tracking, stage eviction, and trailing metrics.
142+
/// Creates a tonic response from a stream of record batches. Handles
143+
/// - RecordBatch to flight conversion partition tracking, stage eviction, and trailing metrics.
145144
fn flight_stream_from_record_batch_stream(
146145
stage_key: StageKey,
147-
stage: Arc<StageExec>,
148146
stage_data: TaskData,
149147
evict_stage: impl FnOnce() + Send + 'static,
150148
stream: SendableRecordBatchStream,
@@ -165,12 +163,14 @@ fn flight_stream_from_record_batch_stream(
165163
{
166164
evict_stage();
167165

168-
let metrics_stream = collect_and_create_metrics_flight_data(stage_key, stage)
169-
.map_err(|err| {
170-
Status::internal(format!(
171-
"error collecting metrics in arrow flight endpoint: {err}"
172-
))
173-
})?;
166+
let metrics_stream =
167+
collect_and_create_metrics_flight_data(stage_key, stage_data.stage).map_err(
168+
|err| {
169+
Status::internal(format!(
170+
"error collecting metrics in arrow flight endpoint: {err}"
171+
))
172+
},
173+
)?;
174174

175175
return Ok(Some(metrics_stream));
176176
}

src/flight_service/trailing_flight_data_stream.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ where
2727
T: Stream<Item = Result<FlightData, FlightError>> + Send,
2828
F: FnOnce() -> Result<Option<T>, FlightError>,
2929
{
30-
// TODO: remove
31-
#[allow(dead_code)]
3230
pub fn new(on_complete: F, inner: S) -> Self {
3331
Self {
3432
inner,

src/metrics/proto.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ pub struct ProtoLabel {
137137
/// df_metrics_set_to_proto converts a [datafusion::physical_plan::metrics::MetricsSet] to a [MetricsSetProto].
138138
/// Custom metrics are filtered out, but any other errors are returned.
139139
/// TODO(#140): Support custom metrics.
140-
#[allow(dead_code)]
141140
pub fn df_metrics_set_to_proto(
142141
metrics_set: &MetricsSet,
143142
) -> Result<MetricsSetProto, DataFusionError> {
@@ -164,7 +163,6 @@ pub fn df_metrics_set_to_proto(
164163
}
165164

166165
/// metrics_set_proto_to_df converts a [MetricsSetProto] to a [datafusion::physical_plan::metrics::MetricsSet].
167-
#[allow(dead_code)]
168166
pub fn metrics_set_proto_to_df(
169167
metrics_set_proto: &MetricsSetProto,
170168
) -> Result<MetricsSet, DataFusionError> {
@@ -178,12 +176,10 @@ pub fn metrics_set_proto_to_df(
178176
}
179177

180178
/// Custom metrics are not supported in proto conversion.
181-
#[allow(dead_code)]
182179
const CUSTOM_METRICS_NOT_SUPPORTED: &str =
183180
"custom metrics are not supported in metrics proto conversion";
184181

185182
/// df_metric_to_proto converts a `datafusion::physical_plan::metrics::Metric` to a `MetricProto`. It does not consume the Arc<Metric>.
186-
#[allow(dead_code)]
187183
pub fn df_metric_to_proto(metric: Arc<Metric>) -> Result<MetricProto, DataFusionError> {
188184
let partition = metric.partition().map(|p| p as u64);
189185
let labels = metric

0 commit comments

Comments
 (0)