Skip to content

Commit 5ad36d8

Browse files
committed
move validation to integration tests
1 parent 7ffcf95 commit 5ad36d8

22 files changed

+161
-234
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
[workspace]
2-
members = [
3-
"benchmarks"
4-
]
2+
members = ["benchmarks"]
53

64
[workspace.dependencies]
75
datafusion = { version = "49.0.0" }
@@ -37,14 +35,16 @@ tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d82343252
3735
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
3836
parquet = { version = "55.2.0", optional = true }
3937
arrow = { version = "55.2.0", optional = true }
38+
tokio-stream = { version = "0.1.17", optional = true }
4039

4140
[features]
4241
integration = [
4342
"insta",
4443
"tpchgen",
45-
"tpchgen-arrow",
44+
"tpchgen-arrow",
4645
"parquet",
47-
"arrow"
46+
"arrow",
47+
"tokio-stream",
4848
]
4949

5050
[dev-dependencies]

benchmarks/src/tpch/run.rs

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,6 @@ pub struct RunOpt {
103103
/// The maximum number of partitions per task.
104104
#[structopt(long = "ppt")]
105105
partitions_per_task: Option<usize>,
106-
107-
#[structopt(long = "validate")]
108-
validate: bool,
109106
}
110107

111108
#[async_trait]
@@ -157,7 +154,7 @@ impl SessionBuilder for RunOpt {
157154

158155
impl RunOpt {
159156
pub async fn run(self) -> Result<()> {
160-
let (ctx, _guard) = start_localhost_context([40051], self.clone()).await;
157+
let (ctx, _guard) = start_localhost_context(1, self.clone()).await;
161158
println!("Running benchmarks with the following options: {self:?}");
162159
let query_range = match self.query {
163160
Some(query_id) => query_id..=query_id,
@@ -225,42 +222,11 @@ impl RunOpt {
225222
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
226223
);
227224

228-
let valid = if self.validate {
229-
let mut single_node_result = vec![];
230-
for (i, query) in sql.iter().enumerate() {
231-
if i == result_stmt {
232-
single_node_result = self.execute_query(&single_node_ctx, query).await?;
233-
} else {
234-
self.execute_query(&single_node_ctx, query).await?;
235-
}
236-
}
237-
238-
let res = pretty::pretty_format_batches(&result)?.to_string();
239-
let single_node_res =
240-
pretty::pretty_format_batches(&single_node_result)?.to_string();
241-
res == single_node_res
242-
} else {
243-
false
244-
};
245-
246-
query_results.push(QueryResult {
247-
valid,
248-
elapsed,
249-
row_count,
250-
});
225+
query_results.push(QueryResult { elapsed, row_count });
251226
}
252227

253228
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
254-
let valid_str = if self.validate {
255-
if query_results[query_results.len() - 1].valid {
256-
"valid"
257-
} else {
258-
"invalid"
259-
}
260-
} else {
261-
""
262-
};
263-
println!("Query {query_id} avg time: {avg:.2} ms {valid_str}");
229+
println!("Query {query_id} avg time: {avg:.2} ms");
264230

265231
// Print memory stats using mimalloc (only when compiled with --features mimalloc_extended)
266232
print_memory_stats();

benchmarks/src/util/run.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ pub struct BenchQuery {
9696
pub struct QueryResult {
9797
pub elapsed: Duration,
9898
pub row_count: usize,
99-
pub valid: bool,
10099
}
101100
/// collects benchmark run data and then serializes it at the end
102101
pub struct BenchmarkRun {

src/errors/arrow_error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,8 @@ mod tests {
224224
let (recovered_error, recovered_ctx) = proto.to_arrow_error();
225225

226226
if original_error.to_string() != recovered_error.to_string() {
227-
println!("original error: {}", original_error.to_string());
228-
println!("recovered error: {}", recovered_error.to_string());
227+
println!("original error: {}", original_error);
228+
println!("recovered error: {}", recovered_error);
229229
}
230230

231231
assert_eq!(original_error.to_string(), recovered_error.to_string());

src/errors/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ mod schema_error;
1717
pub fn datafusion_error_to_tonic_status(err: &DataFusionError) -> tonic::Status {
1818
let err = DataFusionErrorProto::from_datafusion_error(err);
1919
let err = err.encode_to_vec();
20-
let status = tonic::Status::with_details(tonic::Code::Internal, "DataFusionError", err.into());
21-
status
20+
21+
tonic::Status::with_details(tonic::Code::Internal, "DataFusionError", err.into())
2222
}
2323

2424
/// Decodes a [DataFusionError] from a [tonic::Status] error. If the provided [tonic::Status]

src/errors/schema_error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ impl SchemaErrorProto {
198198
valid_fields,
199199
} => SchemaErrorProto {
200200
inner: Some(SchemaErrorInnerProto::FieldNotFound(FieldNotFoundProto {
201-
field: Some(Box::new(ColumnProto::from_column(&field))),
201+
field: Some(Box::new(ColumnProto::from_column(field))),
202202
valid_fields: valid_fields.iter().map(ColumnProto::from_column).collect(),
203203
})),
204204
backtrace: backtrace.cloned(),

src/flight_service/do_get.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,9 @@ use arrow_flight::flight_service_server::FlightService;
1010
use arrow_flight::Ticket;
1111
use datafusion::execution::{SessionState, SessionStateBuilder};
1212
use datafusion::optimizer::OptimizerConfig;
13-
use datafusion::physical_plan::ExecutionPlan;
1413
use futures::TryStreamExt;
1514
use prost::Message;
1615
use std::sync::Arc;
17-
use tokio::sync::OnceCell;
1816
use tonic::{Request, Response, Status};
1917

2018
use super::service::StageKey;
@@ -68,13 +66,6 @@ impl ArrowFlightEndpoint {
6866

6967
let inner_plan = stage.plan.clone();
7068

71-
/*println!(
72-
"{} Task {:?} executing partition {}",
73-
stage.name(),
74-
task.partition_group,
75-
partition
76-
);*/
77-
7869
let stream = inner_plan
7970
.execute(partition, state.task_ctx())
8071
.map_err(|err| Status::internal(format!("Error executing stage plan: {err:#?}")))?;

src/physical_optimizer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use datafusion::{
1313
error::Result,
1414
physical_optimizer::PhysicalOptimizerRule,
1515
physical_plan::{
16-
displayable, repartition::RepartitionExec, ExecutionPlan, ExecutionPlanProperties,
16+
repartition::RepartitionExec, ExecutionPlan, ExecutionPlanProperties,
1717
},
1818
};
1919
use uuid::Uuid;

src/plan/arrow_flight_read.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,14 @@ use arrow_flight::error::FlightError;
1111
use arrow_flight::flight_service_client::FlightServiceClient;
1212
use arrow_flight::Ticket;
1313
use datafusion::arrow::datatypes::SchemaRef;
14-
use datafusion::arrow::util::pretty::pretty_format_batches;
1514
use datafusion::common::{exec_err, internal_datafusion_err, internal_err, plan_err};
1615
use datafusion::error::DataFusionError;
1716
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
1817
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
1918
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
2019
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2120
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
22-
use futures::{future, StreamExt, TryFutureExt, TryStreamExt};
21+
use futures::{future, TryFutureExt, TryStreamExt};
2322
use prost::Message;
2423
use std::any::Any;
2524
use std::fmt::Formatter;
@@ -192,7 +191,6 @@ impl ExecutionPlan for ArrowFlightReadExec {
192191

193192
let stream = async move {
194193
let futs = child_stage_tasks.iter().enumerate().map(|(i, task)| {
195-
let i_capture = i;
196194
let child_stage_proto_capture = child_stage_proto.clone();
197195
let channel_manager_capture = channel_manager.clone();
198196
let schema = schema.clone();
@@ -211,7 +209,7 @@ impl ExecutionPlan for ArrowFlightReadExec {
211209
stage_proto: Some(child_stage_proto_capture),
212210
partition: partition as u64,
213211
stage_key: Some(key),
214-
task_number: i_capture as u64,
212+
task_number: i as u64,
215213
}
216214
.encode_to_vec()
217215
.into();

0 commit comments

Comments
 (0)