Skip to content

Commit 83f254a

Browse files
refactor: Allow unambiguous windows even in unstrictly ordered sessions (#849)
Co-authored-by: Huan Chen <[email protected]>
1 parent ff2faed commit 83f254a

File tree

3 files changed

+45
-2
lines changed

3 files changed

+45
-2
lines changed

bigframes/core/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -194,7 +194,7 @@ def promote_offsets(self, col_id: str) -> ArrayValue:
194194
"""
195195
Convenience function to promote copy of column offsets to a value column. Can be used to reset index.
196196
"""
197-
if not self.session._strictly_ordered:
197+
if self.node.order_ambiguous and not self.session._strictly_ordered:
198198
raise ValueError("Generating offsets not supported in unordered mode")
199199
return ArrayValue(nodes.PromoteOffsetsNode(child=self.node, col_id=col_id))
200200

@@ -346,7 +346,7 @@ def project_window_op(
346346
"""
347347
# TODO: Support non-deterministic windowing
348348
if window_spec.row_bounded or not op.order_independent:
349-
if not self.session._strictly_ordered:
349+
if self.node.order_ambiguous and not self.session._strictly_ordered:
350350
raise ValueError(
351351
"Order-dependent windowed ops not supported in unordered mode"
352352
)

bigframes/core/nodes.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,14 @@ def joins(self) -> bool:
127127
"""
128128
return False
129129

130+
@property
131+
@abc.abstractmethod
132+
def order_ambiguous(self) -> bool:
133+
"""
134+
Whether row ordering is potentially ambiguous. For example, ReadTable (without a primary key) could be ordered in different ways.
135+
"""
136+
...
137+
130138
@functools.cached_property
131139
def total_variables(self) -> int:
132140
return self.variables_introduced + sum(
@@ -177,6 +185,10 @@ def transform_children(
177185
) -> BigFrameNode:
178186
return replace(self, child=t(self.child))
179187

188+
@property
189+
def order_ambiguous(self) -> bool:
190+
return self.child.order_ambiguous
191+
180192

181193
@dataclass(frozen=True)
182194
class JoinNode(BigFrameNode):
@@ -196,6 +208,10 @@ def non_local(self) -> bool:
196208
def child_nodes(self) -> typing.Sequence[BigFrameNode]:
197209
return (self.left_child, self.right_child)
198210

211+
@property
212+
def order_ambiguous(self) -> bool:
213+
return True
214+
199215
def __hash__(self):
200216
return self._node_hash
201217

@@ -247,6 +263,10 @@ def __post_init__(self):
247263
def child_nodes(self) -> typing.Sequence[BigFrameNode]:
248264
return self.children
249265

266+
@property
267+
def order_ambiguous(self) -> bool:
268+
return any(child.order_ambiguous for child in self.children)
269+
250270
def __hash__(self):
251271
return self._node_hash
252272

@@ -293,6 +313,10 @@ def variables_introduced(self) -> int:
293313
"""Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
294314
return len(self.schema.items) + 1
295315

316+
@property
317+
def order_ambiguous(self) -> bool:
318+
return False
319+
296320
def transform_children(
297321
self, t: Callable[[BigFrameNode], BigFrameNode]
298322
) -> BigFrameNode:
@@ -350,6 +374,10 @@ def relation_ops_created(self) -> int:
350374
# Assume worst case, where readgbq actually has baked in analytic operation to generate index
351375
return 3
352376

377+
@property
378+
def order_ambiguous(self) -> bool:
379+
return len(self.total_order_cols) == 0
380+
353381
@functools.cached_property
354382
def variables_introduced(self) -> int:
355383
return len(self.schema.items) + 1
@@ -417,6 +445,10 @@ def hidden_columns(self) -> typing.Tuple[str, ...]:
417445
if col not in self.schema.names
418446
)
419447

448+
@property
449+
def order_ambiguous(self) -> bool:
450+
return not isinstance(self.ordering, orderings.TotalOrdering)
451+
420452
def transform_children(
421453
self, t: Callable[[BigFrameNode], BigFrameNode]
422454
) -> BigFrameNode:
@@ -600,6 +632,10 @@ def schema(self) -> schemata.ArraySchema:
600632
def variables_introduced(self) -> int:
601633
return len(self.aggregations) + len(self.by_column_ids)
602634

635+
@property
636+
def order_ambiguous(self) -> bool:
637+
return False
638+
603639

604640
@dataclass(frozen=True)
605641
class WindowOpNode(UnaryNode):

tests/system/small/test_unordered.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ def test_unordered_mode_cache_aggregate(unordered_session):
4040
assert_pandas_df_equal(bf_result, pd_result, ignore_order=True)
4141

4242

43+
def test_unordered_mode_single_aggregate(unordered_session):
44+
pd_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, dtype=pd.Int64Dtype())
45+
bf_df = bpd.DataFrame(pd_df, session=unordered_session)
46+
47+
assert bf_df.a.mean() == pd_df.a.mean()
48+
49+
4350
def test_unordered_mode_print(unordered_session):
4451
pd_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, dtype=pd.Int64Dtype())
4552
df = bpd.DataFrame(pd_df, session=unordered_session).cache()

0 commit comments

Comments
 (0)