Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
# bump directly to datafusion v43 to avoid the serde bug on v42 (https://github.com/apache/datafusion/pull/12626)
datafusion = "42.0.0"
datafusion-cli = "42.0.0"
datafusion-proto = "42.0.0"
datafusion-proto-common = "42.0.0"
datafusion = "43.0.0"
datafusion-cli = "43.0.0"
datafusion-proto = "43.0.0"
datafusion-proto-common = "43.0.0"
object_store = "0.11"
prost = "0.13"
prost-types = "0.13"
Expand Down
8 changes: 4 additions & 4 deletions ballista/client/tests/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ mod standalone {

let test_data = crate::common::example_test_data();
let config = RuntimeConfig::new();
let runtime_env = RuntimeEnv::new(config)?;
let runtime_env = RuntimeEnv::try_new(config)?;

runtime_env.register_object_store(
&format!("s3://{}", crate::common::BUCKET)
Expand Down Expand Up @@ -144,7 +144,7 @@ mod remote {
.map_err(|e| DataFusionError::External(e.into()))?;

let config = RuntimeConfig::new();
let runtime_env = RuntimeEnv::new(config)?;
let runtime_env = RuntimeEnv::try_new(config)?;

runtime_env.register_object_store(
&format!("s3://{}", crate::common::BUCKET)
Expand Down Expand Up @@ -290,7 +290,7 @@ mod custom_s3_config {
CustomObjectStoreRegistry::new(s3options.clone()),
));

Ok(Arc::new(RuntimeEnv::new(config)?))
Ok(Arc::new(RuntimeEnv::try_new(config)?))
});

// Session builder creates SessionState
Expand Down Expand Up @@ -500,7 +500,7 @@ mod custom_s3_config {
let config = RuntimeConfig::new().with_object_store_registry(Arc::new(
CustomObjectStoreRegistry::new(s3options.clone()),
));
let runtime_env = RuntimeEnv::new(config).unwrap();
let runtime_env = RuntimeEnv::try_new(config).unwrap();

SessionStateBuilder::new()
.with_runtime_env(runtime_env.into())
Expand Down
2 changes: 1 addition & 1 deletion ballista/client/tests/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ mod standalone {
}
}

#[derive(Default)]
#[derive(Debug, Default)]
struct BadPlanner {}

#[async_trait::async_trait]
Expand Down
23 changes: 19 additions & 4 deletions ballista/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ pub fn default_session_builder(config: SessionConfig) -> SessionState {
SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()))
.with_runtime_env(Arc::new(
RuntimeEnv::try_new(RuntimeConfig::default()).unwrap(),
))
.build()
}

Expand Down Expand Up @@ -269,7 +271,9 @@ pub fn create_df_ctx_with_ballista_query_planner<T: 'static + AsLogicalPlan>(
let session_state = SessionStateBuilder::new()
.with_default_features()
.with_config(session_config)
.with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()))
.with_runtime_env(Arc::new(
RuntimeEnv::try_new(RuntimeConfig::default()).unwrap(),
))
.with_query_planner(planner)
.with_session_id(session_id)
.build();
Expand Down Expand Up @@ -317,7 +321,7 @@ impl SessionStateExt for SessionState {
.with_round_robin_repartition(false);

let runtime_config = RuntimeConfig::default();
let runtime_env = RuntimeEnv::new(runtime_config)?;
let runtime_env = RuntimeEnv::try_new(runtime_config)?;
let session_state = SessionStateBuilder::new()
.with_default_features()
.with_config(session_config)
Expand Down Expand Up @@ -632,6 +636,17 @@ pub struct BallistaQueryPlanner<T: AsLogicalPlan> {
plan_repr: PhantomData<T>,
}

impl<T: AsLogicalPlan> std::fmt::Debug for BallistaQueryPlanner<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BallistaQueryPlanner")
.field("scheduler_url", &self.scheduler_url)
.field("config", &self.config)
.field("extension_codec", &self.extension_codec)
.field("plan_repr", &self.plan_repr)
.finish()
}
}

impl<T: 'static + AsLogicalPlan> BallistaQueryPlanner<T> {
pub fn new(scheduler_url: String, config: BallistaConfig) -> Self {
Self {
Expand Down Expand Up @@ -828,7 +843,7 @@ mod test {
use super::SessionConfigExt;

fn context() -> SessionContext {
let runtime_environment = RuntimeEnv::new(RuntimeConfig::new()).unwrap();
let runtime_environment = RuntimeEnv::try_new(RuntimeConfig::new()).unwrap();

let session_config = SessionConfig::new().with_information_schema(true);

Expand Down
2 changes: 1 addition & 1 deletion ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
let wd = work_dir.clone();
let runtime_producer: RuntimeProducer = Arc::new(move |_| {
let config = RuntimeConfig::new().with_temp_file_path(wd.clone());
Ok(Arc::new(RuntimeEnv::new(config)?))
Ok(Arc::new(RuntimeEnv::try_new(config)?))
});

let logical = opt
Expand Down
2 changes: 1 addition & 1 deletion ballista/executor/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ pub async fn new_standalone_executor<
let wd = work_dir.clone();
let runtime_producer: RuntimeProducer = Arc::new(move |_: &SessionConfig| {
let config = RuntimeConfig::new().with_temp_file_path(wd.clone());
Ok(Arc::new(RuntimeEnv::new(config)?))
Ok(Arc::new(RuntimeEnv::try_new(config)?))
});

let executor = Arc::new(Executor::new_basic(
Expand Down
5 changes: 3 additions & 2 deletions ballista/scheduler/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const TEST_SCHEDULER_NAME: &str = "localhost:50050";
/// Sometimes we need to construct logical plans that will produce errors
/// when we try and create physical plan. A scan using `ExplodingTableProvider`
/// will do the trick
#[derive(Debug)]
pub struct ExplodingTableProvider;

#[async_trait]
Expand Down Expand Up @@ -136,15 +137,15 @@ pub async fn datafusion_test_context(path: &str) -> Result<SessionContext> {
let default_shuffle_partitions = 2;
let config = SessionConfig::new().with_target_partitions(default_shuffle_partitions);
let ctx = SessionContext::new_with_config(config);
for table in TPCH_TABLES {
for &table in TPCH_TABLES {
let schema = get_tpch_schema(table);
let options = CsvReadOptions::new()
.schema(&schema)
.delimiter(b'|')
.has_header(false)
.file_extension(".tbl");
let dir = format!("{path}/{table}");
ctx.register_csv(table, &dir, options).await?;
ctx.register_csv(table, dir, options).await?;
}
Ok(ctx)
}
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ async fn register_tables(
ctx: &SessionContext,
debug: bool,
) -> Result<()> {
for table in TABLES {
for &table in TABLES {
match file_format {
// dbgen creates .tbl ('|' delimited) files without header
"tbl" => {
Expand Down