Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
with:
lfs: true
- uses: ./.github/actions/setup
- run: cargo test
- run: cargo test --features integration

format-check:
runs-on: ubuntu-latest
Expand Down
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,13 @@ prost = "0.13.5"
rand = "0.8.5"
object_store = "0.12.3"

# integration_tests deps
insta = { version = "1.43.1", features = ["filters"], optional = true }

[features]
integration = [
"insta"
]

[dev-dependencies]
insta = { version = "1.43.1", features = ["filters"] }
2 changes: 1 addition & 1 deletion src/flight_service/stream_partitioner_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl StreamPartitionerRegistry {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::MockExec;
use crate::test_utils::mock_exec::MockExec;
use datafusion::arrow::array::{RecordBatch, UInt32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::TaskContext;
Expand Down
14 changes: 8 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ mod common;
mod composed_extension_codec;
mod errors;
mod flight_service;
mod physical_optimizer;
mod plan;
#[cfg(test)]
mod test_utils;

pub mod physical_optimizer;
pub mod stage;
pub mod task;
mod stage;
mod task;
mod user_provided_codec;

#[cfg(any(feature = "integration", test))]
pub mod test_utils;

pub use channel_manager::{BoxCloneSyncChannel, ChannelManager, ChannelResolver};
pub use flight_service::{ArrowFlightEndpoint, SessionBuilder};
pub use physical_optimizer::DistributedPhysicalOptimizerRule;
pub use plan::ArrowFlightReadExec;
pub use stage::{display_stage_graphviz, ExecutionStage};
pub use user_provided_codec::{add_user_codec, with_user_codec};
2 changes: 1 addition & 1 deletion src/physical_optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl DistributedPhysicalOptimizerRule {
mod tests {
use crate::assert_snapshot;
use crate::physical_optimizer::DistributedPhysicalOptimizerRule;
use crate::test_utils::register_parquet_tables;
use crate::test_utils::parquet::register_parquet_tables;
use datafusion::error::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::displayable;
Expand Down
6 changes: 4 additions & 2 deletions src/test_utils/insta.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::env;

pub use insta;

#[macro_export]
macro_rules! assert_snapshot {
($($arg:tt)*) => {
crate::test_utils::insta::settings().bind(|| {
insta::assert_snapshot!($($arg)*);
$crate::test_utils::insta::settings().bind(|| {
$crate::test_utils::insta::insta::assert_snapshot!($($arg)*);
})
};
}
Expand Down
6 changes: 3 additions & 3 deletions tests/common/localhost.rs → src/test_utils/localhost.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::{
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelManager, ChannelResolver, SessionBuilder,
};
use arrow_flight::flight_service_server::FlightServiceServer;
use async_trait::async_trait;
use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use datafusion::{common::runtime::JoinSet, prelude::SessionConfig};
use datafusion_distributed::{
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelManager, ChannelResolver, SessionBuilder,
};
use std::error::Error;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
Expand Down
10 changes: 5 additions & 5 deletions src/test_utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod insta;
mod mock_exec;
mod parquet;

pub use mock_exec::MockExec;
pub use parquet::register_parquet_tables;
pub mod localhost;
pub mod mock_exec;
pub mod parquet;
pub mod plan;
pub mod tpch;
3 changes: 1 addition & 2 deletions tests/common/plan.rs → src/test_utils/plan.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::{ArrowFlightReadExec, DistributedPhysicalOptimizerRule};
use datafusion::common::plan_err;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::DataFusionError;
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
use datafusion::physical_plan::ExecutionPlan;
use datafusion_distributed::physical_optimizer::DistributedPhysicalOptimizerRule;
use datafusion_distributed::ArrowFlightReadExec;
use std::sync::Arc;

pub fn distribute_aggregate(
Expand Down
File renamed without changes.
29 changes: 0 additions & 29 deletions tests/common/insta.rs

This file was deleted.

5 changes: 0 additions & 5 deletions tests/common/mod.rs

This file was deleted.

20 changes: 0 additions & 20 deletions tests/common/parquet.rs

This file was deleted.

13 changes: 5 additions & 8 deletions tests/custom_extension_codec.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
#[allow(dead_code)]
mod common;

#[cfg(test)]
#[cfg(all(feature = "integration", test))]
mod tests {
use crate::assert_snapshot;
use crate::common::localhost::start_localhost_context;
use datafusion::arrow::array::Int64Array;
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
Expand All @@ -27,9 +22,11 @@ mod tests {
use datafusion::physical_plan::{
displayable, execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use datafusion_distributed::physical_optimizer::DistributedPhysicalOptimizerRule;
use datafusion_distributed::assert_snapshot;
use datafusion_distributed::test_utils::localhost::start_localhost_context;
use datafusion_distributed::{
add_user_codec, with_user_codec, ArrowFlightReadExec, SessionBuilder,
add_user_codec, with_user_codec, ArrowFlightReadExec, DistributedPhysicalOptimizerRule,
SessionBuilder,
};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::protobuf::proto_error;
Expand Down
16 changes: 8 additions & 8 deletions tests/distributed_aggregation.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
#[allow(dead_code)]
mod common;

#[cfg(test)]
#[cfg(all(feature = "integration", test))]
mod tests {
use crate::assert_snapshot;
use crate::common::localhost::{start_localhost_context, NoopSessionBuilder};
use crate::common::parquet::register_parquet_tables;
use crate::common::plan::distribute_aggregate;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion_distributed::assert_snapshot;
use datafusion_distributed::test_utils::localhost::{
start_localhost_context, NoopSessionBuilder,
};
use datafusion_distributed::test_utils::parquet::register_parquet_tables;
use datafusion_distributed::test_utils::plan::distribute_aggregate;
use futures::TryStreamExt;
use std::error::Error;

#[tokio::test]
#[ignore]
async fn distributed_aggregation() -> Result<(), Box<dyn Error>> {
// FIXME: these ports are in use on my machine, we should find unused ports
// Changed them for now
Expand Down
11 changes: 4 additions & 7 deletions tests/error_propagation.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#[allow(dead_code)]
mod common;

#[cfg(test)]
#[cfg(all(feature = "integration", test))]
mod tests {
use crate::common::localhost::start_localhost_context;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::DataFusionError;
use datafusion::execution::{
Expand All @@ -15,9 +11,10 @@ mod tests {
use datafusion::physical_plan::{
execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use datafusion_distributed::physical_optimizer::DistributedPhysicalOptimizerRule;
use datafusion_distributed::test_utils::localhost::start_localhost_context;
use datafusion_distributed::{
add_user_codec, with_user_codec, ArrowFlightReadExec, SessionBuilder,
add_user_codec, with_user_codec, ArrowFlightReadExec, DistributedPhysicalOptimizerRule,
SessionBuilder,
};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::protobuf::proto_error;
Expand Down
14 changes: 6 additions & 8 deletions tests/highly_distributed_query.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
#[allow(dead_code)]
mod common;

#[cfg(test)]
#[cfg(all(feature = "integration", test))]
mod tests {
use crate::assert_snapshot;
use crate::common::localhost::{start_localhost_context, NoopSessionBuilder};
use crate::common::parquet::register_parquet_tables;
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion_distributed::ArrowFlightReadExec;
use datafusion_distributed::test_utils::localhost::{
start_localhost_context, NoopSessionBuilder,
};
use datafusion_distributed::test_utils::parquet::register_parquet_tables;
use datafusion_distributed::{assert_snapshot, ArrowFlightReadExec};
use futures::TryStreamExt;
use std::error::Error;
use std::sync::Arc;
Expand Down
13 changes: 5 additions & 8 deletions tests/stage_planning.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
#[allow(dead_code)]
mod common;

#[cfg(test)]
#[cfg(all(feature = "integration", test))]
mod tests {
use crate::assert_snapshot;
use crate::common::tpch::tpch_query;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_distributed::physical_optimizer::DistributedPhysicalOptimizerRule;
use datafusion_distributed::stage::{display_stage_graphviz, ExecutionStage};
use datafusion_distributed::assert_snapshot;
use datafusion_distributed::test_utils::tpch::tpch_query;
use datafusion_distributed::DistributedPhysicalOptimizerRule;
use datafusion_distributed::{display_stage_graphviz, ExecutionStage};
use futures::TryStreamExt;
use std::error::Error;
use std::sync::Arc;
Expand Down
Loading