Skip to content

Commit 83e2793

Browse files
authored
Merge branch 'main' into alamb/update_dev_docs_for_agents
2 parents 20db17d + c6f7145 commit 83e2793

File tree

19 files changed

+442
-153
lines changed

19 files changed

+442
-153
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ mimalloc_extended = ["libmimalloc-sys/extended"]
4040

4141
[dependencies]
4242
arrow = { workspace = true }
43+
async-trait = "0.1"
44+
bytes = { workspace = true }
4345
clap = { version = "4.5.60", features = ["derive"] }
4446
datafusion = { workspace = true, default-features = true }
4547
datafusion-common = { workspace = true, default-features = true }

benchmarks/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -879,13 +879,13 @@ The benchmark includes queries that:
879879

880880
The sorted dataset is automatically generated from the ClickBench partitioned dataset. You can configure the memory used during the sorting process with the `DATAFUSION_MEMORY_GB` environment variable. The default memory limit is 12GB.
881881
```bash
882-
./bench.sh data data_sorted_clickbench
882+
./bench.sh data clickbench_sorted
883883
```
884884

885885
To create the sorted dataset, for example with 16GB of memory, run:
886886

887887
```bash
888-
DATAFUSION_MEMORY_GB=16 ./bench.sh data data_sorted_clickbench
888+
DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted
889889
```
890890

891891
This command will:
@@ -896,7 +896,7 @@ This command will:
896896
#### Running the Benchmark
897897

898898
```bash
899-
./bench.sh run data_sorted_clickbench
899+
./bench.sh run clickbench_sorted
900900
```
901901

902902
This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations.

benchmarks/bench.sh

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..}
4242
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
4343
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
4444
PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true}
45+
SIMULATE_LATENCY=${SIMULATE_LATENCY:-false}
46+
47+
# Build latency arg based on SIMULATE_LATENCY setting
48+
LATENCY_ARG=""
49+
if [ "$SIMULATE_LATENCY" = "true" ]; then
50+
LATENCY_ARG="--simulate-latency"
51+
fi
4552

4653
usage() {
4754
echo "
@@ -141,6 +148,7 @@ CARGO_COMMAND command that runs the benchmark binary
141148
DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
142149
RESULTS_NAME folder where the benchmark files are stored
143150
PREFER_HASH_JOIN Prefer hash join algorithm (default true)
151+
SIMULATE_LATENCY Simulate object store latency to mimic S3 (default false)
144152
DATAFUSION_* Set the given datafusion configuration
145153
"
146154
exit 1
@@ -371,6 +379,7 @@ main() {
371379
echo "RESULTS_DIR: ${RESULTS_DIR}"
372380
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
373381
echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
382+
echo "SIMULATE_LATENCY: ${SIMULATE_LATENCY}"
374383
echo "***************************"
375384

376385
# navigate to the appropriate directory
@@ -655,7 +664,7 @@ run_tpch() {
655664
echo "Running tpch benchmark..."
656665

657666
FORMAT=$2
658-
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG}
667+
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
659668
}
660669

661670
# Runs the tpch in memory (needs tpch parquet data)
@@ -671,7 +680,7 @@ run_tpch_mem() {
671680
echo "RESULTS_FILE: ${RESULTS_FILE}"
672681
echo "Running tpch_mem benchmark..."
673682
# -m means in memory
674-
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG}
683+
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
675684
}
676685

677686
# Runs the tpcds benchmark
@@ -691,7 +700,7 @@ run_tpcds() {
691700
echo "RESULTS_FILE: ${RESULTS_FILE}"
692701
echo "Running tpcds benchmark..."
693702

694-
debug_run $CARGO_COMMAND --bin dfbench -- tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG}
703+
debug_run $CARGO_COMMAND --bin dfbench -- tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
695704
}
696705

697706
# Runs the compile profile benchmark helper
@@ -713,7 +722,7 @@ run_cancellation() {
713722
RESULTS_FILE="${RESULTS_DIR}/cancellation.json"
714723
echo "RESULTS_FILE: ${RESULTS_FILE}"
715724
echo "Running cancellation benchmark..."
716-
debug_run $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}"
725+
debug_run $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}" ${LATENCY_ARG}
717726
}
718727

719728

@@ -767,15 +776,15 @@ run_clickbench_1() {
767776
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
768777
echo "RESULTS_FILE: ${RESULTS_FILE}"
769778
echo "Running clickbench (1 file) benchmark..."
770-
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
779+
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
771780
}
772781

773782
# Runs the clickbench benchmark with the partitioned parquet dataset (100 files)
774783
run_clickbench_partitioned() {
775784
RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
776785
echo "RESULTS_FILE: ${RESULTS_FILE}"
777786
echo "Running clickbench (partitioned, 100 files) benchmark..."
778-
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
787+
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
779788
}
780789

781790

@@ -784,7 +793,7 @@ run_clickbench_pushdown() {
784793
RESULTS_FILE="${RESULTS_DIR}/clickbench_pushdown.json"
785794
echo "RESULTS_FILE: ${RESULTS_FILE}"
786795
echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true, reorder_filters=true..."
787-
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
796+
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
788797
}
789798

790799

@@ -793,7 +802,7 @@ run_clickbench_extended() {
793802
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
794803
echo "RESULTS_FILE: ${RESULTS_FILE}"
795804
echo "Running clickbench (1 file) extended benchmark..."
796-
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG}
805+
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
797806
}
798807

799808
# Downloads the csv.gz IMDB dataset files from Peter Boncz's homepage (one of the JOB paper authors)
@@ -908,7 +917,7 @@ run_imdb() {
908917
RESULTS_FILE="${RESULTS_DIR}/imdb.json"
909918
echo "RESULTS_FILE: ${RESULTS_FILE}"
910919
echo "Running imdb benchmark..."
911-
debug_run $CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG}
920+
debug_run $CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
912921
}
913922

914923
data_h2o() {
@@ -980,7 +989,7 @@ run_h2o() {
980989
--path "${H2O_DIR}/${FILE_NAME}" \
981990
--queries-path "${QUERY_FILE}" \
982991
-o "${RESULTS_FILE}" \
983-
${QUERY_ARG}
992+
${QUERY_ARG} ${LATENCY_ARG}
984993
}
985994

986995
# Utility function to run h2o join/window benchmark
@@ -1032,7 +1041,7 @@ h2o_runner() {
10321041
--join-paths "${H2O_DIR}/${X_TABLE_FILE_NAME},${H2O_DIR}/${SMALL_TABLE_FILE_NAME},${H2O_DIR}/${MEDIUM_TABLE_FILE_NAME},${H2O_DIR}/${LARGE_TABLE_FILE_NAME}" \
10331042
--queries-path "${QUERY_FILE}" \
10341043
-o "${RESULTS_FILE}" \
1035-
${QUERY_ARG}
1044+
${QUERY_ARG} ${LATENCY_ARG}
10361045
}
10371046

10381047
# Runners for h2o join benchmark
@@ -1073,7 +1082,7 @@ run_sort_tpch() {
10731082
echo "RESULTS_FILE: ${RESULTS_FILE}"
10741083
echo "Running sort tpch benchmark..."
10751084

1076-
debug_run $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
1085+
debug_run $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
10771086
}
10781087

10791088
# Runs the sort tpch integration benchmark with limit 100 (topk)
@@ -1083,15 +1092,15 @@ run_topk_tpch() {
10831092
echo "RESULTS_FILE: ${RESULTS_FILE}"
10841093
echo "Running topk tpch benchmark..."
10851094

1086-
$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG}
1095+
$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} ${LATENCY_ARG}
10871096
}
10881097

10891098
# Runs the nlj benchmark
10901099
run_nlj() {
10911100
RESULTS_FILE="${RESULTS_DIR}/nlj.json"
10921101
echo "RESULTS_FILE: ${RESULTS_FILE}"
10931102
echo "Running nlj benchmark..."
1094-
debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
1103+
debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
10951104
}
10961105

10971106
# Runs the hj benchmark
@@ -1100,15 +1109,15 @@ run_hj() {
11001109
RESULTS_FILE="${RESULTS_DIR}/hj.json"
11011110
echo "RESULTS_FILE: ${RESULTS_FILE}"
11021111
echo "Running hj benchmark..."
1103-
debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
1112+
debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
11041113
}
11051114

11061115
# Runs the smj benchmark
11071116
run_smj() {
11081117
RESULTS_FILE="${RESULTS_DIR}/smj.json"
11091118
echo "RESULTS_FILE: ${RESULTS_FILE}"
11101119
echo "Running smj benchmark..."
1111-
debug_run $CARGO_COMMAND --bin dfbench -- smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
1120+
debug_run $CARGO_COMMAND --bin dfbench -- smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
11121121
}
11131122

11141123

@@ -1250,7 +1259,7 @@ run_clickbench_sorted() {
12501259
--sorted-by "EventTime" \
12511260
-c datafusion.optimizer.prefer_existing_sort=true \
12521261
-o "${RESULTS_FILE}" \
1253-
${QUERY_ARG}
1262+
${QUERY_ARG} ${LATENCY_ARG}
12541263
}
12551264

12561265

benchmarks/src/clickbench.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,8 @@ impl RunOpt {
207207
}
208208
}
209209

210-
let rt_builder = self.common.runtime_env_builder()?;
211-
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
210+
let rt = self.common.build_runtime()?;
211+
let ctx = SessionContext::new_with_config_rt(config, rt);
212212

213213
self.register_hits(&ctx).await?;
214214

benchmarks/src/h2o.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ impl RunOpt {
8484
};
8585

8686
let config = self.common.config()?;
87-
let rt_builder = self.common.runtime_env_builder()?;
88-
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
87+
let rt = self.common.build_runtime()?;
88+
let ctx = SessionContext::new_with_config_rt(config, rt);
8989

9090
// Register tables depending on which h2o benchmark is being run
9191
// (groupby/join/window)

benchmarks/src/hj.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,8 @@ impl RunOpt {
324324
};
325325

326326
let config = self.common.config()?;
327-
let rt_builder = self.common.runtime_env_builder()?;
328-
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
327+
let rt = self.common.build_runtime()?;
328+
let ctx = SessionContext::new_with_config_rt(config, rt);
329329

330330
if let Some(path) = &self.path {
331331
for table in &["lineitem", "supplier", "nation", "customer"] {

benchmarks/src/imdb/run.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,8 @@ impl RunOpt {
312312
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
313313
config.options_mut().execution.hash_join_buffering_capacity =
314314
self.hash_join_buffering_capacity;
315-
let rt_builder = self.common.runtime_env_builder()?;
316-
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
315+
let rt = self.common.build_runtime()?;
316+
let ctx = SessionContext::new_with_config_rt(config, rt);
317317

318318
// register tables
319319
self.register_tables(&ctx).await?;
@@ -523,6 +523,7 @@ mod tests {
523523
memory_limit: None,
524524
sort_spill_reservation_bytes: None,
525525
debug: false,
526+
simulate_latency: false,
526527
};
527528
let opt = RunOpt {
528529
query: Some(query),
@@ -560,6 +561,7 @@ mod tests {
560561
memory_limit: None,
561562
sort_spill_reservation_bytes: None,
562563
debug: false,
564+
simulate_latency: false,
563565
};
564566
let opt = RunOpt {
565567
query: Some(query),

benchmarks/src/nlj.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,8 @@ impl RunOpt {
207207
};
208208

209209
let config = self.common.config()?;
210-
let rt_builder = self.common.runtime_env_builder()?;
211-
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
210+
let rt = self.common.build_runtime()?;
211+
let ctx = SessionContext::new_with_config_rt(config, rt);
212212

213213
let mut benchmark_run = BenchmarkRun::new();
214214
for query_id in query_range {

benchmarks/src/smj.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,8 +433,8 @@ impl RunOpt {
433433
let mut config = self.common.config()?;
434434
// Disable hash joins to force SMJ
435435
config = config.set_bool("datafusion.optimizer.prefer_hash_join", false);
436-
let rt_builder = self.common.runtime_env_builder()?;
437-
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
436+
let rt = self.common.build_runtime()?;
437+
let ctx = SessionContext::new_with_config_rt(config, rt);
438438

439439
let mut benchmark_run = BenchmarkRun::new();
440440
for query_id in query_range {

0 commit comments

Comments
 (0)