Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,026 changes: 536 additions & 490 deletions Cargo.lock

Large diffs are not rendered by default.

25 changes: 13 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,28 @@ resolver = "2"
#
edition = "2021"
# we should try to follow datafusion version
rust-version = "1.86.0"
rust-version = "1.88.0"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to align with DataFusion rust version, feel free to let me know if we dont want to bump it right now
https://github.com/apache/datafusion/blob/main/Cargo.toml#L80

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense


[workspace.dependencies]
arrow = { version = "56", features = ["ipc_compression"] }
arrow-flight = { version = "56", features = ["flight-sql-experimental"] }
arrow = { version = "57", features = ["ipc_compression"] }
arrow-flight = { version = "57", features = ["flight-sql-experimental"] }
clap = { version = "4.5", features = ["derive", "cargo"] }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
datafusion = "50.3.0"
datafusion-cli = "50.3.0"
datafusion-proto = "50.3.0"
datafusion-proto-common = "50.3.0"
datafusion = "51.0.0"
datafusion-cli = "51.0.0"
datafusion-proto = "51.0.0"
datafusion-proto-common = "51.0.0"
object_store = "0.12"
prost = "0.13"
prost-types = "0.13"
prost = "0.14"
prost-types = "0.14"
rstest = { version = "0.26" }
tonic = { version = "0.13" }
tonic-build = { version = "0.13", default-features = false, features = [
tonic = { version = "0.14" }
tonic-build = { version = "0.14", default-features = false, features = [
"transport",
"prost"
] }
tonic-prost = { version = "0.14" }
tonic-prost-build = { version = "0.14" }
tracing = "0.1"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
7 changes: 5 additions & 2 deletions ballista-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::env;
use std::path::Path;
use std::{env, sync::Arc};

use ballista::{extension::SessionConfigExt, prelude::SessionContextExt};
use ballista_cli::{
Expand All @@ -28,7 +28,9 @@ use datafusion::{
execution::SessionStateBuilder,
prelude::{SessionConfig, SessionContext},
};
use datafusion_cli::print_options::MaxRows;
use datafusion_cli::{
object_storage::instrumented::InstrumentedObjectStoreRegistry, print_options::MaxRows,
};
use mimalloc::MiMalloc;

#[global_allocator]
Expand Down Expand Up @@ -151,6 +153,7 @@ pub async fn main() -> Result<()> {
quiet: args.quiet,
maxrows: MaxRows::Unlimited,
color: args.color,
instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
};

let files = args.file;
Expand Down
36 changes: 19 additions & 17 deletions ballista/client/tests/context_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ mod basic {

let result = df.unwrap().collect().await.unwrap();

let expected = ["+---------------+--------------+---------------------+-------------+-----------------------------+-------------+",
"| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |",
"+---------------+--------------+---------------------+-------------+-----------------------------+-------------+",
"| datafusion | public | csv_with_timestamps | name | Utf8 | YES |",
"| datafusion | public | csv_with_timestamps | ts | Timestamp(Nanosecond, None) | YES |",
"+---------------+--------------+---------------------+-------------+-----------------------------+-------------+"];
let expected = [
"+---------------+--------------+---------------------+-------------+---------------+-------------+",
"| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |",
"+---------------+--------------+---------------------+-------------+---------------+-------------+",
"| datafusion | public | csv_with_timestamps | name | Utf8 | YES |",
"| datafusion | public | csv_with_timestamps | ts | Timestamp(ns) | YES |",
"+---------------+--------------+---------------------+-------------+---------------+-------------+",
];
datafusion::assert_batches_eq!(expected, &result);
}

Expand Down Expand Up @@ -359,7 +361,7 @@ mod basic {
let res = df.collect().await.unwrap();
let expected = vec![
"+-------------------+",
"| var(test.id) |",
"| var_samp(test.id) |",
"+-------------------+",
"| 6.000000000000001 |",
"+-------------------+",
Expand Down Expand Up @@ -390,11 +392,11 @@ mod basic {
.unwrap();
let res = df.collect().await.unwrap();
let expected = vec![
"+--------------------+",
"| stddev(test.id) |",
"+--------------------+",
"| 2.4494897427831783 |",
"+--------------------+",
"+----------------------+",
"| stddev_samp(test.id) |",
"+----------------------+",
"| 2.4494897427831783 |",
"+----------------------+",
];
assert_result_eq(expected, &res);
}
Expand All @@ -408,11 +410,11 @@ mod basic {
.unwrap();
let res = df.collect().await.unwrap();
let expected = vec![
"+--------------------------------------+",
"| covar_samp(test.id,test.tinyint_col) |",
"+--------------------------------------+",
"| 0.28571428571428586 |",
"+--------------------------------------+",
"+---------------------------------+",
"| covar(test.id,test.tinyint_col) |",
"+---------------------------------+",
"| 0.28571428571428586 |",
"+---------------------------------+",
];
assert_result_eq(expected, &res);
}
Expand Down
14 changes: 8 additions & 6 deletions ballista/client/tests/context_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ mod standalone {
use datafusion::{
assert_batches_eq,
common::exec_err,
execution::{context::QueryPlanner, SessionState, SessionStateBuilder},
execution::{
context::QueryPlanner, SessionState, SessionStateBuilder, TaskContext,
},
logical_expr::LogicalPlan,
physical_plan::ExecutionPlan,
prelude::{SessionConfig, SessionContext},
Expand Down Expand Up @@ -275,7 +277,7 @@ mod standalone {
&self,
_buf: &[u8],
_inputs: &[datafusion::logical_expr::LogicalPlan],
_ctx: &SessionContext,
_ctx: &TaskContext,
) -> datafusion::error::Result<datafusion::logical_expr::Extension> {
self.invoked
.store(true, std::sync::atomic::Ordering::Relaxed);
Expand All @@ -297,7 +299,7 @@ mod standalone {
_buf: &[u8],
_table_ref: &datafusion::sql::TableReference,
_schema: datafusion::arrow::datatypes::SchemaRef,
_ctx: &SessionContext,
_ctx: &TaskContext,
) -> datafusion::error::Result<
std::sync::Arc<dyn datafusion::catalog::TableProvider>,
> {
Expand All @@ -320,7 +322,7 @@ mod standalone {
fn try_decode_file_format(
&self,
_buf: &[u8],
_ctx: &SessionContext,
_ctx: &TaskContext,
) -> datafusion::error::Result<
Arc<dyn datafusion::datasource::file_format::FileFormatFactory>,
> {
Expand Down Expand Up @@ -361,12 +363,12 @@ mod standalone {
&self,
buf: &[u8],
inputs: &[Arc<dyn datafusion::physical_plan::ExecutionPlan>],
registry: &dyn datafusion::execution::FunctionRegistry,
ctx: &TaskContext,
) -> datafusion::error::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>>
{
self.invoked
.store(true, std::sync::atomic::Ordering::Relaxed);
self.codec.try_decode(buf, inputs, registry)
self.codec.try_decode(buf, inputs, ctx)
}

fn try_encode(
Expand Down
2 changes: 2 additions & 0 deletions ballista/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true }
tokio-stream = { workspace = true, features = ["net"] }
tonic = { workspace = true }
tonic-prost = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }

Expand All @@ -72,3 +73,4 @@ tempfile = { workspace = true }
[build-dependencies]
rustc_version = "0.4.1"
tonic-build = { workspace = true }
tonic-prost-build = { workspace = true }
2 changes: 1 addition & 1 deletion ballista/core/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn main() -> Result<(), String> {
println!("cargo:rerun-if-changed=proto/datafusion_common.proto");
println!("cargo:rerun-if-changed=proto/datafusion.proto");
println!("cargo:rerun-if-changed=proto/ballista.proto");
tonic_build::configure()
tonic_prost_build::configure()
.extern_path(".datafusion_common", "::datafusion_proto_common")
.extern_path(".datafusion", "::datafusion_proto::protobuf")
.protoc_arg("--experimental_allow_proto3_optional")
Expand Down
15 changes: 15 additions & 0 deletions ballista/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,18 @@ message NamedTime {
uint64 value = 2;
}

message NamedPruningMetrics {
string name = 1;
uint64 pruned = 2;
uint64 matched = 3;
}

message NamedRatio {
string name = 1;
uint64 part = 2;
uint64 total = 3;
}

message OperatorMetric {
oneof metric {
uint64 output_rows = 1;
Expand All @@ -277,6 +289,9 @@ message OperatorMetric {
int64 start_timestamp = 9;
int64 end_timestamp = 10;
uint64 spilled_rows = 11;
uint64 output_bytes = 12;
NamedPruningMetrics pruning_metrics = 13;
NamedRatio ratio = 14;
}
}

Expand Down
10 changes: 6 additions & 4 deletions ballista/core/src/execution_plans/distributed_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
plan: LogicalPlan,
session_id: String,
) -> Self {
let properties = Self::compute_properties(plan.schema().as_ref().clone().into());
let properties =
Self::compute_properties(plan.schema().as_arrow().clone().into());
Self {
scheduler_url,
config,
Expand All @@ -103,7 +104,8 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
extension_codec: Arc<dyn LogicalExtensionCodec>,
session_id: String,
) -> Self {
let properties = Self::compute_properties(plan.schema().as_ref().clone().into());
let properties =
Self::compute_properties(plan.schema().as_arrow().clone().into());
Self {
scheduler_url,
config,
Expand Down Expand Up @@ -157,7 +159,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
}

fn schema(&self) -> SchemaRef {
self.plan.schema().as_ref().clone().into()
self.plan.schema().as_arrow().clone().into()
}

fn properties(&self) -> &PlanProperties {
Expand All @@ -180,7 +182,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
plan_repr: self.plan_repr,
session_id: self.session_id.clone(),
properties: Self::compute_properties(
self.plan.schema().as_ref().clone().into(),
self.plan.schema().as_arrow().clone().into(),
),
metrics: ExecutionPlanMetricsSet::new(),
}))
Expand Down
Loading
Loading