6 changes: 2 additions & 4 deletions .github/workflows/rust.yml
@@ -117,10 +117,8 @@ jobs:
export PATH=$PATH:$HOME/d/protoc/bin
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
cargo test
cd examples
cargo run --example standalone_sql --features=ballista/standalone
cd ../python
cargo test --features=testcontainers
cd python
cargo test
env:
CARGO_HOME: "/github/home/.cargo"
1 change: 1 addition & 0 deletions Cargo.toml
@@ -43,6 +43,7 @@ tonic-build = { version = "0.12", default-features = false, features = [
tracing = "0.1.36"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
ctor = { version = "0.2" }

tokio = { version = "1" }
uuid = { version = "1.10", features = ["v4", "v7"] }
7 changes: 3 additions & 4 deletions ballista/client/Cargo.toml
@@ -44,12 +44,11 @@ url = { workspace = true }
[dev-dependencies]
ballista-executor = { path = "../executor", version = "0.12.0" }
ballista-scheduler = { path = "../scheduler", version = "0.12.0" }
ctor = { version = "0.2" }
ctor = { workspace = true }
env_logger = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
testcontainers-modules = { version = "0.11", features = ["minio"] }
rstest = { version = "0.23" }
tonic = { workspace = true }

[features]
default = ["standalone"]
standalone = ["ballista-executor", "ballista-scheduler"]
testcontainers = []
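
A minimal sketch of how the new, empty testcontainers feature is presumably consumed on the test side, matching the `cargo test --features=testcontainers` step added to CI; the module name and its contents are illustrative assumptions, not part of this diff:

#[cfg(test)]
#[cfg(feature = "testcontainers")]
mod testcontainers_tests {
    // Hypothetical gate: tests that start MinIO through testcontainers-modules
    // would live here and are compiled only when the feature is enabled.
}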
11 changes: 6 additions & 5 deletions ballista/client/src/extension.rs
@@ -122,7 +122,7 @@ impl SessionContextExt for SessionContext {
let config = SessionConfig::new_with_ballista();
let scheduler_url = Extension::parse_url(url)?;
log::info!(
"Connecting to Ballista scheduler at {}",
"Connecting to Ballista scheduler at: {}",
scheduler_url.clone()
);
let remote_session_id =
@@ -245,10 +245,11 @@ impl Extension {
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
}
Some(session_state) => {
ballista_executor::new_standalone_executor_from_state::<
datafusion_proto::protobuf::LogicalPlanNode,
datafusion_proto::protobuf::PhysicalPlanNode,
>(scheduler, concurrent_tasks, session_state)
ballista_executor::new_standalone_executor_from_state(
scheduler,
concurrent_tasks,
session_state,
)
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
}
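For orientation, a hedged usage sketch of the SessionContextExt API this file implements; the scheduler URL, parquet path, and table name below are illustrative, not taken from this PR:

use ballista::prelude::*; // brings SessionContextExt (standalone()/remote()) into scope
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // In-process scheduler + executor:
    let ctx = SessionContext::standalone().await?;
    // ...or attach to a running Ballista scheduler instead:
    // let ctx = SessionContext::remote("df://localhost:50050").await?;

    ctx.register_parquet("example", "data/example.parquet", ParquetReadOptions::default())
        .await?;
    ctx.sql("SELECT count(*) FROM example").await?.show().await?;
    Ok(())
}
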
135 changes: 44 additions & 91 deletions ballista/client/tests/common/mod.rs
@@ -19,64 +19,14 @@ use std::env;
use std::error::Error;
use std::path::PathBuf;

use ballista::prelude::SessionConfigExt;
use ballista::prelude::{SessionConfigExt, SessionContextExt};
use ballista_core::serde::{
protobuf::scheduler_grpc_client::SchedulerGrpcClient, BallistaCodec,
};
use ballista_core::{ConfigProducer, RuntimeProducer};
use ballista_scheduler::SessionBuilder;
use datafusion::execution::SessionState;
use datafusion::prelude::SessionConfig;
use object_store::aws::AmazonS3Builder;
use testcontainers_modules::minio::MinIO;
use testcontainers_modules::testcontainers::core::{CmdWaitFor, ExecCommand};
use testcontainers_modules::testcontainers::ContainerRequest;
use testcontainers_modules::{minio, testcontainers::ImageExt};

pub const REGION: &str = "eu-west-1";
pub const BUCKET: &str = "ballista";
pub const ACCESS_KEY_ID: &str = "MINIO";
pub const SECRET_KEY: &str = "MINIOMINIO";

#[allow(dead_code)]
pub fn create_s3_store(
port: u16,
) -> std::result::Result<object_store::aws::AmazonS3, object_store::Error> {
AmazonS3Builder::new()
.with_endpoint(format!("http://localhost:{port}"))
.with_region(REGION)
.with_bucket_name(BUCKET)
.with_access_key_id(ACCESS_KEY_ID)
.with_secret_access_key(SECRET_KEY)
.with_allow_http(true)
.build()
}

#[allow(dead_code)]
pub fn create_minio_container() -> ContainerRequest<minio::MinIO> {
MinIO::default()
.with_env_var("MINIO_ACCESS_KEY", ACCESS_KEY_ID)
.with_env_var("MINIO_SECRET_KEY", SECRET_KEY)
}

#[allow(dead_code)]
pub fn create_bucket_command() -> ExecCommand {
// this is hack to create a bucket without creating s3 client.
// this works with current testcontainer (and image) version 'RELEASE.2022-02-07T08-17-33Z'.
// (testcontainer does not await properly on latest image version)
//
// if testcontainer image version change to something newer we should use "mc mb /data/ballista"
// to crate a bucket.
ExecCommand::new(vec![
"mkdir".to_string(),
format!("/data/{}", crate::common::BUCKET),
])
.with_cmd_ready_condition(CmdWaitFor::seconds(1))
}

// /// Remote ballista cluster to be used for local testing.
// static BALLISTA_CLUSTER: tokio::sync::OnceCell<(String, u16)> =
// tokio::sync::OnceCell::const_new();
use datafusion::prelude::{SessionConfig, SessionContext};

/// Returns the parquet test data directory, which is by default
/// stored in a git submodule rooted at
@@ -161,17 +111,8 @@ pub async fn setup_test_cluster() -> (String, u16) {

let host = "localhost".to_string();

let scheduler_url = format!("http://{}:{}", host, addr.port());

let scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::info!("Attempting to connect to test scheduler...");
}
Ok(scheduler) => break scheduler,
}
};
let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

ballista_executor::new_standalone_executor(
scheduler,
@@ -190,7 +131,6 @@
#[allow(dead_code)]
pub async fn setup_test_cluster_with_state(session_state: SessionState) -> (String, u16) {
let config = SessionConfig::new_with_ballista();
//let default_codec = BallistaCodec::default();

let addr = ballista_scheduler::standalone::new_standalone_scheduler_from_state(
&session_state,
@@ -200,22 +140,10 @@

let host = "localhost".to_string();

let scheduler_url = format!("http://{}:{}", host, addr.port());
let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

let scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::info!("Attempting to connect to test scheduler...");
}
Ok(scheduler) => break scheduler,
}
};

ballista_executor::new_standalone_executor_from_state::<
datafusion_proto::protobuf::LogicalPlanNode,
datafusion_proto::protobuf::PhysicalPlanNode,
>(
ballista_executor::new_standalone_executor_from_state(
scheduler,
config.ballista_standalone_parallelism(),
&session_state,
@@ -253,22 +181,13 @@ pub async fn setup_test_cluster_with_builders(

let host = "localhost".to_string();

let scheduler_url = format!("http://{}:{}", host, addr.port());

let scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::info!("Attempting to connect to test scheduler...");
}
Ok(scheduler) => break scheduler,
}
};
let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

ballista_executor::new_standalone_executor_from_builder(
scheduler,
config.ballista_standalone_parallelism(),
config_producer.clone(),
config_producer,
runtime_producer,
codec,
Default::default(),
@@ -281,6 +200,40 @@
(host, addr.port())
}

async fn connect_to_scheduler(
scheduler_url: String,
) -> SchedulerGrpcClient<tonic::transport::Channel> {
let mut retry = 50;
loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) if retry > 0 => {
retry -= 1;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::debug!("Re-attempting to connect to test scheduler...");
}

Err(_) => {
log::error!("scheduler connection timed out");
panic!("scheduler connection timed out")
}
Ok(scheduler) => break scheduler,
}
}
}

#[allow(dead_code)]
pub async fn standalone_context() -> SessionContext {
SessionContext::standalone().await.unwrap()
}

#[allow(dead_code)]
pub async fn remote_context() -> SessionContext {
let (host, port) = setup_test_cluster().await;
SessionContext::remote(&format!("df://{host}:{port}"))
.await
.unwrap()
}

#[ctor::ctor]
fn init() {
// Enable RUST_LOG logging configuration for test
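The new standalone_context() and remote_context() helpers pair naturally with the rstest dev-dependency added earlier; a hedged sketch of a test parameterized over both contexts (the test name and query are made up, and the attribute combination assumes rstest 0.23's async case support):

use datafusion::prelude::SessionContext;
use rstest::rstest;

use crate::common::{remote_context, standalone_context};

#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
async fn sql_runs_on_both_contexts(
    // #[future(awt)] awaits the async context factory before the test body runs.
    #[future(awt)]
    #[case]
    ctx: SessionContext,
) {
    let batches = ctx
        .sql("SELECT 1 AS one")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    assert_eq!(batches[0].num_rows(), 1);
}
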
@@ -23,7 +23,7 @@ mod common;
//
#[cfg(test)]
#[cfg(feature = "standalone")]
mod standalone_tests {
mod basic {
use ballista::prelude::SessionContextExt;
use datafusion::arrow;
use datafusion::arrow::util::pretty::pretty_format_batches;