138 changes: 69 additions & 69 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -25,9 +25,9 @@ resolver = "2"
# Minimum Supported Rust Version (MSRV) to 1.85.0
# which is datafusion 49s
#
edition = "2021"
edition = "2024"
# we should try to follow datafusion version
rust-version = "1.82.0"
rust-version = "1.89.0"

[workspace.dependencies]
arrow = { version = "55", features = ["ipc_compression"] }
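Most of the hunks that follow are mechanical fallout from this edition bump: `edition = "2024"` opts the workspace into rustfmt's 2024 style edition, which sorts `use` items with version sorting (roughly ASCII order), so uppercase identifiers now come before lowercase ones. A minimal sketch of the before/after, using `std::io` rather than the Ballista paths:

```rust
// Style edition 2021 sorted use items case-insensitively:
//     use std::io::{stdout, Write};
// Style edition 2024 sorts uppercase names like `Write` ahead of
// lowercase ones:
use std::io::{Write, stdout};

fn main() -> std::io::Result<()> {
    // Exercise the imports so the sketch compiles without warnings.
    stdout().write_all(b"formatted under style edition 2024\n")
}
```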
2 changes: 1 addition & 1 deletion ballista-cli/src/command.rs
@@ -28,7 +28,7 @@ use datafusion::common::Result;
use datafusion::error::DataFusionError;
use datafusion::prelude::SessionContext;

use crate::functions::{display_all_functions, Function};
use crate::functions::{Function, display_all_functions};
use crate::print_format::PrintFormat;
use crate::print_options::PrintOptions;

4 changes: 2 additions & 2 deletions ballista-cli/src/exec.rs
@@ -18,15 +18,15 @@
//! Execution functions

use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::io::prelude::*;
use std::sync::Arc;
use std::time::Instant;

use datafusion::common::Result;
use datafusion::prelude::SessionContext;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use rustyline::error::ReadlineError;

use crate::{
command::{Command, OutputFormat},
2 changes: 1 addition & 1 deletion ballista-cli/src/main.rs
@@ -20,7 +20,7 @@ use std::path::Path;

use ballista::{extension::SessionConfigExt, prelude::SessionContextExt};
use ballista_cli::{
exec, print_format::PrintFormat, print_options::PrintOptions, BALLISTA_CLI_VERSION,
BALLISTA_CLI_VERSION, exec, print_format::PrintFormat, print_options::PrintOptions,
};
use clap::Parser;
use datafusion::{
2 changes: 1 addition & 1 deletion ballista/client/tests/common/mod.rs
@@ -21,7 +21,7 @@ use std::path::PathBuf;

use ballista::prelude::{SessionConfigExt, SessionContextExt};
use ballista_core::serde::{
protobuf::scheduler_grpc_client::SchedulerGrpcClient, BallistaCodec,
BallistaCodec, protobuf::scheduler_grpc_client::SchedulerGrpcClient,
};
use ballista_core::{ConfigProducer, RuntimeProducer};
use ballista_scheduler::SessionBuilder;
6 changes: 4 additions & 2 deletions ballista/client/tests/context_basic.rs
@@ -113,12 +113,14 @@ mod basic {

let result = df.unwrap().collect().await.unwrap();

let expected = ["+---------------+--------------+---------------------+-------------+-----------------------------+-------------+",
let expected = [
"+---------------+--------------+---------------------+-------------+-----------------------------+-------------+",
"| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |",
"+---------------+--------------+---------------------+-------------+-----------------------------+-------------+",
"| datafusion | public | csv_with_timestamps | name | Utf8 | YES |",
"| datafusion | public | csv_with_timestamps | ts | Timestamp(Nanosecond, None) | YES |",
"+---------------+--------------+---------------------+-------------+-----------------------------+-------------+"];
"+---------------+--------------+---------------------+-------------+-----------------------------+-------------+",
];
datafusion::assert_batches_eq!(expected, &result);
}

12 changes: 7 additions & 5 deletions ballista/client/tests/context_setup.rs
@@ -103,14 +103,14 @@ mod remote {
#[cfg(feature = "standalone")]
mod standalone {

use std::sync::{atomic::AtomicBool, Arc};
use std::sync::{Arc, atomic::AtomicBool};

use ballista::extension::{SessionConfigExt, SessionContextExt};
use ballista_core::serde::BallistaPhysicalExtensionCodec;
use datafusion::{
assert_batches_eq,
common::exec_err,
execution::{context::QueryPlanner, SessionState, SessionStateBuilder},
execution::{SessionState, SessionStateBuilder, context::QueryPlanner},
logical_expr::LogicalPlan,
physical_plan::ExecutionPlan,
prelude::{SessionConfig, SessionContext},
@@ -224,9 +224,11 @@ mod standalone {
.collect()
.await;

assert!(physical_codec
.invoked
.load(std::sync::atomic::Ordering::Relaxed));
assert!(
physical_codec
.invoked
.load(std::sync::atomic::Ordering::Relaxed)
);
Ok(())
}

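The reshaped `assert!` here (and the similar hunks in `consistent_hash` and `extension.rs` below) is another 2024 style-edition change: a long expression passed to `assert!` is now broken onto its own indented lines inside the macro parentheses instead of wrapping against the macro name. A self-contained sketch of the shape, with a hypothetical flag standing in for `physical_codec.invoked`:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let invoked = AtomicBool::new(true);
    // 2021 style wrapped the chained expression against the macro name:
    //     assert!(physical_codec
    //         .invoked
    //         .load(Ordering::Relaxed));
    // 2024 style indents the whole expression inside the parentheses:
    //     assert!(
    //         physical_codec
    //             .invoked
    //             .load(Ordering::Relaxed)
    //     );
    assert!(invoked.load(Ordering::Relaxed));
}
```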
2 changes: 1 addition & 1 deletion ballista/client/tests/context_unsupported.rs
@@ -77,7 +77,7 @@ mod unsupported {
"| | FilterExec: id@0 > 4 |",
"| | ParquetExec: file_groups={1 group: [[Users/ballista/git/datafusion-ballista/ballista/client/testdata/alltypes_plain.parquet]]}, projection=[id], predicate=id@0 > 4, pruning_predicate=CASE WHEN id_null_count@1 = id_row_count@2 THEN false ELSE id_max@0 > 4 END, required_guarantees=[] |",
"| | |",
"+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
"+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
];

assert_batches_eq!(expected, &result);
15 changes: 7 additions & 8 deletions ballista/core/src/client.rs
@@ -29,9 +29,9 @@ use crate::error::{BallistaError, Result};
use crate::serde::scheduler::{Action, PartitionId};

use arrow_flight;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::Ticket;
use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightData, flight_service_client::FlightServiceClient};
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::{
datatypes::{Schema, SchemaRef},
@@ -64,14 +64,13 @@ impl BallistaClient {
pub async fn try_new(host: &str, port: u16, max_message_size: usize) -> Result<Self> {
let addr = format!("http://{host}:{port}");
debug!("BallistaClient connecting to {addr}");
let connection =
create_grpc_client_connection(addr.clone())
.await
.map_err(|e| {
BallistaError::GrpcConnectionError(format!(
let connection = create_grpc_client_connection(addr.clone()).await.map_err(
|e| {
BallistaError::GrpcConnectionError(format!(
"Error connecting to Ballista scheduler or executor at {addr}: {e:?}"
))
})?;
},
)?;
let flight_client = FlightServiceClient::new(connection)
.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size);
18 changes: 10 additions & 8 deletions ballista/core/src/consistent_hash/mod.rs
@@ -153,10 +153,10 @@
.range(hashed_key..)
.chain(self.virtual_nodes.iter())
{
if let Some((node, _)) = self.node_replicas.get(node_name) {
if node.is_valid() {
return Some(position_key.clone());
}
if let Some((node, _)) = self.node_replicas.get(node_name)
&& node.is_valid()
{
return Some(position_key.clone());
}
if tolerance == 0 {
return None;
@@ -177,8 +177,8 @@ pub fn md5_hash(data: &[u8]) -> Vec<u8> {

#[cfg(test)]
mod test {
use crate::consistent_hash::node::Node;
use crate::consistent_hash::ConsistentHash;
use crate::consistent_hash::node::Node;

#[test]
fn test_topology() {
@@ -219,9 +219,11 @@ mod test {
for (i, key) in keys.iter().enumerate() {
if i == 2 {
assert!(consistent_hash.get(key.as_bytes()).is_none());
assert!(consistent_hash
.get_with_tolerance(key.as_bytes(), 1)
.is_some());
assert!(
consistent_hash
.get_with_tolerance(key.as_bytes(), 1)
.is_some()
);
} else {
assert_eq!(
consistent_hash.get(key.as_bytes()).unwrap().name(),
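The `get_with_tolerance` hunk above also adopts let chains, which edition 2024 enables (stabilized in Rust 1.88, within this PR's new MSRV of 1.89.0): an `if let` and a trailing boolean guard collapse into a single condition instead of two nested `if`s. A minimal sketch with a hypothetical helper, not from the PR:

```rust
// Return the first element only if it is even — the same
// nested-if-to-let-chain transformation in miniature.
fn head_if_even(values: &[i32]) -> Option<i32> {
    // Pre-2024 shape:
    //     if let Some(v) = values.first() {
    //         if v % 2 == 0 {
    //             return Some(*v);
    //         }
    //     }
    // Edition 2024 let chain:
    if let Some(v) = values.first()
        && v % 2 == 0
    {
        return Some(*v);
    }
    None
}

fn main() {
    assert_eq!(head_if_even(&[2, 3]), Some(2));
    assert_eq!(head_if_even(&[1, 2]), None);
}
```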
4 changes: 2 additions & 2 deletions ballista/core/src/diagram.rs
@@ -19,19 +19,19 @@ use crate::error::Result;
use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};

use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use log::warn;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
let write_file = File::create(filename)?;
2 changes: 1 addition & 1 deletion ballista/core/src/event_loop.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use async_trait::async_trait;
use log::{error, info};
6 changes: 3 additions & 3 deletions ballista/core/src/execution_plans/distributed_query.rs
@@ -19,9 +19,9 @@ use crate::client::BallistaClient;
use crate::config::BallistaConfig;
use crate::serde::protobuf::SuccessfulJob;
use crate::serde::protobuf::{
execute_query_params::Query, execute_query_result, job_status,
scheduler_grpc_client::SchedulerGrpcClient, ExecuteQueryParams, GetJobStatusParams,
GetJobStatusResult, KeyValuePair, PartitionLocation,
ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
PartitionLocation, execute_query_params::Query, execute_query_result, job_status,
scheduler_grpc_client::SchedulerGrpcClient,
};
use crate::utils::create_grpc_client_connection;
use datafusion::arrow::datatypes::SchemaRef;
4 changes: 2 additions & 2 deletions ballista/core/src/execution_plans/shuffle_reader.rs
@@ -52,7 +52,7 @@ use itertools::Itertools;
use log::{debug, error};
use rand::prelude::SliceRandom;
use rand::rng;
use tokio::sync::{mpsc, Semaphore};
use tokio::sync::{Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;

/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
@@ -472,7 +472,7 @@ mod tests {
use datafusion::physical_plan::common;

use datafusion::prelude::SessionContext;
use tempfile::{tempdir, TempDir};
use tempfile::{TempDir, tempdir};

#[tokio::test]
async fn test_stats_for_partitions_empty() {
6 changes: 3 additions & 3 deletions ballista/core/src/execution_plans/shuffle_writer.rs
@@ -20,8 +20,8 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.

use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::arrow::ipc::CompressionType;
use datafusion::arrow::ipc::writer::IpcWriteOptions;

use datafusion::arrow::ipc::writer::StreamWriter;
use std::any::Any;
@@ -51,8 +51,8 @@ use datafusion::physical_plan::metrics::{
};

use datafusion::physical_plan::{
displayable, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics, displayable,
};
use futures::{StreamExt, TryFutureExt, TryStreamExt};

11 changes: 7 additions & 4 deletions ballista/core/src/extension.rs
@@ -16,8 +16,9 @@
// under the License.

use crate::config::{
BallistaConfig, BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME,
BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME,
BALLISTA_SHUFFLE_READER_MAX_REQUESTS, BALLISTA_STANDALONE_PARALLELISM,
BallistaConfig,
};
use crate::planner::BallistaQueryPlanner;
use crate::serde::protobuf::KeyValuePair;
@@ -482,8 +483,10 @@ mod test {
let pairs = config.to_key_value_pairs();

assert!(pairs.iter().any(|p| p.key == BALLISTA_JOB_NAME));
assert!(pairs
.iter()
.any(|p| p.key == "datafusion.catalog.information_schema"))
assert!(
pairs
.iter()
.any(|p| p.key == "datafusion.catalog.information_schema")
)
}
}
11 changes: 7 additions & 4 deletions ballista/core/src/object_store.rs
@@ -196,10 +196,13 @@ impl CustomObjectStoreRegistry {
}

if let Some(endpoint) = endpoint {
if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
return config_err!("Invalid endpoint: {endpoint}. HTTP is not allowed for S3 endpoints. To allow HTTP, set 's3.allow_http' to true");
}
if let Ok(endpoint_url) = Url::try_from(endpoint.as_str())
&& !matches!(allow_http, Some(true))
&& endpoint_url.scheme() == "http"
{
return config_err!(
"Invalid endpoint: {endpoint}. HTTP is not allowed for S3 endpoints. To allow HTTP, set 's3.allow_http' to true"
);
}

builder = builder.with_endpoint(endpoint);
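The endpoint check above is the same let-chain rewrite, here chaining two boolean guards after the `if let`. A std-only sketch of the shape (hypothetical `validate_endpoint`, with a string-prefix test standing in for the `Url` parse):

```rust
fn validate_endpoint(
    endpoint: Option<&str>,
    allow_http: Option<bool>,
) -> Result<(), String> {
    // One let chain replaces an `if let` wrapping a nested `if`.
    if let Some(endpoint) = endpoint
        && !matches!(allow_http, Some(true))
        && endpoint.starts_with("http://")
    {
        return Err(format!(
            "Invalid endpoint: {endpoint}. HTTP is not allowed for S3 endpoints."
        ));
    }
    Ok(())
}

fn main() {
    assert!(validate_endpoint(Some("http://localhost:9000"), None).is_err());
    assert!(validate_endpoint(Some("http://localhost:9000"), Some(true)).is_ok());
    assert!(validate_endpoint(Some("https://s3.amazonaws.com"), None).is_ok());
}
```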
4 changes: 2 additions & 2 deletions ballista/core/src/planner.rs
@@ -25,8 +25,8 @@ use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor};
use datafusion::error::DataFusionError;
use datafusion::execution::context::{QueryPlanner, SessionState};
use datafusion::logical_expr::{LogicalPlan, TableScan};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion_proto::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
use std::marker::PhantomData;
@@ -180,7 +180,7 @@ mod test {
use datafusion::{
common::tree_node::TreeNode,
error::Result,
execution::{runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
execution::{SessionStateBuilder, runtime_env::RuntimeEnvBuilder},
prelude::{SessionConfig, SessionContext},
};

8 changes: 4 additions & 4 deletions ballista/core/src/serde/mod.rs
@@ -29,10 +29,10 @@ use datafusion_proto::logical_plan::file_formats::{
ArrowLogicalExtensionCodec, AvroLogicalExtensionCodec, CsvLogicalExtensionCodec,
JsonLogicalExtensionCodec, ParquetLogicalExtensionCodec,
};
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
use datafusion_proto::physical_plan::to_proto::serialize_partitioning;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::protobuf::proto_error;
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use datafusion_proto::{
@@ -492,12 +492,12 @@ mod test {
use super::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::registry::MemoryFunctionRegistry;
use datafusion::physical_plan::expressions::col;
use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::expressions::col;
use datafusion::{
common::DFSchema,
datasource::file_format::{parquet::ParquetFormatFactory, DefaultFileType},
logical_expr::{dml::CopyTo, EmptyRelation, LogicalPlan},
datasource::file_format::{DefaultFileType, parquet::ParquetFormatFactory},
logical_expr::{EmptyRelation, LogicalPlan, dml::CopyTo},
prelude::SessionContext,
};
use datafusion_proto::{logical_plan::AsLogicalPlan, protobuf::LogicalPlanNode};