Commit e75226f

Merge pull request #2881 from mabel-dev/clickbench-performance-regression-investigation-1

group by improvements

2 parents: a992d72 + 114c652

4 files changed: +150 −7 lines

opteryx/__version__.py (2 additions, 2 deletions)

@@ -1,9 +1,9 @@
 # THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
 # DO NOT EDIT THIS FILE DIRECTLY
 
-__build__ = 1712
+__build__ = 1713
 __author__ = "@joocer"
-__version__ = "0.26.0-beta.1712"
+__version__ = "0.26.0-beta.1713"
 
 # Store the version here so:
 # 1) we don't load dependencies by storing it in __init__.py

opteryx/operators/aggregate_and_group_node.py (16 additions, 4 deletions)

@@ -68,8 +68,9 @@ def __init__(self, properties: QueryProperties, **parameters):
         self.column_map, self.aggregate_functions = build_aggregations(self.aggregates)
 
         self.buffer = []
-        self.max_buffer_size = 100  # Process in chunks to avoid excessive memory usage
+        self.max_buffer_size = 250  # Buffer size before partial aggregation (kept for future parallelization)
         self._partial_aggregated = False  # Track if we've done a partial aggregation
+        self._disable_partial_agg = False  # Can disable if partial agg isn't helping
 
     @property
     def config(self):  # pragma: no cover

@@ -221,15 +222,26 @@ def execute(self, morsel: pyarrow.Table, **kwargs):
         self.buffer.append(morsel)
 
         # If buffer is full, do partial aggregation
-        if len(self.buffer) >= self.max_buffer_size:
+        # BUT: Skip partial aggregation if it's not reducing data effectively
+        if len(self.buffer) >= self.max_buffer_size and not self._disable_partial_agg:
             table = pyarrow.concat_tables(
                 self.buffer,
                 promote_options="permissive",
             )
 
             groups = table.group_by(self.group_by_columns)
             groups = groups.aggregate(self.aggregate_functions)
-            self.buffer = [groups]  # Replace buffer with partial result
-            self._partial_aggregated = True  # Mark that we've done a partial aggregation
+
+            # Check if partial aggregation is effective
+            # If we're not reducing the row count significantly, stop doing partial aggs
+            reduction_ratio = groups.num_rows / table.num_rows if table.num_rows > 0 else 1
+            if reduction_ratio > 0.75:  # Kept more than 75% of rows - high cardinality!
+                # Partial aggregation isn't helping, disable it and keep buffering
+                self._disable_partial_agg = True
+                # Don't replace buffer with partial result, keep accumulating
+            else:
+                # Good reduction, keep using partial aggregation
+                self.buffer = [groups]  # Replace buffer with partial result
+                self._partial_aggregated = True  # Mark that we've done a partial aggregation
 
         yield None
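
The substantive change is the effectiveness check: when the grouping keys are high-cardinality, a partial aggregation barely shrinks the buffered rows, so the concat-and-group work is pure overhead; measuring the reduction ratio once lets the operator turn the optimization off for the rest of the stream. Below is a minimal standalone sketch of that heuristic; the column names ("k", "v"), the sum aggregate, and the two-table buffer are invented for illustration and are not the operator's real state:

import pyarrow

# A buffer of morsels, as the operator would accumulate them.
buffer = [
    pyarrow.table({"k": ["a", "b", "a"], "v": [1, 2, 3]}),
    pyarrow.table({"k": ["b", "c", "c"], "v": [4, 5, 6]}),
]

table = pyarrow.concat_tables(buffer, promote_options="permissive")
groups = table.group_by(["k"]).aggregate([("v", "sum")])

# The diff's heuristic: keeping more than 75% of the rows after grouping
# means the keys are high cardinality and partial aggregation isn't paying off.
reduction_ratio = groups.num_rows / table.num_rows if table.num_rows > 0 else 1
if reduction_ratio > 0.75:
    print("high cardinality: keep buffering raw morsels")  # would set _disable_partial_agg
else:
    buffer = [groups]  # collapse the buffer to the partial result
    print(f"reduced {table.num_rows} rows to {groups.num_rows}")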

pyproject.toml (1 addition, 1 deletion)

@@ -1,6 +1,6 @@
 [project]
 name = "opteryx"
-version = "0.26.0-beta.1712"
+version = "0.26.0-beta.1713"
 description = "Query your data, where it lives"
 requires-python = '>=3.11'
 readme = {file = "README.md", content-type = "text/markdown"}

compare_runs.py (new file: 131 additions, 0 deletions)

#!/usr/bin/env python3
"""
Compare two ClickBench benchmark runs and show differences.

Usage:
    python compare_runs.py baseline.txt comparison.txt [--threshold 10]

Example:
    python compare_runs.py old_run.txt new_run.txt --threshold 5
"""

import argparse
import re
import sys
from pathlib import Path


def parse_benchmark(filename):
    """Parse a benchmark file and extract version and timing data."""
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            content = f.read()
    except FileNotFoundError:
        print(f"Error: File '{filename}' not found", file=sys.stderr)
        sys.exit(1)

    # Extract version - try multiple patterns
    version = None
    for pattern in [
        r'0\.\d+\.\d+-beta\.\d+',  # 0.25.0-beta.1444
        r'v?\d+\.\d+\.\d+',        # v1.0.0 or 1.0.0
        r'\d+\.\d+\.\d+b\d+',      # 0.25.0b1444
    ]:
        version_match = re.search(pattern, content)
        if version_match:
            version = version_match.group(0)
            break

    if not version:
        # Use filename as fallback
        version = Path(filename).stem

    # Extract timing arrays
    times = []
    for line in content.split('\n'):
        match = re.match(r'\[([0-9.,null]+)\]', line)
        if match:
            values = match.group(1).split(',')
            # Take the median (3rd value, index 2), or the first if there are fewer values
            try:
                if 'null' in values:
                    times.append(None)
                else:
                    times.append(float(values[2]) if len(values) >= 3 else float(values[0]))
            except (ValueError, IndexError):
                times.append(None)

    return version, times


def main():
    parser = argparse.ArgumentParser(
        description='Compare two ClickBench benchmark runs',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument('baseline', help='Baseline benchmark file')
    parser.add_argument('comparison', help='Comparison benchmark file')
    parser.add_argument('--threshold', '-t', type=float, default=10.0,
                        help='Percentage threshold for flagging regressions/improvements (default: 10%%)')
    parser.add_argument('--top', '-n', type=int, default=10,
                        help='Number of top regressions/improvements to show (default: 10)')

    args = parser.parse_args()

    baseline_version, baseline_times = parse_benchmark(args.baseline)
    comparison_version, comparison_times = parse_benchmark(args.comparison)

    if not baseline_times or not comparison_times:
        print("Error: Could not parse timing data from one or both files", file=sys.stderr)
        sys.exit(1)

    print(f'Comparing {baseline_version} (baseline) vs {comparison_version} (comparison)')
    print(f'Found {len(baseline_times)} queries in baseline and {len(comparison_times)} queries in comparison')
    print()
    print('Query # | Baseline | Compare | Delta | Change')
    print('--------|----------|---------|---------|--------')

    regressions = []
    improvements = []

    for i, (t_base, t_comp) in enumerate(zip(baseline_times, comparison_times), 1):
        if t_base is None or t_comp is None:
            delta_str = 'N/A'
            change_str = 'N/A'
        else:
            delta = t_comp - t_base
            pct_change = ((t_comp / t_base) - 1) * 100 if t_base > 0 else 0
            delta_str = f'{delta:+.2f}s'
            change_str = f'{pct_change:+.1f}%'

            if pct_change > args.threshold:
                regressions.append((i, t_base, t_comp, pct_change))
            elif pct_change < -args.threshold:
                improvements.append((i, t_base, t_comp, pct_change))

        print(f'{i:7} | {t_base if t_base is not None else "null":8} | {t_comp if t_comp is not None else "null":7} | {delta_str:7} | {change_str:>7}')

    print()
    print(f'Total queries: {len(baseline_times)}')
    print(f'Regressions (>{args.threshold}% slower): {len(regressions)}')
    print(f'Improvements (>{args.threshold}% faster): {len(improvements)}')
    print()

    if regressions:
        print(f'Top {min(args.top, len(regressions))} Regressions:')
        regressions.sort(key=lambda x: x[3], reverse=True)
        for q, t_base, t_comp, pct in regressions[:args.top]:
            print(f'  Query {q:2}: {t_base:.2f}s → {t_comp:.2f}s ({pct:+.1f}%)')

    if improvements:
        print()
        print(f'Top {min(args.top, len(improvements))} Improvements:')
        improvements.sort(key=lambda x: x[3])
        for q, t_base, t_comp, pct in improvements[:args.top]:
            print(f'  Query {q:2}: {t_base:.2f}s → {t_comp:.2f}s ({pct:+.1f}%)')


if __name__ == '__main__':
    main()
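
For reference, parse_benchmark expects ClickBench-style result files: a version string somewhere in the content, and one bracketed array of run times per query line, with null marking a failed run; the third value of each triple is taken as the representative time. A hypothetical round trip (the file name, version, and timings are invented, and parse_benchmark is assumed to be in scope):

# Write a made-up ClickBench-style result file...
sample = """Opteryx 0.26.0-beta.1712
[0.12,0.10,0.11]
[1.50,1.42,1.44]
[null,null,null]
"""
with open("baseline.txt", "w", encoding="utf-8") as f:
    f.write(sample)

# ...and parse it back.
version, times = parse_benchmark("baseline.txt")
print(version)  # -> 0.26.0-beta.1712
print(times)    # -> [0.11, 1.44, None]  (third value of each triple)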
