Skip to content

Commit 296f338

Browse files
committed
feat: add shuffle size comparison benchmark for #3882
Add a PySpark benchmark that measures shuffle write bytes via the Spark REST API, making it easy to compare shuffle file sizes between Spark and Comet shuffle implementations.
1 parent 67624a8 commit 296f338

File tree

3 files changed

+230
-0
lines changed

3 files changed

+230
-0
lines changed

benchmarks/pyspark/benchmarks/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626

2727
from .base import Benchmark
2828
from .shuffle import ShuffleHashBenchmark, ShuffleRoundRobinBenchmark
29+
from .shuffle_size import ShuffleSizeBenchmark
2930

3031

3132
# Registry of all available benchmarks
# Maps each benchmark's CLI name (the value of its name() classmethod) to the
# implementing class, so the runner can look a benchmark up by name
# (presumably via the --benchmark argument — see run_benchmark.py callers).
_BENCHMARK_REGISTRY: Dict[str, Type[Benchmark]] = {
    ShuffleHashBenchmark.name(): ShuffleHashBenchmark,
    ShuffleRoundRobinBenchmark.name(): ShuffleRoundRobinBenchmark,
    ShuffleSizeBenchmark.name(): ShuffleSizeBenchmark,
}
3638

3739

@@ -76,4 +78,5 @@ def list_benchmarks() -> List[tuple[str, str]]:
7678
'list_benchmarks',
7779
'ShuffleHashBenchmark',
7880
'ShuffleRoundRobinBenchmark',
81+
'ShuffleSizeBenchmark',
7982
]
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
#!/usr/bin/env python3
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
"""
20+
Shuffle size benchmark for measuring shuffle write bytes.
21+
22+
Measures the actual shuffle write bytes reported by Spark to compare
23+
shuffle file sizes between Spark and Comet shuffle implementations.
24+
This is useful for investigating shuffle format overhead (see issue #3882).
25+
"""
26+
27+
import json
28+
import urllib.request
29+
from typing import Dict, Any
30+
31+
from pyspark.sql import DataFrame
32+
33+
from .base import Benchmark
34+
35+
36+
def get_shuffle_write_bytes(spark, timeout: float = 10.0) -> int:
    """Return total shuffle write bytes for *spark*'s application.

    Queries the Spark UI REST API (``/api/v1/applications/<id>/stages``)
    and sums the ``shuffleWriteBytes`` field over every reported stage.

    Args:
        spark: An active SparkSession whose UI is enabled.
        timeout: Socket timeout in seconds for the REST call, so a hung
            or unreachable UI cannot block the benchmark indefinitely.

    Returns:
        Total shuffle write bytes across all stages (0 if none reported).

    Raises:
        RuntimeError: If the Spark UI is disabled (``uiWebUrl`` is None).
        urllib.error.URLError: If the REST endpoint cannot be reached.
    """
    sc = spark.sparkContext
    ui_url = sc.uiWebUrl
    if ui_url is None:
        # spark.ui.enabled=false leaves uiWebUrl unset; fail with a clear
        # message instead of building a bogus "None/api/v1/..." URL.
        raise RuntimeError(
            "Spark UI is disabled (uiWebUrl is None); cannot read shuffle metrics"
        )
    url = f"{ui_url}/api/v1/applications/{sc.applicationId}/stages"
    # Explicit timeout: the original call could hang forever on a stuck UI.
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        stages = json.loads(resp.read())
    return sum(s.get("shuffleWriteBytes", 0) for s in stages)
44+
45+
46+
def format_bytes(b: int) -> str:
    """Render a byte count as a human-readable KiB/MiB/GiB string.

    Anything below 1 MiB is expressed in KiB, so very small counts come
    out fractional (e.g. 512 -> "0.50 KiB").
    """
    gib = 1024 ** 3
    mib = 1024 ** 2
    if b >= gib:
        value, unit = b / gib, "GiB"
    elif b >= mib:
        value, unit = b / mib, "MiB"
    else:
        value, unit = b / 1024, "KiB"
    return f"{value:.2f} {unit}"
54+
55+
56+
class ShuffleSizeBenchmark(Benchmark):
    """Benchmark that measures shuffle write bytes via the Spark REST API.

    Runs a simple scan -> repartition -> write pipeline and reports the
    shuffle write size alongside wall-clock time. Useful for comparing
    shuffle format overhead between Spark and Comet.
    """

    def __init__(self, spark, data_path: str, mode: str,
                 num_partitions: int = 200):
        super().__init__(spark, data_path, mode)
        # Target partition count for the repartition() that forces a shuffle.
        self.num_partitions = num_partitions

    @classmethod
    def name(cls) -> str:
        return "shuffle-size"

    @classmethod
    def description(cls) -> str:
        return "Measure shuffle write bytes (scan -> repartition -> write)"

    def run(self) -> Dict[str, Any]:
        """Execute the benchmark and return its metrics as a dict."""
        source = self.spark.read.parquet(self.data_path)
        total_rows = source.count()
        print(f"Input rows: {total_rows:,}")

        field_summaries = [
            f"{field.name}: {field.dataType.simpleString()}"
            for field in source.schema.fields
        ]
        print(f"Schema: {', '.join(field_summaries)}")

        destination = f"/tmp/shuffle-size-benchmark-output-{self.mode}"

        def shuffle_and_write():
            # repartition() forces a full shuffle; writing out the result
            # materializes it so Spark records the shuffle write metrics.
            shuffled = source.repartition(self.num_partitions)
            shuffled.write.mode("overwrite").parquet(destination)

        elapsed_ms = self._time_operation(shuffle_and_write)

        written_bytes = 0
        try:
            written_bytes = get_shuffle_write_bytes(self.spark)
        except Exception as e:
            # Metrics are best-effort: a disabled UI or a REST hiccup
            # should not fail the benchmark run itself.
            print(f"Warning: could not read shuffle metrics: {e}")

        per_record = written_bytes / total_rows if total_rows > 0 else 0

        print(f"Shuffle write: {format_bytes(written_bytes)}")
        print(f"Bytes/record: {per_record:.1f}")

        return {
            "duration_ms": elapsed_ms,
            "row_count": total_rows,
            "num_partitions": self.num_partitions,
            "shuffle_write_bytes": written_bytes,
            "bytes_per_record": round(per_record, 1),
        }
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Compare shuffle write sizes between Spark and Comet shuffle.
#
# This benchmark measures actual shuffle write bytes reported by Spark
# to quantify the overhead of Comet's Arrow IPC shuffle format.
# See https://github.com/apache/datafusion-comet/issues/3882
#
# Prerequisites:
#   - SPARK_HOME set to a Spark 3.5 installation
#   - Comet JAR built (make)
#   - Input parquet data generated (see generate_data.py)
#
# Usage:
#   ./run_shuffle_size_benchmark.sh /path/to/parquet/data
#
# Environment variables:
#   COMET_JAR         Path to Comet JAR (default: auto-detected from repo)
#   SPARK_MASTER      Spark master URL (default: local[*])
#   EXECUTOR_MEMORY   Executor memory (default: 16g)
#   OFFHEAP_SIZE      Off-heap memory for Comet (default: 16g)

# -e: abort on any failing command; -u: treat unset variables as errors;
# -o pipefail: a failure anywhere in a pipeline fails the pipeline.
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
DATA_PATH="${1:?Usage: $0 /path/to/parquet/data}"
COMET_JAR="${COMET_JAR:-$SCRIPT_DIR/../../spark/target/comet-spark-spark3.5_2.12-0.15.0-SNAPSHOT.jar}"
SPARK_MASTER="${SPARK_MASTER:-local[*]}"
EXECUTOR_MEMORY="${EXECUTOR_MEMORY:-16g}"
OFFHEAP_SIZE="${OFFHEAP_SIZE:-16g}"

# ${SPARK_HOME:-} so the check itself does not trip `set -u` when unset.
if [ -z "${SPARK_HOME:-}" ]; then
    echo "Error: SPARK_HOME is not set"
    exit 1
fi

if [ ! -f "$COMET_JAR" ]; then
    echo "Error: Comet JAR not found at $COMET_JAR"
    echo "Build with 'make' or set COMET_JAR to the correct path."
    exit 1
fi

echo "========================================"
echo "Shuffle Size Comparison Benchmark"
echo "========================================"
echo "Data path:       $DATA_PATH"
echo "Comet JAR:       $COMET_JAR"
echo "Spark master:    $SPARK_MASTER"
echo "Executor memory: $EXECUTOR_MEMORY"
echo "Off-heap size:   $OFFHEAP_SIZE"
echo "========================================"

# Run Spark baseline (no Comet)
echo ""
echo ">>> Running SPARK (no Comet) shuffle size benchmark..."
# Quote the spark-submit path: SPARK_HOME may contain spaces.
"$SPARK_HOME/bin/spark-submit" \
    --master "$SPARK_MASTER" \
    --executor-memory "$EXECUTOR_MEMORY" \
    --conf spark.comet.enabled=false \
    "$SCRIPT_DIR/run_benchmark.py" \
    --data "$DATA_PATH" \
    --mode spark \
    --benchmark shuffle-size

# Run Comet Native shuffle
echo ""
echo ">>> Running COMET NATIVE shuffle size benchmark..."
"$SPARK_HOME/bin/spark-submit" \
    --master "$SPARK_MASTER" \
    --executor-memory "$EXECUTOR_MEMORY" \
    --jars "$COMET_JAR" \
    --driver-class-path "$COMET_JAR" \
    --conf spark.executor.extraClassPath="$COMET_JAR" \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size="$OFFHEAP_SIZE" \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.shuffle.mode=native \
    --conf spark.comet.explainFallback.enabled=true \
    "$SCRIPT_DIR/run_benchmark.py" \
    --data "$DATA_PATH" \
    --mode native \
    --benchmark shuffle-size

echo ""
echo "========================================"
echo "BENCHMARK COMPLETE"
echo "========================================"
echo "Compare 'Shuffle write' and 'Bytes/record' between the two runs above."

0 commit comments

Comments
 (0)