Commit 210ea86

allow collect left hashjoins
1 parent b247b82 commit 210ea86

8 files changed: +90 -288 lines changed

benchmarks/gen-tpch.sh

Lines changed: 8 additions & 9 deletions
```diff
@@ -2,14 +2,14 @@
 
 set -e
 
-SCALE_FACTOR=1
+SCALE_FACTOR=10
 
 # https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
-SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
 DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
 CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
 
-if [ -z "$SCALE_FACTOR" ] ; then
+if [ -z "$SCALE_FACTOR" ]; then
   echo "Internal error: Scale factor not specified"
   exit 1
 fi
@@ -36,7 +36,7 @@ if test -f "${FILE}"; then
 else
   echo " Copying answers to ${TPCH_DIR}/answers"
   mkdir -p "${TPCH_DIR}/answers"
-  docker run -v "${TPCH_DIR}":/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main -c "cp -f /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
+  docker run -v "${TPCH_DIR}":/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main -c "cp -f /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
 fi
 
 # Create 'parquet' files from tbl
@@ -45,9 +45,9 @@ if test -d "${FILE}"; then
   echo " parquet files exist ($FILE exists)."
 else
   echo " creating parquet files using benchmark binary ..."
-  pushd "${SCRIPT_DIR}" > /dev/null
+  pushd "${SCRIPT_DIR}" >/dev/null
   $CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
-  popd > /dev/null
+  popd >/dev/null
 fi
 
 # Create 'csv' files from tbl
@@ -56,8 +56,7 @@ if test -d "${FILE}"; then
   echo " csv files exist ($FILE exists)."
 else
   echo " creating csv files using benchmark binary ..."
-  pushd "${SCRIPT_DIR}" > /dev/null
+  pushd "${SCRIPT_DIR}" >/dev/null
   $CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}/csv" --format csv
-  popd > /dev/null
+  popd >/dev/null
 fi
-
```

benchmarks/src/tpch/run.rs

Lines changed: 13 additions & 13 deletions
```diff
@@ -121,19 +121,19 @@ impl SessionBuilder for RunOpt {
             .with_collect_statistics(!self.disable_statistics)
             .with_target_partitions(self.partitions());
 
-        // FIXME: these three options are critical for the correct function of the library
-        // but we are not enforcing that the user sets them. They are here at the moment
-        // but we should figure out a way to do this better.
-        config
-            .options_mut()
-            .optimizer
-            .hash_join_single_partition_threshold = 0;
-        config
-            .options_mut()
-            .optimizer
-            .hash_join_single_partition_threshold_rows = 0;
-
-        config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
+        // // FIXME: these three options are critical for the correct function of the library
+        // // but we are not enforcing that the user sets them. They are here at the moment
+        // // but we should figure out a way to do this better.
+        // config
+        //     .options_mut()
+        //     .optimizer
+        //     .hash_join_single_partition_threshold = 0;
+        // config
+        //     .options_mut()
+        //     .optimizer
+        //     .hash_join_single_partition_threshold_rows = 0;
+        //
+        // config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
         // end critical options section
         let rt_builder = self.common.runtime_env_builder()?;
 
```
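Note: the override above is disabled because this commit makes the distributed planner tolerate single-partition (collect-left) hash joins instead of forcing them to be partitioned. For reference, a minimal sketch of what the commented-out block did, using the same optimizer options shown in the diff; the helper name is illustrative, not part of the codebase:

```rust
use datafusion::prelude::SessionConfig;

// Illustrative helper mirroring the now-commented block: zeroing both
// thresholds prevents DataFusion's optimizer from collapsing a hash join
// into single-partition (CollectLeft) mode, keeping every hash join in
// Partitioned mode.
fn force_partitioned_hash_joins(mut config: SessionConfig) -> SessionConfig {
    config.options_mut().optimizer.hash_join_single_partition_threshold = 0;
    config
        .options_mut()
        .optimizer
        .hash_join_single_partition_threshold_rows = 0;
    config
}
```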

src/common/ttl_map.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -94,7 +94,7 @@ where
                     shard.insert(key);
                 }
                 BucketOp::Clear => {
-                    let keys_to_delete = mem::replace(&mut shard, HashSet::new());
+                    let keys_to_delete = mem::take(&mut shard);
                     for key in keys_to_delete {
                         data.remove(&key);
                     }
```
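A self-contained sketch of the change above: `mem::take` swaps in the type's `Default` value (an empty `HashSet`) and returns the previous contents, which is exactly what `mem::replace(&mut shard, HashSet::new())` did, minus the explicit type:

```rust
use std::collections::HashSet;
use std::mem;

fn main() {
    let mut shard: HashSet<u64> = [1, 2, 3].into_iter().collect();
    // Take ownership of the current contents, leaving an empty set behind.
    let keys_to_delete = mem::take(&mut shard);
    assert!(shard.is_empty());
    assert_eq!(keys_to_delete.len(), 3);
}
```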

src/common/util.rs

Lines changed: 38 additions & 0 deletions
```diff
@@ -1,7 +1,10 @@
+use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion::error::Result;
+use datafusion::physical_plan::joins::PartitionMode;
 use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
 
 use std::fmt::Write;
+use std::sync::Arc;
 
 pub fn display_plan_with_partition_in_out(plan: &dyn ExecutionPlan) -> Result<String> {
     let mut f = String::new();
@@ -34,3 +37,38 @@ pub fn display_plan_with_partition_in_out(plan: &dyn ExecutionPlan) -> Result<String> {
     visit(plan, 0, &mut f)?;
     Ok(f)
 }
+
+/// Returns a boolean indicating if this stage can be divided into more than one task.
+///
+/// Some plan nodes, such as NestedLoopJoinExec, need to materialize all partitions
+/// in order to execute. Rewriting the plan to accommodate dividing it into tasks
+/// would result in redundant work.
+///
+/// The plans we cannot split are:
+/// - NestedLoopJoinExec
+/// - HashJoinExec when not in Partitioned mode
+pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
+    // recursively check whether this stage's plan contains an unsplittable node
+    let mut has_unsplittable_plan = false;
+    let search = |f: &Arc<dyn ExecutionPlan>| {
+        if f.as_any()
+            .downcast_ref::<datafusion::physical_plan::joins::NestedLoopJoinExec>()
+            .is_some()
+        {
+            has_unsplittable_plan = true;
+            return Ok(TreeNodeRecursion::Stop);
+        } else if let Some(hash_join) = f
+            .as_any()
+            .downcast_ref::<datafusion::physical_plan::joins::HashJoinExec>()
+        {
+            if hash_join.partition_mode() != &PartitionMode::Partitioned {
+                has_unsplittable_plan = true;
+                return Ok(TreeNodeRecursion::Stop);
+            }
+        }
+
+        Ok(TreeNodeRecursion::Continue)
+    };
+    plan.apply(search)?;
+    Ok(!has_unsplittable_plan)
+}
```
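A hedged usage sketch of the new function (assumes `can_be_divided` is in scope and a DataFusion version where `EmptyExec::new` takes only a schema; the schema itself is illustrative): a plan containing neither a NestedLoopJoinExec nor a non-partitioned HashJoinExec is divisible.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::physical_plan::{empty::EmptyExec, ExecutionPlan};

fn main() -> Result<()> {
    // A trivial leaf plan with no joins: nothing unsplittable, so the
    // stage it belongs to may be divided into multiple tasks.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
    assert!(can_be_divided(&plan)?);
    Ok(())
}
```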

src/physical_optimizer.rs

Lines changed: 17 additions & 10 deletions
```diff
@@ -1,6 +1,7 @@
 use std::sync::Arc;
 
 use super::stage::ExecutionStage;
+use crate::common::util::can_be_divided;
 use crate::{plan::PartitionIsolatorExec, ArrowFlightReadExec};
 use datafusion::common::tree_node::TreeNodeRecursion;
 use datafusion::error::DataFusionError;
@@ -83,12 +84,14 @@ impl DistributedPhysicalOptimizerRule {
                 internal_datafusion_err!("Expected RepartitionExec to have a child"),
             )?);
 
-            let maybe_isolated_plan = if let Some(ppt) = self.partitions_per_task {
-                let isolated = Arc::new(PartitionIsolatorExec::new(child, ppt));
-                plan.with_new_children(vec![isolated])?
-            } else {
-                plan
-            };
+            let maybe_isolated_plan =
+                if can_be_divided(&plan)? && self.partitions_per_task.is_some() {
+                    let ppt = self.partitions_per_task.unwrap();
+                    let isolated = Arc::new(PartitionIsolatorExec::new(child, ppt));
+                    plan.with_new_children(vec![isolated])?
+                } else {
+                    plan
+                };
 
             return Ok(Transformed::yes(Arc::new(
                 ArrowFlightReadExec::new_pending(
@@ -120,7 +123,7 @@ impl DistributedPhysicalOptimizerRule {
     ) -> Result<ExecutionStage, DataFusionError> {
         let mut inputs = vec![];
 
-        let distributed = plan.transform_down(|plan| {
+        let distributed = plan.clone().transform_down(|plan| {
             let Some(node) = plan.as_any().downcast_ref::<ArrowFlightReadExec>() else {
                 return Ok(Transformed::no(plan));
             };
@@ -137,9 +140,13 @@ impl DistributedPhysicalOptimizerRule {
         let mut stage = ExecutionStage::new(query_id, *num, distributed.data, inputs);
         *num += 1;
 
-        if let Some(partitions_per_task) = self.partitions_per_task {
-            stage = stage.with_maximum_partitions_per_task(partitions_per_task);
-        }
+        stage = match (self.partitions_per_task, can_be_divided(&plan)?) {
+            (Some(partitions_per_task), true) => {
+                stage.with_maximum_partitions_per_task(partitions_per_task)
+            }
+            (_, _) => stage,
+        };
+
         stage.depth = depth;
 
         Ok(stage)
```
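The net effect of the new `match` is a two-input policy: a stage gets a per-task partition cap only when `partitions_per_task` is configured and the plan is divisible. A dependency-free sketch of that decision table (the function name is illustrative):

```rust
// Returns the per-task partition cap, if any, for a stage.
fn max_partitions_per_task(
    partitions_per_task: Option<usize>,
    divisible: bool,
) -> Option<usize> {
    match (partitions_per_task, divisible) {
        (Some(ppt), true) => Some(ppt), // cap each task at `ppt` partitions
        _ => None,                      // run the stage as a single task
    }
}

fn main() {
    // A stage with a collect-left hash join is not divisible: no cap applies.
    assert_eq!(max_partitions_per_task(Some(4), false), None);
    // A fully partitioned stage with a configured limit is capped.
    assert_eq!(max_partitions_per_task(Some(4), true), Some(4));
}
```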

src/stage/execution_stage.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -1,6 +1,7 @@
 use std::sync::Arc;
 
 use datafusion::common::internal_err;
+use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::TaskContext;
 use datafusion::physical_plan::ExecutionPlan;
```

src/test_utils/tpch.rs

Lines changed: 12 additions & 7 deletions
```diff
@@ -160,13 +160,18 @@
 
 macro_rules! must_generate_tpch_table {
     ($generator:ident, $arrow:ident, $name:literal, $data_dir:expr) => {
-        generate_table(
-            // TODO: Consider adjusting the partitions and batch sizes.
-            $arrow::new($generator::new(SCALE_FACTOR, 1, 1)).with_batch_size(1000),
-            $name,
-            $data_dir,
-        )
-        .expect(concat!("Failed to generate ", $name, " table"));
+        let data_dir = $data_dir.join(format!("{}.parquet", $name));
+        fs::create_dir_all(data_dir.clone()).expect("Failed to create data directory");
+        // create three partitions for the table
+        (1..=3).for_each(|part| {
+            generate_table(
+                // TODO: Consider adjusting the partitions and batch sizes.
+                $arrow::new($generator::new(SCALE_FACTOR, part, 3)).with_batch_size(1000),
+                &format!("{}.parquet", part),
+                &data_dir.clone().into_boxed_path(),
+            )
+            .expect(concat!("Failed to generate ", $name, " table"));
+        });
     };
 }
```
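The macro now writes each table as a directory of per-chunk files (`<name>.parquet/1.parquet` through `3.parquet`), and the generator's `(SCALE_FACTOR, part, 3)` arguments select the `part`-th of three disjoint chunks, so a listing scan of the directory sees three partitions. A minimal sketch of just the directory layout; the helper is illustrative and writes empty placeholder files where the real macro calls `generate_table`:

```rust
use std::fs;
use std::io;
use std::path::Path;

// Illustrative stand-in for the macro body: create `<name>.parquet/` and one
// file per chunk. The real code writes Parquet data via generate_table(..).
fn make_table_dir(data_dir: &Path, name: &str) -> io::Result<()> {
    let table_dir = data_dir.join(format!("{name}.parquet"));
    fs::create_dir_all(&table_dir)?;
    for part in 1..=3 {
        fs::write(table_dir.join(format!("{part}.parquet")), b"")?;
    }
    Ok(())
}
```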
