
Commit dd00c25: "aggregates"

Parent: 5fa0256

File tree: 11 files changed (+505 -126 lines)

opteryx/__version__.py

Lines changed: 2 additions & 2 deletions

```diff
@@ -1,9 +1,9 @@
 # THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
 # DO NOT EDIT THIS FILE DIRECTLY
 
-__build__ = 1691
+__build__ = 1695
 __author__ = "@joocer"
-__version__ = "0.26.0-beta.1691"
+__version__ = "0.26.0-beta.1695"
 
 # Store the version here so:
 # 1) we don't load dependencies by storing it in __init__.py
```

opteryx/connectors/cql_connector.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -109,6 +109,7 @@ def read_dataset(  # type:ignore
         predicates: list = None,
         chunk_size: int = INITIAL_CHUNK_SIZE,  # type:ignore
         limit: int = None,
+        **kwargs,
     ) -> Generator[pyarrow.Table, None, None]:  # type:ignore
         self.chunk_size = chunk_size
```

opteryx/managers/expression/ops.py

Lines changed: 5 additions & 1 deletion

```diff
@@ -228,7 +228,11 @@ def _inner_filter_operations(arr, operator, value):
         matches = compute.match_like(arr, value).to_numpy(False).astype(dtype=numpy.bool_)
         return numpy.invert(matches)
     if operator == "ILike":
-        return compute.match_like(arr, value, ignore_case=True).to_numpy(False).astype(dtype=numpy.bool_)
+        return (
+            compute.match_like(arr, value, ignore_case=True)
+            .to_numpy(False)
+            .astype(dtype=numpy.bool_)
+        )
     if operator == "NotILike":
         matches = compute.match_like(arr, value, ignore_case=True)
         return numpy.invert(matches)
```
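For context, `pyarrow.compute.match_like` applies SQL `LIKE` pattern semantics, and `ignore_case=True` gives the case-insensitive `ILIKE` behaviour used above; nulls surface as `None` from `to_numpy(False)` and collapse to `False` in the boolean cast. A small standalone sketch with illustrative data:

```python
import numpy
import pyarrow
from pyarrow import compute

arr = pyarrow.array(["Apple", "banana", "APRICOT", None])

# ILIKE 'a%': case-insensitive prefix match
mask = (
    compute.match_like(arr, "a%", ignore_case=True)
    .to_numpy(False)  # zero_copy_only=False, so nulls come back as None
    .astype(dtype=numpy.bool_)  # None coerces to False
)
print(mask)  # [ True False  True False]
```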

opteryx/operators/aggregate_and_group_node.py

Lines changed: 25 additions & 11 deletions

```diff
@@ -68,6 +68,7 @@ def __init__(self, properties: QueryProperties, **parameters):
         self.column_map, self.aggregate_functions = build_aggregations(self.aggregates)
 
         self.buffer = []
+        self.max_buffer_size = 50  # Process in chunks to avoid excessive memory usage
 
     @property
     def config(self):  # pragma: no cover
@@ -85,18 +86,19 @@ def execute(self, morsel: pyarrow.Table, **kwargs):
                 yield EOS
                 return
 
-            # merge all the morsels together into one table, selecting only the columns
-            # we're pretty sure we're going to use - this will fail for datasets
-            # larger than memory
-            table = pyarrow.concat_tables(
-                self.buffer,
-                promote_options="permissive",
-            )
+            # If we have partial results in buffer, do final aggregation
+            if len(self.buffer) > 0:
+                table = pyarrow.concat_tables(
+                    self.buffer,
+                    promote_options="permissive",
+                )
+                table = table.combine_chunks()
+                groups = table.group_by(self.group_by_columns)
+                groups = groups.aggregate(self.aggregate_functions)
+                self.buffer = [groups]  # Replace buffer with final result
 
-            # do the group by and aggregates
-            table = table.combine_chunks()
-            groups = table.group_by(self.group_by_columns)
-            groups = groups.aggregate(self.aggregate_functions)
+            # Now buffer has the final aggregated result
+            groups = self.buffer[0]
 
             # do the secondary activities for ARRAY_AGG
             for node in get_all_nodes_of_type(self.aggregates, select_nodes=(NodeType.AGGREGATOR,)):
@@ -135,4 +137,16 @@ def execute(self, morsel: pyarrow.Table, **kwargs):
             morsel = evaluate_and_append(self.groups, morsel)
 
         self.buffer.append(morsel)
+
+        # If buffer is full, do partial aggregation
+        if len(self.buffer) >= self.max_buffer_size:
+            table = pyarrow.concat_tables(
+                self.buffer,
+                promote_options="permissive",
+            )
+            table = table.combine_chunks()
+            groups = table.group_by(self.group_by_columns)
+            groups = groups.aggregate(self.aggregate_functions)
+            self.buffer = [groups]  # Replace buffer with partial result
+
         yield None
```
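This change replaces a single whole-dataset concatenation (which the old comment admitted "will fail for datasets larger than memory") with a bounded buffer: every max_buffer_size morsels are folded into a partial aggregate, so memory scales with the number of groups rather than the number of buffered rows. Below is a minimal standalone sketch of the same fold pattern, assuming a SUM aggregate; re-applying an aggregation to its own partial output is only valid for decomposable aggregates such as SUM, MIN, and MAX (a COUNT, for instance, would have to be folded with SUM):

```python
# A sketch of chunked group-by aggregation with pyarrow, not Opteryx's
# actual operator: the SUM of partial SUMs equals the SUM over all rows.
import pyarrow

MAX_BUFFER_SIZE = 50  # the same bound the commit introduces

def fold(tables: list, key: str, value: str) -> pyarrow.Table:
    """Collapse a list of tables into one partial aggregate."""
    table = pyarrow.concat_tables(tables, promote_options="permissive")
    table = table.combine_chunks()
    partial = table.group_by([key]).aggregate([(value, "sum")])
    # pyarrow names the output column "<value>_sum"; restore the original
    # name so the partial can be folded again with the same aggregate spec.
    return partial.select([key, f"{value}_sum"]).rename_columns([key, value])

def chunked_sum(morsels, key: str, value: str) -> pyarrow.Table:
    buffer = []
    for morsel in morsels:  # assumes at least one morsel arrives
        buffer.append(morsel)
        if len(buffer) >= MAX_BUFFER_SIZE:
            buffer = [fold(buffer, key, value)]  # partial aggregation
    return fold(buffer, key, value)  # final aggregation

t1 = pyarrow.table({"k": ["a", "b"], "v": [1, 2]})
t2 = pyarrow.table({"k": ["a"], "v": [10]})
print(chunked_sum([t1, t2], "k", "v"))  # k=a -> v=11, k=b -> v=2
```

Because the buffered morsels are themselves replaced by the folded table, the buffer never holds more than max_buffer_size tables, one of which may already be a partial aggregate.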

opteryx/operators/read_node.py

Lines changed: 4 additions & 2 deletions

```diff
@@ -188,7 +188,7 @@ def to_mermaid(self, stats, nid):
             mermaid = f'NODE_{nid}[("**{self.node_type.upper()} (FUNCTION)**<br />'
             mermaid += f"{self.function}<br />"
         else:
-            mermaid = f'NODE_{nid}[(**"{self.node_type.upper()} ({self.connector.__type__})**<br />'
+            mermaid = f'NODE_{nid}[("**{self.node_type.upper()} ({self.connector.__type__})**<br />'
             mermaid += f"{self.connector.dataset}<br />"
             mermaid += BAR
         if self.columns:
@@ -259,7 +259,9 @@ def execute(self, morsel, **kwargs) -> Generator:
         arrow_schema = None
         start_clock = time.monotonic_ns()
         reader = self.connector.read_dataset(
-            columns=self.columns, predicates=self.predicates, limit=self.limit
+            columns=self.columns,
+            predicates=self.predicates,
+            limit=self.limit,
         )
         for morsel in reader:
             # try to make each morsel have the same schema
```

opteryx/operators/simple_aggregate_and_group_node.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -59,7 +59,7 @@ def build_finalizer_aggregations(aggregators):
 
 
 class SimpleAggregateAndGroupNode(BasePlanNode):
-    SIMPLE_AGGREGATES = {"SUM", "MIN", "MAX", "COUNT"}
+    SIMPLE_AGGREGATES = {"SUM", "MIN", "MAX", "COUNT", "AVG", "COUNT_DISTINCT"}
 
     def __init__(self, properties: QueryProperties, **parameters):
         BasePlanNode.__init__(self, properties=properties, **parameters)
```
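`AVG` and `COUNT_DISTINCT` map naturally onto pyarrow's "mean" and "count_distinct" hash aggregates; the snippet below shows those kernels in isolation (the data and column names are made up, and the mapping is an assumption for illustration, not a claim about Opteryx's internals). Unlike SUM or MIN, neither is naively re-aggregatable from partial results, which is presumably what the `build_finalizer_aggregations` step referenced above accounts for.

```python
import pyarrow

table = pyarrow.table({"k": ["a", "a", "b"], "v": [1, 3, 3]})
result = table.group_by(["k"]).aggregate([("v", "mean"), ("v", "count_distinct")])
print(result)  # per-group columns v_mean and v_count_distinct
```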

opteryx/planner/optimizer/__init__.py

Lines changed: 0 additions & 2 deletions

```diff
@@ -62,7 +62,6 @@
 from opteryx.models import QueryStatistics
 from opteryx.planner.logical_planner import LogicalPlan
 from opteryx.planner.optimizer.strategies import *
-from opteryx.planner.optimizer.strategies.join_groupby_pushdown import JoinGroupByPushdownStrategy
 
 from .strategies.optimization_strategy import OptimizerContext
 
@@ -84,7 +83,6 @@ def __init__(self, statistics: QueryStatistics):
             PredicatePushdownStrategy(statistics),
             ProjectionPushdownStrategy(statistics),
             JoinRewriteStrategy(statistics),
-            JoinGroupByPushdownStrategy(statistics),
             JoinOrderingStrategy(statistics),
             DistinctPushdownStrategy(statistics),
             OperatorFusionStrategy(statistics),
```
