Skip to content

Commit 80c2c56

Browse files
authored
chore: dependency cleanup (#1150)
1 parent b3cf8d1 commit 80c2c56

33 files changed

+320
-402
lines changed

Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo
2121
resolver = "2"
2222

2323
[workspace.dependencies]
24-
anyhow = "1"
2524
arrow = { version = "53", features = ["ipc_compression"] }
2625
arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
2726
clap = { version = "4.5", features = ["derive", "cargo"] }
@@ -40,9 +39,9 @@ tonic-build = { version = "0.12", default-features = false, features = [
4039
"transport",
4140
"prost"
4241
] }
43-
tracing = "0.1.36"
42+
tracing = "0.1"
4443
tracing-appender = "0.2.2"
45-
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
44+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
4645
ctor = { version = "0.2" }
4746
mimalloc = { version = "0.1" }
4847

@@ -58,7 +57,6 @@ dashmap = { version = "6.1" }
5857
async-trait = { version = "0.1.4" }
5958
serde = { version = "1.0" }
6059
tokio-stream = { version = "0.1" }
61-
parse_arg = { version = "0.1" }
6260
url = { version = "2.5" }
6361

6462
# cargo build --profile release-lto

ballista-cli/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ keywords = ["ballista", "cli"]
2525
license = "Apache-2.0"
2626
homepage = "https://github.com/apache/arrow-ballista"
2727
repository = "https://github.com/apache/arrow-ballista"
28-
rust-version = "1.72"
2928
readme = "README.md"
3029

3130
[dependencies]

ballista/client/Cargo.toml

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,26 @@ repository = "https://github.com/apache/arrow-ballista"
2525
readme = "README.md"
2626
authors = ["Apache DataFusion <dev@datafusion.apache.org>"]
2727
edition = "2021"
28-
rust-version = "1.72"
2928

3029
[dependencies]
3130
async-trait = { workspace = true }
3231
ballista-core = { path = "../core", version = "0.12.0" }
3332
ballista-executor = { path = "../executor", version = "0.12.0", optional = true }
3433
ballista-scheduler = { path = "../scheduler", version = "0.12.0", optional = true }
3534
datafusion = { workspace = true }
36-
datafusion-proto = { workspace = true }
37-
futures = { workspace = true }
3835
log = { workspace = true }
39-
parking_lot = { workspace = true }
40-
tempfile = { workspace = true }
36+
4137
tokio = { workspace = true }
4238
url = { workspace = true }
4339

4440
[dev-dependencies]
4541
ballista-executor = { path = "../executor", version = "0.12.0" }
4642
ballista-scheduler = { path = "../scheduler", version = "0.12.0" }
4743
ctor = { workspace = true }
44+
datafusion-proto = { workspace = true }
4845
env_logger = { workspace = true }
4946
rstest = { version = "0.23" }
47+
tempfile = { workspace = true }
5048
tonic = { workspace = true }
5149

5250
[features]

ballista/core/Cargo.toml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,24 @@ exclude = ["*.proto"]
3434
rustc-args = ["--cfg", "docsrs"]
3535

3636
[features]
37+
build-binary = ["configure_me", "clap"]
3738
docsrs = []
3839
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
3940
force_hash_collisions = ["datafusion/force_hash_collisions"]
4041

41-
4242
[dependencies]
4343
arrow-flight = { workspace = true }
4444
async-trait = { workspace = true }
4545
chrono = { version = "0.4", default-features = false }
46-
clap = { workspace = true }
46+
clap = { workspace = true, optional = true }
47+
configure_me = { workspace = true, optional = true }
4748
datafusion = { workspace = true }
4849
datafusion-proto = { workspace = true }
4950
datafusion-proto-common = { workspace = true }
5051
futures = { workspace = true }
51-
5252
itertools = "0.13"
5353
log = { workspace = true }
5454
md-5 = { version = "^0.10.0" }
55-
parse_arg = { workspace = true }
5655
prost = { workspace = true }
5756
prost-types = { workspace = true }
5857
rand = { workspace = true }
@@ -66,5 +65,5 @@ url = { workspace = true }
6665
tempfile = { workspace = true }
6766

6867
[build-dependencies]
69-
rustc_version = "0.4.0"
68+
rustc_version = "0.4.1"
7069
tonic-build = { workspace = true }

ballista/core/src/config.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
//! Ballista configuration
2020
21-
use clap::ValueEnum;
22-
use core::fmt;
2321
use std::collections::HashMap;
2422
use std::result;
2523

@@ -252,30 +250,33 @@ impl datafusion::config::ConfigExtension for BallistaConfig {
252250

253251
// an enum used to configure the scheduler policy
254252
// needs to be visible to code generated by configure_me
255-
#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
253+
#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
254+
#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
256255
pub enum TaskSchedulingPolicy {
257256
#[default]
258257
PullStaged,
259258
PushStaged,
260259
}
261260

261+
#[cfg(feature = "build-binary")]
262262
impl std::str::FromStr for TaskSchedulingPolicy {
263263
type Err = String;
264264

265265
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
266-
ValueEnum::from_str(s, true)
266+
clap::ValueEnum::from_str(s, true)
267267
}
268268
}
269-
270-
impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
271-
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
269+
#[cfg(feature = "build-binary")]
270+
impl configure_me::parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
271+
fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result {
272272
write!(writer, "The scheduler policy for the scheduler")
273273
}
274274
}
275275

276276
// an enum used to configure the log rolling policy
277277
// needs to be visible to code generated by configure_me
278-
#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
278+
#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
279+
#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
279280
pub enum LogRotationPolicy {
280281
Minutely,
281282
Hourly,
@@ -284,16 +285,18 @@ pub enum LogRotationPolicy {
284285
Never,
285286
}
286287

288+
#[cfg(feature = "build-binary")]
287289
impl std::str::FromStr for LogRotationPolicy {
288290
type Err = String;
289291

290292
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
291-
ValueEnum::from_str(s, true)
293+
clap::ValueEnum::from_str(s, true)
292294
}
293295
}
294296

295-
impl parse_arg::ParseArgFromStr for LogRotationPolicy {
296-
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
297+
#[cfg(feature = "build-binary")]
298+
impl configure_me::parse_arg::ParseArgFromStr for LogRotationPolicy {
299+
fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result {
297300
write!(writer, "The log rotation policy")
298301
}
299302
}

ballista/core/src/diagram.rs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::error::Result;
19+
use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
20+
21+
use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
22+
use datafusion::physical_plan::aggregates::AggregateExec;
23+
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
24+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
25+
use datafusion::physical_plan::filter::FilterExec;
26+
use datafusion::physical_plan::joins::HashJoinExec;
27+
use datafusion::physical_plan::projection::ProjectionExec;
28+
use datafusion::physical_plan::sorts::sort::SortExec;
29+
use datafusion::physical_plan::ExecutionPlan;
30+
use std::fs::File;
31+
use std::io::{BufWriter, Write};
32+
use std::sync::atomic::{AtomicUsize, Ordering};
33+
use std::sync::Arc;
34+
35+
pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
36+
let write_file = File::create(filename)?;
37+
let mut w = BufWriter::new(&write_file);
38+
writeln!(w, "digraph G {{")?;
39+
40+
// draw stages and entities
41+
for stage in stages {
42+
writeln!(w, "\tsubgraph cluster{} {{", stage.stage_id())?;
43+
writeln!(w, "\t\tlabel = \"Stage {}\";", stage.stage_id())?;
44+
let mut id = AtomicUsize::new(0);
45+
build_exec_plan_diagram(
46+
&mut w,
47+
stage.children()[0].as_ref(),
48+
stage.stage_id(),
49+
&mut id,
50+
true,
51+
)?;
52+
writeln!(w, "\t}}")?;
53+
}
54+
55+
// draw relationships
56+
for stage in stages {
57+
let mut id = AtomicUsize::new(0);
58+
build_exec_plan_diagram(
59+
&mut w,
60+
stage.children()[0].as_ref(),
61+
stage.stage_id(),
62+
&mut id,
63+
false,
64+
)?;
65+
}
66+
67+
write!(w, "}}")?;
68+
Ok(())
69+
}
70+
71+
fn build_exec_plan_diagram(
72+
w: &mut BufWriter<&File>,
73+
plan: &dyn ExecutionPlan,
74+
stage_id: usize,
75+
id: &mut AtomicUsize,
76+
draw_entity: bool,
77+
) -> Result<usize> {
78+
let operator_str = if plan.as_any().downcast_ref::<AggregateExec>().is_some() {
79+
"AggregateExec"
80+
} else if plan.as_any().downcast_ref::<SortExec>().is_some() {
81+
"SortExec"
82+
} else if plan.as_any().downcast_ref::<ProjectionExec>().is_some() {
83+
"ProjectionExec"
84+
} else if plan.as_any().downcast_ref::<HashJoinExec>().is_some() {
85+
"HashJoinExec"
86+
} else if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
87+
"ParquetExec"
88+
} else if plan.as_any().downcast_ref::<CsvExec>().is_some() {
89+
"CsvExec"
90+
} else if plan.as_any().downcast_ref::<FilterExec>().is_some() {
91+
"FilterExec"
92+
} else if plan.as_any().downcast_ref::<ShuffleWriterExec>().is_some() {
93+
"ShuffleWriterExec"
94+
} else if plan
95+
.as_any()
96+
.downcast_ref::<UnresolvedShuffleExec>()
97+
.is_some()
98+
{
99+
"UnresolvedShuffleExec"
100+
} else if plan
101+
.as_any()
102+
.downcast_ref::<CoalesceBatchesExec>()
103+
.is_some()
104+
{
105+
"CoalesceBatchesExec"
106+
} else if plan
107+
.as_any()
108+
.downcast_ref::<CoalescePartitionsExec>()
109+
.is_some()
110+
{
111+
"CoalescePartitionsExec"
112+
} else {
113+
println!("Unknown: {plan:?}");
114+
"Unknown"
115+
};
116+
117+
let node_id = id.load(Ordering::SeqCst);
118+
id.store(node_id + 1, Ordering::SeqCst);
119+
120+
if draw_entity {
121+
writeln!(
122+
w,
123+
"\t\tstage_{stage_id}_exec_{node_id} [shape=box, label=\"{operator_str}\"];"
124+
)?;
125+
}
126+
for child in plan.children() {
127+
if let Some(shuffle) = child.as_any().downcast_ref::<UnresolvedShuffleExec>() {
128+
if !draw_entity {
129+
writeln!(
130+
w,
131+
"\tstage_{}_exec_1 -> stage_{}_exec_{};",
132+
shuffle.stage_id, stage_id, node_id
133+
)?;
134+
}
135+
} else {
136+
// relationships within same entity
137+
let child_id =
138+
build_exec_plan_diagram(w, child.as_ref(), stage_id, id, draw_entity)?;
139+
if draw_entity {
140+
writeln!(
141+
w,
142+
"\t\tstage_{stage_id}_exec_{child_id} -> stage_{stage_id}_exec_{node_id};"
143+
)?;
144+
}
145+
}
146+
}
147+
Ok(node_id)
148+
}

0 commit comments

Comments
 (0)