Skip to content

Commit ef51c1c

Browse files
committed
refactor test helper and benchmark results
1 parent b8a3244 commit ef51c1c

File tree

5 files changed

+232
-22
lines changed

5 files changed

+232
-22
lines changed

BENCHMARK.md

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -58,36 +58,36 @@ Benchmark run on Apple M3 Max (14 cores), macOS Darwin 25.2.0.
5858

5959
| Rows | DuckDB (s) | Spark (s) | Speedup |
6060
|------|------------|-----------|---------|
61-
| 100K | 0.080 | 1.159 | **14.6x** |
62-
| 1M | 0.114 | 1.824 | **16.0x** |
63-
| 5M | 0.243 | 2.491 | **10.3x** |
64-
| 10M | 0.354 | 3.276 | **9.2x** |
65-
| 50M | 1.153 | 10.959 | **9.5x** |
66-
| 130M | 2.792 | 27.385 | **9.8x** |
61+
| 100K | 0.052 | 0.667 | **12.8x** |
62+
| 1M | 0.090 | 1.718 | **19.1x** |
63+
| 5M | 0.221 | 2.591 | **11.7x** |
64+
| 10M | 0.335 | 3.504 | **10.5x** |
65+
| 50M | 1.177 | 12.808 | **10.9x** |
66+
| 130M | 2.897 | 29.570 | **10.2x** |
6767

6868
### Experiment 2: Varying Columns
6969

7070
| Cols | Checks | DuckDB (s) | Spark (s) | Speedup |
7171
|------|--------|------------|-----------|---------|
72-
| 10 | 16 | 0.108 | 1.572 | **14.5x** |
73-
| 20 | 46 | 0.280 | 2.049 | **7.3x** |
74-
| 40 | 106 | 0.824 | 2.760 | **3.3x** |
75-
| 80 | 226 | 2.320 | 4.425 | **1.9x** |
72+
| 10 | 16 | 0.118 | 1.656 | **14.1x** |
73+
| 20 | 46 | 0.286 | 2.129 | **7.5x** |
74+
| 40 | 106 | 0.713 | 2.869 | **4.0x** |
75+
| 80 | 226 | 2.214 | 4.434 | **2.0x** |
7676

7777
### Experiment 3: Column Profiling
7878

7979
| Rows | DuckDB (s) | Spark (s) | Speedup |
8080
|------|------------|-----------|---------|
81-
| 100K | 0.097 | 0.651 | **6.7x** |
82-
| 1M | 0.372 | 0.778 | **2.1x** |
83-
| 5M | 1.446 | 1.898 | **1.3x** |
84-
| 10M | 2.614 | 3.450 | **1.3x** |
81+
| 100K | 0.086 | 0.599 | **7.0x** |
82+
| 1M | 0.388 | 0.814 | **2.1x** |
83+
| 5M | 1.470 | 2.399 | **1.6x** |
84+
| 10M | 2.659 | 4.109 | **1.5x** |
8585

8686
### Key Takeaways
8787

88-
1. **DuckDB is 9-16x faster** for row-scaling validation workloads
89-
2. **Speedup decreases with complexity** - more columns/checks narrow the gap
90-
3. **Profiling converges** - at 10M rows, both engines perform similarly
88+
1. **DuckDB is 10-19x faster** for row-scaling validation workloads
89+
2. **Speedup decreases with complexity** - more columns/checks narrow the gap (14x → 2x)
90+
3. **Profiling converges** - at 10M rows, DuckDB is still 1.5x faster
9191
4. **No JVM overhead** - DuckDB runs natively in Python, no startup cost
9292

9393
## Quick Start

imgs/benchmark_chart.png

4.43 KB
Loading

tests/conftest.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818
def spark_connect_server():
1919
"""Session-scoped fixture to start Spark Connect server.
2020
21-
Automatically starts the Spark Connect server if not already running,
22-
using the benchmark configuration. The server is NOT stopped after
23-
tests complete (to allow reuse across test runs).
21+
Automatically starts the Spark Connect server if not already running.
22+
The server is NOT stopped after tests complete (to allow reuse across test runs).
2423
"""
25-
from benchmark.spark_server import SparkConnectServer
26-
from benchmark.config import SparkServerConfig
24+
from tests.helpers.spark_server import SparkConnectServer, SparkServerConfig
2725

2826
config = SparkServerConfig()
2927
server = SparkConnectServer(config)

tests/helpers/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test helpers for PyDeequ tests."""
16+
17+
from tests.helpers.spark_server import SparkConnectServer, SparkServerConfig
18+
19+
__all__ = ["SparkConnectServer", "SparkServerConfig"]

tests/helpers/spark_server.py

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Spark Connect server management for tests.
16+
17+
This module provides utilities to start and manage a Spark Connect server
18+
for running integration tests that require Spark.
19+
"""
20+
21+
import os
22+
import socket
23+
import subprocess
24+
import time
25+
from dataclasses import dataclass, field
26+
from typing import Optional
27+
28+
29+
@dataclass
class SparkServerConfig:
    """Configuration for Spark Connect server.

    Path-like settings are resolved from the environment first
    (JAVA_HOME, SPARK_HOME, DEEQU_JAR) and fall back to fixed
    developer-machine locations when the variable is unset.
    """

    # JDK installation used to launch the server.
    java_home: str = field(default_factory=lambda: os.getenv(
        "JAVA_HOME",
        "/Library/Java/JavaVirtualMachines/amazon-corretto-17.jdk/Contents/Home",
    ))
    # Spark distribution containing sbin/start-connect-server.sh.
    spark_home: str = field(default_factory=lambda: os.getenv(
        "SPARK_HOME", "/Volumes/workplace/deequ_rewrite/spark-3.5.0-bin-hadoop3"
    ))
    # TCP port the Spark Connect endpoint listens on.
    port: int = 15002
    # Seconds to wait for the server to start accepting connections.
    startup_timeout: int = 60
    # Seconds between readiness polls during startup.
    poll_interval: float = 1.0
    driver_memory: str = "4g"
    executor_memory: str = "4g"
    # Deequ JAR added to the server's classpath.
    deequ_jar: str = field(default_factory=lambda: os.getenv(
        "DEEQU_JAR",
        "/Volumes/workplace/deequ_rewrite/deequ/target/deequ_2.12-2.1.0b-spark-3.5.jar"
    ))
55+
56+
57+
class SparkConnectServer:
    """Manages Spark Connect server lifecycle for tests.

    Readiness is detected by polling the configured TCP port on localhost,
    so a server that was already running before start() is reused, and
    stop() deliberately leaves it alone.
    """

    def __init__(self, config: Optional["SparkServerConfig"] = None):
        """
        Initialize Spark Connect server manager.

        Args:
            config: Server configuration (uses defaults if not provided)
        """
        self.config = config or SparkServerConfig()
        # Handle to the launcher script's process. The server itself may
        # daemonize, so this is mainly a kill-of-last-resort handle.
        self._process: Optional[subprocess.Popen] = None
        # Only stop() servers we launched ourselves (see stop()).
        self._started_by_us = False

    def is_running(self) -> bool:
        """Check if Spark Connect server is running by attempting to connect."""
        try:
            # Context manager guarantees the socket is closed even if an
            # exception fires between creation and the connect attempt.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                # connect_ex() returns an errno instead of raising; 0 means
                # the port accepted the connection.
                return sock.connect_ex(("localhost", self.config.port)) == 0
        except OSError:
            # socket.error is an alias of OSError since Python 3.3.
            return False

    def start(self) -> float:
        """
        Start Spark Connect server if not already running.

        Returns:
            Time taken to start the server (0 if already running)

        Raises:
            RuntimeError: If the start script is missing or the server does
                not accept connections within ``config.startup_timeout``.
        """
        if self.is_running():
            print(f"Spark Connect server already running on port {self.config.port}")
            return 0.0

        start_time = time.time()

        # Build the startup command
        start_script = os.path.join(self.config.spark_home, "sbin", "start-connect-server.sh")

        if not os.path.exists(start_script):
            raise RuntimeError(f"Spark Connect start script not found: {start_script}")

        cmd = [
            start_script,
            "--conf", f"spark.driver.memory={self.config.driver_memory}",
            "--conf", f"spark.executor.memory={self.config.executor_memory}",
            "--packages", "org.apache.spark:spark-connect_2.12:3.5.0",
            "--jars", self.config.deequ_jar,
            "--conf", "spark.connect.extensions.relation.classes=com.amazon.deequ.connect.DeequRelationPlugin",
        ]

        # Set up environment
        env = os.environ.copy()
        env["JAVA_HOME"] = self.config.java_home
        env["SPARK_HOME"] = self.config.spark_home

        print(f"Starting Spark Connect server on port {self.config.port}...")
        print(f" JAVA_HOME: {self.config.java_home}")
        print(f" SPARK_HOME: {self.config.spark_home}")

        # Start the server
        self._process = subprocess.Popen(
            cmd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        self._started_by_us = True

        # Wait for server to be ready
        deadline = time.time() + self.config.startup_timeout
        while time.time() < deadline:
            if self.is_running():
                elapsed = time.time() - start_time
                print(f"Spark Connect server started in {elapsed:.1f}s")
                return elapsed
            time.sleep(self.config.poll_interval)

        # Timeout - try to get error output. _process is always set at this
        # point (Popen above either succeeded or raised), so a single raise
        # path suffices.
        error_msg = "Unknown error"
        if self._process:
            self._process.terminate()
            try:
                _, stderr = self._process.communicate(timeout=5)
            except subprocess.TimeoutExpired:
                # Launcher ignored SIGTERM: force-kill and reap so we do not
                # leak a zombie, and so the RuntimeError below still fires.
                self._process.kill()
                _, stderr = self._process.communicate()
            if stderr:
                # errors="replace": Spark may emit non-UTF-8 bytes on stderr.
                error_msg = stderr.decode(errors="replace")
            self._process = None
            self._started_by_us = False
        raise RuntimeError(
            f"Spark Connect server failed to start within {self.config.startup_timeout}s: {error_msg[:500]}"
        )

    def stop(self) -> None:
        """Stop Spark Connect server if we started it.

        A server that was already running before start() is left alone so it
        can be reused across test runs.
        """
        if not self._started_by_us:
            print("Spark Connect server was not started by us, skipping stop")
            return

        stop_script = os.path.join(self.config.spark_home, "sbin", "stop-connect-server.sh")

        if os.path.exists(stop_script):
            print("Stopping Spark Connect server...")
            env = os.environ.copy()
            env["JAVA_HOME"] = self.config.java_home
            env["SPARK_HOME"] = self.config.spark_home

            try:
                subprocess.run(
                    [stop_script],
                    env=env,
                    timeout=30,
                    capture_output=True,
                )
                print("Spark Connect server stopped")
            except subprocess.TimeoutExpired:
                print("Warning: stop script timed out")
            except Exception as e:
                # Best-effort shutdown: never let cleanup break the test run.
                print(f"Warning: Error stopping server: {e}")
        else:
            # Fall back to killing the process directly
            if self._process:
                print("Terminating Spark Connect server process...")
                self._process.terminate()
                try:
                    self._process.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    self._process.kill()
                    self._process.wait()  # reap after SIGKILL
                print("Spark Connect server process terminated")

        self._started_by_us = False
        self._process = None

0 commit comments

Comments
 (0)