Skip to content

Commit 169cadd

Browse files
authored
Make easier to create custom schedulers and executors (#1118)
1 parent 4e8c64b commit 169cadd

File tree

21 files changed

+1150
-209
lines changed

21 files changed

+1150
-209
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo
2121
resolver = "2"
2222

2323
[workspace.dependencies]
24+
anyhow = "1"
2425
arrow = { version = "53", features = ["ipc_compression"] }
2526
arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
2627
clap = { version = "3", features = ["derive", "cargo"] }

ballista/executor/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ path = "src/bin/main.rs"
3737
default = ["mimalloc"]
3838

3939
[dependencies]
40-
anyhow = "1"
40+
anyhow = { workspace = true }
4141
arrow = { workspace = true }
4242
arrow-flight = { workspace = true }
4343
async-trait = { workspace = true }

ballista/executor/src/bin/main.rs

Lines changed: 37 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,15 @@
1818
//! Ballista Rust executor binary.
1919
2020
use anyhow::Result;
21-
use std::sync::Arc;
22-
21+
use ballista_core::config::LogRotationPolicy;
2322
use ballista_core::print_version;
23+
use ballista_executor::config::prelude::*;
2424
use ballista_executor::executor_process::{
2525
start_executor_process, ExecutorProcessConfig,
2626
};
27-
use config::prelude::*;
28-
29-
#[allow(unused_imports)]
30-
#[macro_use]
31-
extern crate configure_me;
32-
33-
#[allow(clippy::all, warnings)]
34-
mod config {
35-
// Ideally we would use the include_config macro from configure_me, but then we cannot use
36-
// #[allow(clippy::all)] to silence clippy warnings from the generated code
37-
include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));
38-
}
27+
use std::env;
28+
use std::sync::Arc;
29+
use tracing_subscriber::EnvFilter;
3930

4031
#[cfg(feature = "mimalloc")]
4132
#[global_allocator]
@@ -53,46 +44,39 @@ async fn main() -> Result<()> {
5344
std::process::exit(0);
5445
}
5546

56-
let log_file_name_prefix = format!(
57-
"executor_{}_{}",
58-
opt.external_host
59-
.clone()
60-
.unwrap_or_else(|| "localhost".to_string()),
61-
opt.bind_port
62-
);
47+
let config: ExecutorProcessConfig = opt.try_into()?;
48+
49+
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
50+
let log_filter =
51+
EnvFilter::new(rust_log.unwrap_or(config.special_mod_log_level.clone()));
52+
53+
let tracing = tracing_subscriber::fmt()
54+
.with_ansi(false)
55+
.with_thread_names(config.print_thread_info)
56+
.with_thread_ids(config.print_thread_info)
57+
.with_env_filter(log_filter);
6358

64-
let config = ExecutorProcessConfig {
65-
special_mod_log_level: opt.log_level_setting,
66-
external_host: opt.external_host,
67-
bind_host: opt.bind_host,
68-
port: opt.bind_port,
69-
grpc_port: opt.bind_grpc_port,
70-
scheduler_host: opt.scheduler_host,
71-
scheduler_port: opt.scheduler_port,
72-
scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds,
73-
concurrent_tasks: opt.concurrent_tasks,
74-
task_scheduling_policy: opt.task_scheduling_policy,
75-
work_dir: opt.work_dir,
76-
log_dir: opt.log_dir,
77-
log_file_name_prefix,
78-
log_rotation_policy: opt.log_rotation_policy,
79-
print_thread_info: opt.print_thread_info,
80-
job_data_ttl_seconds: opt.job_data_ttl_seconds,
81-
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
82-
grpc_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
83-
grpc_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
84-
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
85-
data_cache_policy: opt.data_cache_policy,
86-
cache_dir: opt.cache_dir,
87-
cache_capacity: opt.cache_capacity,
88-
cache_io_concurrency: opt.cache_io_concurrency,
89-
execution_engine: None,
90-
function_registry: None,
91-
config_producer: None,
92-
runtime_producer: None,
93-
logical_codec: None,
94-
physical_codec: None,
95-
};
59+
// File layer
60+
if let Some(log_dir) = &config.log_dir {
61+
let log_file = match config.log_rotation_policy {
62+
LogRotationPolicy::Minutely => {
63+
tracing_appender::rolling::minutely(log_dir, &config.log_file_name_prefix)
64+
}
65+
LogRotationPolicy::Hourly => {
66+
tracing_appender::rolling::hourly(log_dir, &config.log_file_name_prefix)
67+
}
68+
LogRotationPolicy::Daily => {
69+
tracing_appender::rolling::daily(log_dir, &config.log_file_name_prefix)
70+
}
71+
LogRotationPolicy::Never => {
72+
tracing_appender::rolling::never(log_dir, &config.log_file_name_prefix)
73+
}
74+
};
75+
76+
tracing.with_writer(log_file).init();
77+
} else {
78+
tracing.init();
79+
}
9680

9781
start_executor_process(Arc::new(config)).await
9882
}

ballista/executor/src/config.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use ballista_core::error::BallistaError;
19+
20+
use crate::executor_process::ExecutorProcessConfig;
21+
22+
// Ideally we would use the include_config macro from configure_me, but then we cannot use
23+
// #[allow(clippy::all)] to silence clippy warnings from the generated code
24+
include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));
25+
26+
impl TryFrom<Config> for ExecutorProcessConfig {
27+
type Error = BallistaError;
28+
29+
fn try_from(opt: Config) -> Result<Self, Self::Error> {
30+
let log_file_name_prefix = format!(
31+
"executor_{}_{}",
32+
opt.external_host
33+
.clone()
34+
.unwrap_or_else(|| "localhost".to_string()),
35+
opt.bind_port
36+
);
37+
38+
Ok(ExecutorProcessConfig {
39+
special_mod_log_level: opt.log_level_setting,
40+
external_host: opt.external_host,
41+
bind_host: opt.bind_host,
42+
port: opt.bind_port,
43+
grpc_port: opt.bind_grpc_port,
44+
scheduler_host: opt.scheduler_host,
45+
scheduler_port: opt.scheduler_port,
46+
scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds,
47+
concurrent_tasks: opt.concurrent_tasks,
48+
task_scheduling_policy: opt.task_scheduling_policy,
49+
work_dir: opt.work_dir,
50+
log_dir: opt.log_dir,
51+
log_file_name_prefix,
52+
log_rotation_policy: opt.log_rotation_policy,
53+
print_thread_info: opt.print_thread_info,
54+
job_data_ttl_seconds: opt.job_data_ttl_seconds,
55+
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
56+
grpc_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
57+
grpc_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
58+
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
59+
data_cache_policy: opt.data_cache_policy,
60+
cache_dir: opt.cache_dir,
61+
cache_capacity: opt.cache_capacity,
62+
cache_io_concurrency: opt.cache_io_concurrency,
63+
override_execution_engine: None,
64+
override_function_registry: None,
65+
override_config_producer: None,
66+
override_runtime_producer: None,
67+
override_logical_codec: None,
68+
override_physical_codec: None,
69+
})
70+
}
71+
}

ballista/executor/src/executor_process.rs

Lines changed: 18 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::net::SocketAddr;
2121
use std::sync::atomic::Ordering;
2222
use std::sync::Arc;
2323
use std::time::{Duration, Instant, UNIX_EPOCH};
24-
use std::{env, io};
2524

2625
use anyhow::{Context, Result};
2726
use arrow_flight::flight_service_server::FlightServiceServer;
@@ -37,7 +36,6 @@ use tokio::signal;
3736
use tokio::sync::mpsc;
3837
use tokio::task::JoinHandle;
3938
use tokio::{fs, time};
40-
use tracing_subscriber::EnvFilter;
4139
use uuid::Uuid;
4240

4341
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
@@ -98,57 +96,20 @@ pub struct ExecutorProcessConfig {
9896
pub executor_heartbeat_interval_seconds: u64,
9997
/// Optional execution engine to use to execute physical plans, will default to
10098
/// DataFusion if none is provided.
101-
pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
99+
pub override_execution_engine: Option<Arc<dyn ExecutionEngine>>,
102100
/// Overrides default function registry
103-
pub function_registry: Option<Arc<BallistaFunctionRegistry>>,
101+
pub override_function_registry: Option<Arc<BallistaFunctionRegistry>>,
104102
/// [RuntimeProducer] override option
105-
pub runtime_producer: Option<RuntimeProducer>,
103+
pub override_runtime_producer: Option<RuntimeProducer>,
106104
/// [ConfigProducer] override option
107-
pub config_producer: Option<ConfigProducer>,
105+
pub override_config_producer: Option<ConfigProducer>,
108106
/// [PhysicalExtensionCodec] override option
109-
pub logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
107+
pub override_logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
110108
/// [PhysicalExtensionCodec] override option
111-
pub physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
109+
pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
112110
}
113111

114112
pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
115-
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
116-
let log_filter =
117-
EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level.clone()));
118-
// File layer
119-
if let Some(log_dir) = opt.log_dir.clone() {
120-
let log_file = match opt.log_rotation_policy {
121-
LogRotationPolicy::Minutely => {
122-
tracing_appender::rolling::minutely(log_dir, &opt.log_file_name_prefix)
123-
}
124-
LogRotationPolicy::Hourly => {
125-
tracing_appender::rolling::hourly(log_dir, &opt.log_file_name_prefix)
126-
}
127-
LogRotationPolicy::Daily => {
128-
tracing_appender::rolling::daily(log_dir, &opt.log_file_name_prefix)
129-
}
130-
LogRotationPolicy::Never => {
131-
tracing_appender::rolling::never(log_dir, &opt.log_file_name_prefix)
132-
}
133-
};
134-
tracing_subscriber::fmt()
135-
.with_ansi(false)
136-
.with_thread_names(opt.print_thread_info)
137-
.with_thread_ids(opt.print_thread_info)
138-
.with_writer(log_file)
139-
.with_env_filter(log_filter)
140-
.init();
141-
} else {
142-
// Console layer
143-
tracing_subscriber::fmt()
144-
.with_ansi(false)
145-
.with_thread_names(opt.print_thread_info)
146-
.with_thread_ids(opt.print_thread_info)
147-
.with_writer(io::stdout)
148-
.with_env_filter(log_filter)
149-
.init();
150-
}
151-
152113
let addr = format!("{}:{}", opt.bind_host, opt.port);
153114
let addr = addr
154115
.parse()
@@ -194,23 +155,26 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
194155
// put them to session config
195156
let metrics_collector = Arc::new(LoggingMetricsCollector::default());
196157
let config_producer = opt
197-
.config_producer
158+
.override_config_producer
198159
.clone()
199160
.unwrap_or_else(|| Arc::new(default_config_producer));
200161

201162
let wd = work_dir.clone();
202-
let runtime_producer: RuntimeProducer = Arc::new(move |_| {
203-
let config = RuntimeConfig::new().with_temp_file_path(wd.clone());
204-
Ok(Arc::new(RuntimeEnv::new(config)?))
205-
});
163+
let runtime_producer: RuntimeProducer =
164+
opt.override_runtime_producer.clone().unwrap_or_else(|| {
165+
Arc::new(move |_| {
166+
let config = RuntimeConfig::new().with_temp_file_path(wd.clone());
167+
Ok(Arc::new(RuntimeEnv::new(config)?))
168+
})
169+
});
206170

207171
let logical = opt
208-
.logical_codec
172+
.override_logical_codec
209173
.clone()
210174
.unwrap_or_else(|| Arc::new(BallistaLogicalExtensionCodec::default()));
211175

212176
let physical = opt
213-
.physical_codec
177+
.override_physical_codec
214178
.clone()
215179
.unwrap_or_else(|| Arc::new(BallistaPhysicalExtensionCodec::default()));
216180

@@ -224,10 +188,10 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
224188
&work_dir,
225189
runtime_producer,
226190
config_producer,
227-
opt.function_registry.clone().unwrap_or_default(),
191+
opt.override_function_registry.clone().unwrap_or_default(),
228192
metrics_collector,
229193
concurrent_tasks,
230-
opt.execution_engine.clone(),
194+
opt.override_execution_engine.clone(),
231195
));
232196

233197
let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;

ballista/executor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#![doc = include_str!("../README.md")]
1919

2020
pub mod collect;
21+
pub mod config;
2122
pub mod execution_engine;
2223
pub mod execution_loop;
2324
pub mod executor;

ballista/scheduler/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ prometheus-metrics = ["prometheus", "once_cell"]
4141
rest-api = []
4242

4343
[dependencies]
44-
anyhow = "1"
44+
anyhow = { workspace = true }
4545
arrow-flight = { workspace = true }
4646
async-trait = { workspace = true }
4747
axum = "0.7.7"

ballista/scheduler/scheduler_config_spec.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ doc = "Delayed interval for cleaning up finished job state. Default: 3600"
8282

8383
[[param]]
8484
name = "task_distribution"
85-
type = "ballista_scheduler::config::TaskDistribution"
85+
type = "crate::config::TaskDistribution"
8686
doc = "The policy of distributing tasks to available executor slots, possible values: bias, round-robin, consistent-hash. Default: bias"
87-
default = "ballista_scheduler::config::TaskDistribution::Bias"
87+
default = "crate::config::TaskDistribution::Bias"
8888

8989
[[param]]
9090
name = "consistent_hash_num_replicas"

0 commit comments

Comments
 (0)