
Commit f63c6a6

feat: implement framework to support multiple pyspark benchmarks (#3080)
1 parent 57780bc commit f63c6a6

6 files changed, +491 -66 lines changed


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -20,3 +20,4 @@ dev/release/comet-rm/workdir
 spark/benchmarks
 .DS_Store
 comet-event-trace.json
+__pycache__

benchmarks/pyspark/README.md

Lines changed: 96 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@ specific language governing permissions and limitations
1717
under the License.
1818
-->
1919

20-
# Shuffle Size Comparison Benchmark
20+
# PySpark Benchmarks
2121

22-
Compares shuffle file sizes between Spark, Comet JVM, and Comet Native shuffle implementations.
22+
A suite of PySpark benchmarks for comparing performance between Spark, Comet JVM, and Comet Native implementations.
23+
24+
## Available Benchmarks
25+
26+
Run `python run_benchmark.py --list-benchmarks` to see all available benchmarks:
27+
28+
- **shuffle-hash** - Shuffle all columns using hash partitioning on group_key
29+
- **shuffle-roundrobin** - Shuffle all columns using round-robin partitioning
2330

2431
## Prerequisites
2532

@@ -56,42 +63,116 @@ spark-submit \
 | `--rows`, `-r` | 10000000 | Number of rows |
 | `--partitions`, `-p` | 200 | Number of output partitions |

-## Step 2: Run Benchmark
+## Step 2: Run Benchmarks

-Run benchmarks and check Spark UI for shuffle sizes:
+### List Available Benchmarks

 ```bash
-SPARK_MASTER=spark://master:7077 \
-EXECUTOR_MEMORY=16g \
-./run_all_benchmarks.sh /tmp/shuffle-benchmark-data
+python run_benchmark.py --list-benchmarks
 ```

-Or run individual modes:
+### Run Individual Benchmarks
+
+You can run specific benchmarks by name:

 ```bash
-# Spark baseline
+# Hash partitioning shuffle - Spark baseline
 spark-submit --master spark://master:7077 \
-  run_benchmark.py --data /tmp/shuffle-benchmark-data --mode spark
+  run_benchmark.py --data /tmp/shuffle-benchmark-data --mode spark --benchmark shuffle-hash

-# Comet JVM shuffle
+# Round-robin shuffle - Spark baseline
+spark-submit --master spark://master:7077 \
+  run_benchmark.py --data /tmp/shuffle-benchmark-data --mode spark --benchmark shuffle-roundrobin
+
+# Hash partitioning - Comet JVM shuffle
 spark-submit --master spark://master:7077 \
   --jars /path/to/comet.jar \
   --conf spark.comet.enabled=true \
   --conf spark.comet.exec.shuffle.enabled=true \
   --conf spark.comet.shuffle.mode=jvm \
   --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
-  run_benchmark.py --data /tmp/shuffle-benchmark-data --mode jvm
+  run_benchmark.py --data /tmp/shuffle-benchmark-data --mode jvm --benchmark shuffle-hash

-# Comet Native shuffle
+# Round-robin - Comet Native shuffle
 spark-submit --master spark://master:7077 \
   --jars /path/to/comet.jar \
   --conf spark.comet.enabled=true \
   --conf spark.comet.exec.shuffle.enabled=true \
-  --conf spark.comet.shuffle.mode=native \
+  --conf spark.comet.exec.shuffle.mode=native \
   --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
-  run_benchmark.py --data /tmp/shuffle-benchmark-data --mode native
+  run_benchmark.py --data /tmp/shuffle-benchmark-data --mode native --benchmark shuffle-roundrobin
+```
+
+### Run All Benchmarks
+
+Use the provided script to run all benchmarks across all modes:
+
+```bash
+SPARK_MASTER=spark://master:7077 \
+EXECUTOR_MEMORY=16g \
+./run_all_benchmarks.sh /tmp/shuffle-benchmark-data
 ```

 ## Checking Results

 Open the Spark UI (default: http://localhost:4040) during each benchmark run to compare shuffle write sizes in the Stages tab.
+
+## Adding New Benchmarks
+
+The benchmark framework makes it easy to add new benchmarks:
+
+1. **Create a benchmark class** in `benchmarks/` directory (or add to existing file):
+
+```python
+from benchmarks.base import Benchmark
+
+class MyBenchmark(Benchmark):
+    @classmethod
+    def name(cls) -> str:
+        return "my-benchmark"
+
+    @classmethod
+    def description(cls) -> str:
+        return "Description of what this benchmark does"
+
+    def run(self) -> Dict[str, Any]:
+        # Read data
+        df = self.spark.read.parquet(self.data_path)
+
+        # Run your benchmark operation
+        def benchmark_operation():
+            result = df.filter(...).groupBy(...).agg(...)
+            result.write.mode("overwrite").parquet("/tmp/output")
+
+        # Time it
+        duration_ms = self._time_operation(benchmark_operation)
+
+        return {
+            'duration_ms': duration_ms,
+            # Add any other metrics you want to track
+        }
+```
+
+2. **Register the benchmark** in `benchmarks/__init__.py`:
+
+```python
+from .my_module import MyBenchmark
+
+_BENCHMARK_REGISTRY = {
+    # ... existing benchmarks
+    MyBenchmark.name(): MyBenchmark,
+}
+```
+
+3. **Run your new benchmark**:
+
+```bash
+python run_benchmark.py --data /path/to/data --mode spark --benchmark my-benchmark
+```
+
+The base `Benchmark` class provides:
+
+- Automatic timing via `_time_operation()`
+- Standard output formatting via `execute_timed()`
+- Access to SparkSession, data path, and mode
+- Spark configuration printing
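The shuffle benchmark classes themselves (`ShuffleHashBenchmark` and `ShuffleRoundRobinBenchmark`, imported from `benchmarks/shuffle.py` by the registry below) are not shown in this commit excerpt. The sketch below is an illustration only, not the commit's actual implementation: it shows what the two operations described above could look like in PySpark, assuming the generated data exposes a `group_key` column as the descriptions imply and that an output write is used to force the shuffle:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shuffle-sketch").getOrCreate()
df = spark.read.parquet("/tmp/shuffle-benchmark-data")  # data path from the README examples

num_partitions = 200  # matches the --partitions default in Step 1

# shuffle-hash: hash-partition every column on the group_key column (column name assumed)
df.repartition(num_partitions, "group_key") \
    .write.mode("overwrite").parquet("/tmp/shuffle-hash-output")  # hypothetical output path

# shuffle-roundrobin: round-robin partitioning (repartition with no column expression)
df.repartition(num_partitions) \
    .write.mode("overwrite").parquet("/tmp/shuffle-roundrobin-output")  # hypothetical output path
```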
benchmarks/pyspark/benchmarks/__init__.py

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Benchmark registry for PySpark benchmarks.
+
+This module provides a central registry for discovering and running benchmarks.
+"""
+
+from typing import Dict, Type, List
+
+from .base import Benchmark
+from .shuffle import ShuffleHashBenchmark, ShuffleRoundRobinBenchmark
+
+
+# Registry of all available benchmarks
+_BENCHMARK_REGISTRY: Dict[str, Type[Benchmark]] = {
+    ShuffleHashBenchmark.name(): ShuffleHashBenchmark,
+    ShuffleRoundRobinBenchmark.name(): ShuffleRoundRobinBenchmark,
+}
+
+
+def get_benchmark(name: str) -> Type[Benchmark]:
+    """
+    Get a benchmark class by name.
+
+    Args:
+        name: Benchmark name
+
+    Returns:
+        Benchmark class
+
+    Raises:
+        KeyError: If benchmark name is not found
+    """
+    if name not in _BENCHMARK_REGISTRY:
+        available = ", ".join(sorted(_BENCHMARK_REGISTRY.keys()))
+        raise KeyError(
+            f"Unknown benchmark: {name}. Available benchmarks: {available}"
+        )
+    return _BENCHMARK_REGISTRY[name]
+
+
+def list_benchmarks() -> List[tuple[str, str]]:
+    """
+    List all available benchmarks.
+
+    Returns:
+        List of (name, description) tuples
+    """
+    benchmarks = []
+    for name in sorted(_BENCHMARK_REGISTRY.keys()):
+        benchmark_cls = _BENCHMARK_REGISTRY[name]
+        benchmarks.append((name, benchmark_cls.description()))
+    return benchmarks
+
+
+__all__ = [
+    'Benchmark',
+    'get_benchmark',
+    'list_benchmarks',
+    'ShuffleHashBenchmark',
+    'ShuffleRoundRobinBenchmark',
+]
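For context, here is a minimal sketch of how a driver script could use this registry module. It is not the `run_benchmark.py` shipped with this commit (that file is outside this excerpt); it only exercises the `get_benchmark` and `list_benchmarks` functions defined above, assuming it is run from `benchmarks/pyspark/` so the `benchmarks` package is importable, and that the shuffle benchmarks use the base-class constructor:

```python
from pyspark.sql import SparkSession

from benchmarks import get_benchmark, list_benchmarks

# Print the registered benchmarks, similar to what --list-benchmarks would show.
for name, description in list_benchmarks():
    print(f"{name}: {description}")

# Look up a benchmark class by name and run it end to end.
spark = SparkSession.builder.appName("pyspark-benchmarks").getOrCreate()
benchmark_cls = get_benchmark("shuffle-hash")
benchmark = benchmark_cls(spark, data_path="/tmp/shuffle-benchmark-data", mode="spark")
results = benchmark.execute_timed()  # prints config, runs the benchmark, returns metrics
print(f"shuffle-hash took {results['duration_ms']} ms")
```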
benchmarks/pyspark/benchmarks/base.py

Lines changed: 127 additions & 0 deletions
@@ -0,0 +1,127 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base benchmark class providing common functionality for all benchmarks.
+"""
+
+import time
+from abc import ABC, abstractmethod
+from typing import Dict, Any
+
+from pyspark.sql import SparkSession
+
+
+class Benchmark(ABC):
+    """Base class for all PySpark benchmarks."""
+
+    def __init__(self, spark: SparkSession, data_path: str, mode: str):
+        """
+        Initialize benchmark.
+
+        Args:
+            spark: SparkSession instance
+            data_path: Path to input data
+            mode: Execution mode (spark, jvm, native)
+        """
+        self.spark = spark
+        self.data_path = data_path
+        self.mode = mode
+
+    @classmethod
+    @abstractmethod
+    def name(cls) -> str:
+        """Return the benchmark name (used for CLI)."""
+        pass
+
+    @classmethod
+    @abstractmethod
+    def description(cls) -> str:
+        """Return a short description of the benchmark."""
+        pass
+
+    @abstractmethod
+    def run(self) -> Dict[str, Any]:
+        """
+        Run the benchmark and return results.
+
+        Returns:
+            Dictionary containing benchmark results (must include 'duration_ms')
+        """
+        pass
+
+    def execute_timed(self) -> Dict[str, Any]:
+        """
+        Execute the benchmark with timing and standard output.
+
+        Returns:
+            Dictionary containing benchmark results
+        """
+        print(f"\n{'=' * 80}")
+        print(f"Benchmark: {self.name()}")
+        print(f"Mode: {self.mode.upper()}")
+        print(f"{'=' * 80}")
+        print(f"Data path: {self.data_path}")
+
+        # Print relevant Spark configuration
+        self._print_spark_config()
+
+        # Clear cache before running
+        self.spark.catalog.clearCache()
+
+        # Run the benchmark
+        print(f"\nRunning benchmark...")
+        results = self.run()
+
+        # Print results
+        print(f"\nDuration: {results['duration_ms']:,} ms")
+        if 'row_count' in results:
+            print(f"Rows processed: {results['row_count']:,}")
+
+        # Print any additional metrics
+        for key, value in results.items():
+            if key not in ['duration_ms', 'row_count']:
+                print(f"{key}: {value}")
+
+        print(f"{'=' * 80}\n")
+
+        return results
+
+    def _print_spark_config(self):
+        """Print relevant Spark configuration."""
+        conf = self.spark.sparkContext.getConf()
+        print(f"Shuffle manager: {conf.get('spark.shuffle.manager', 'default')}")
+        print(f"Comet enabled: {conf.get('spark.comet.enabled', 'false')}")
+        print(f"Comet shuffle enabled: {conf.get('spark.comet.exec.shuffle.enabled', 'false')}")
+        print(f"Comet shuffle mode: {conf.get('spark.comet.shuffle.mode', 'not set')}")
+        print(f"Spark UI: {self.spark.sparkContext.uiWebUrl}")
+
+    def _time_operation(self, operation_fn):
+        """
+        Time an operation and return duration in milliseconds.
+
+        Args:
+            operation_fn: Function to time (takes no arguments)
+
+        Returns:
+            Duration in milliseconds
+        """
+        start_time = time.time()
+        operation_fn()
+        duration_ms = int((time.time() - start_time) * 1000)
+        return duration_ms
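To make the base-class contract concrete, here is an illustrative end-to-end sketch of a subclass driven through `execute_timed()`. `RowCountBenchmark` and the data path are hypothetical and exist only for illustration; the real benchmarks in this commit live in `benchmarks/shuffle.py`:

```python
from typing import Any, Dict

from pyspark.sql import SparkSession

from benchmarks.base import Benchmark


class RowCountBenchmark(Benchmark):
    """Hypothetical example benchmark: count the rows in the input Parquet data."""

    @classmethod
    def name(cls) -> str:
        return "row-count"

    @classmethod
    def description(cls) -> str:
        return "Count all rows in the input Parquet data"

    def run(self) -> Dict[str, Any]:
        df = self.spark.read.parquet(self.data_path)
        row_count = 0

        def count_rows():
            nonlocal row_count
            row_count = df.count()

        # _time_operation() returns the wall-clock duration of the call in milliseconds.
        duration_ms = self._time_operation(count_rows)
        return {"duration_ms": duration_ms, "row_count": row_count}


if __name__ == "__main__":
    spark = SparkSession.builder.appName("row-count-example").getOrCreate()
    # execute_timed() prints the Spark/Comet configuration, clears the cache,
    # calls run(), and prints duration_ms plus any extra metrics it returns.
    RowCountBenchmark(spark, "/tmp/shuffle-benchmark-data", "spark").execute_timed()
```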
