Skip to content

Commit d1b6800

Browse files
refactor: Introduce slice op to model array slicing (#1055)
1 parent f88043b commit d1b6800

File tree

7 files changed

+259
-104
lines changed

7 files changed

+259
-104
lines changed

bigframes/core/__init__.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,6 @@ def session(self) -> Session:
158158
def schema(self) -> schemata.ArraySchema:
159159
return self.node.schema
160160

161-
@functools.cached_property
162-
def _compiled_schema(self) -> schemata.ArraySchema:
163-
return bigframes.core.compile.test_only_ibis_inferred_schema(self.node)
164-
165161
@property
166162
def explicitly_ordered(self) -> bool:
167163
# see BigFrameNode.explicitly_ordered
@@ -229,6 +225,23 @@ def order_by(self, by: Sequence[OrderingExpression]) -> ArrayValue:
229225
def reversed(self) -> ArrayValue:
230226
return ArrayValue(nodes.ReversedNode(child=self.node))
231227

228+
def slice(
    self, start: Optional[int], stop: Optional[int], step: Optional[int]
) -> ArrayValue:
    """Return a new ArrayValue wrapping a logical slice of this one.

    Args:
        start: First position of the slice, or None.
        stop: Exclusive end position of the slice, or None.
        step: Stride of the slice; None is treated as 1.

    Returns:
        An ArrayValue whose root is a ``SliceNode`` over this value's node.

    Warns:
        bigframes.exceptions.AmbiguousWindowWarning: if the node reports an
            ambiguous ordering and the session is not strictly ordered.
    """
    # Positional slicing relies on row order, so flag potentially unstable order.
    ordering_is_unstable = self.node.order_ambiguous and not (
        self.session._strictly_ordered
    )
    if ordering_is_unstable:
        warnings.warn(
            "Window ordering may be ambiguous, this can cause unstable results.",
            bigframes.exceptions.AmbiguousWindowWarning,
        )

    effective_step = 1 if step is None else step
    slice_node = nodes.SliceNode(
        self.node,
        start=start,
        stop=stop,
        step=effective_step,
    )
    return ArrayValue(slice_node)
244+
232245
def promote_offsets(self) -> Tuple[ArrayValue, str]:
233246
"""
234247
Convenience function to promote copy of column offsets to a value column. Can be used to reset index.

bigframes/core/blocks.py

Lines changed: 8 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1465,84 +1465,17 @@ def slice(
14651465
self,
14661466
start: typing.Optional[int] = None,
14671467
stop: typing.Optional[int] = None,
1468-
step: typing.Optional[int] = None,
1469-
) -> bigframes.core.blocks.Block:
1470-
if step is None:
1471-
step = 1
1468+
step: int = 1,
1469+
) -> Block:
14721470
if step == 0:
1473-
raise ValueError("slice step cannot be zero")
1474-
if step < 0:
1475-
reverse_start = (-start - 1) if start else 0
1476-
reverse_stop = (-stop - 1) if stop else None
1477-
reverse_step = -step
1478-
return self.reversed()._forward_slice(
1479-
reverse_start, reverse_stop, reverse_step
1480-
)
1481-
return self._forward_slice(start or 0, stop, step)
1482-
1483-
def _forward_slice(self, start: int = 0, stop=None, step: int = 1):
1484-
"""Performs slice but only for positive step size."""
1485-
if step <= 0:
1486-
raise ValueError("forward_slice only supports positive step size")
1487-
1488-
use_postive_offsets = (
1489-
(start > 0)
1490-
or ((stop is not None) and (stop >= 0))
1491-
or ((step > 1) and (start >= 0))
1492-
)
1493-
use_negative_offsets = (
1494-
(start < 0) or (stop and (stop < 0)) or ((step > 1) and (start < 0))
1471+
raise ValueError("Slice step size must be non-zero")
1472+
return Block(
1473+
self.expr.slice(start, stop, step),
1474+
index_columns=self.index_columns,
1475+
column_labels=self.column_labels,
1476+
index_labels=self._index_labels,
14951477
)
14961478

1497-
block = self
1498-
1499-
# only generate offsets that are used
1500-
positive_offsets = None
1501-
negative_offsets = None
1502-
1503-
if use_postive_offsets:
1504-
block, positive_offsets = self.promote_offsets()
1505-
if use_negative_offsets:
1506-
block, negative_offsets = block.reversed().promote_offsets()
1507-
block = block.reversed()
1508-
1509-
conditions = []
1510-
if start != 0:
1511-
if start > 0:
1512-
assert positive_offsets
1513-
conditions.append(ops.ge_op.as_expr(positive_offsets, ex.const(start)))
1514-
else:
1515-
assert negative_offsets
1516-
conditions.append(
1517-
ops.le_op.as_expr(negative_offsets, ex.const(-start - 1))
1518-
)
1519-
if stop is not None:
1520-
if stop >= 0:
1521-
assert positive_offsets
1522-
conditions.append(ops.lt_op.as_expr(positive_offsets, ex.const(stop)))
1523-
else:
1524-
assert negative_offsets
1525-
conditions.append(
1526-
ops.gt_op.as_expr(negative_offsets, ex.const(-stop - 1))
1527-
)
1528-
if step > 1:
1529-
if start >= 0:
1530-
assert positive_offsets
1531-
start_diff = ops.sub_op.as_expr(positive_offsets, ex.const(start))
1532-
else:
1533-
assert negative_offsets
1534-
start_diff = ops.sub_op.as_expr(negative_offsets, ex.const(-start + 1))
1535-
step_cond = ops.eq_op.as_expr(
1536-
ops.mod_op.as_expr(start_diff, ex.const(step)), ex.const(0)
1537-
)
1538-
conditions.append(step_cond)
1539-
1540-
for cond in conditions:
1541-
block, cond_id = block.project_expr(cond)
1542-
block = block.filter_by_id(cond_id)
1543-
1544-
return block.select_columns(self.value_columns)
1545-
15461479
# Using cache to optimize for Jupyter Notebook's behavior where both '__repr__'
15471480
# and '__repr_html__' are called in a single display action, reducing redundant
15481481
# queries.

bigframes/core/nodes.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import functools
2121
import itertools
2222
import typing
23-
from typing import Callable, Iterable, Sequence, Tuple
23+
from typing import Callable, Iterable, Optional, Sequence, Tuple
2424

2525
import google.cloud.bigquery as bq
2626

@@ -270,6 +270,37 @@ def order_ambiguous(self) -> bool:
270270
return self.child.order_ambiguous
271271

272272

273+
@dataclass(frozen=True, eq=False)
class SliceNode(UnaryNode):
    """Logical slice node conditionally becomes limit or filter over row numbers.

    Holds python-style ``start``/``stop``/``step`` values for slicing the rows
    of the child node.
    """

    # Slice bounds; None means the bound was not given.
    start: Optional[int]
    stop: Optional[int]
    # Stride of the slice.
    step: int = 1

    # These two are overestimates; more accurate numbers are available by
    # converting to concrete limit or analytic+filter ops.
    @property
    def relation_ops_created(self) -> int:
        return 2

    @property
    def variables_introduced(self) -> int:
        return 2

    @property
    def non_local(self) -> bool:
        """
        Whether this node combines information across multiple rows instead of processing rows independently.

        Used as an approximation for whether the expression may require shuffling to execute (and therefore be expensive).
        """
        return True

    @property
    def row_preserving(self) -> bool:
        """Whether this node preserves input rows (a slice does not)."""
        return False
302+
303+
273304
@dataclass(frozen=True, eq=False)
274305
class JoinNode(BigFrameNode):
275306
left_child: BigFrameNode

bigframes/core/rewrite.py

Lines changed: 172 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
import dataclasses
1717
import functools
1818
import itertools
19-
from typing import Mapping, Optional, Sequence, Tuple
19+
from typing import cast, Mapping, Optional, Sequence, Tuple
2020

2121
import bigframes.core.expression as scalar_exprs
22+
import bigframes.core.guid as guids
2223
import bigframes.core.identifiers as ids
2324
import bigframes.core.join_def as join_defs
2425
import bigframes.core.nodes as nodes
2526
import bigframes.core.ordering as order
27+
import bigframes.core.tree_properties as traversals
2628
import bigframes.operations as ops
2729

2830
Selection = Tuple[Tuple[scalar_exprs.Expression, ids.ColumnId], ...]
@@ -381,3 +383,172 @@ def common_selection_root(
381383
if r_node in l_nodes:
382384
return r_node
383385
return None
386+
387+
388+
def replace_slice_ops(root: nodes.BigFrameNode) -> nodes.BigFrameNode:
    """Recursively rewrite every SliceNode in the tree rooted at ``root``.

    Children are rewritten first; if ``root`` itself is a SliceNode, the
    rewritten node is then lowered by ``convert_slice_to_filter``.
    """
    # TODO: we want to pull up some slices into limit op if near root.
    rewritten = root.transform_children(replace_slice_ops)
    if not isinstance(root, nodes.SliceNode):
        return rewritten
    return convert_slice_to_filter(cast(nodes.SliceNode, rewritten))
395+
396+
397+
def get_simplified_slice(node: nodes.SliceNode):
    """Attempts to simplify the slice.

    Fills in the default ``start`` (0 for a positive step, -1 for a negative
    step) and, for positive steps when the input row count is known, converts
    negative ``start``/``stop`` values into equivalent non-negative offsets.

    Returns:
        A ``(start, stop, step)`` tuple. ``start`` is never None; ``stop`` may be.
    """
    # Negative indices are relative to the length of the input being sliced,
    # so the row count must come from the child rather than the slice itself.
    # NOTE(review): assumes traversals.row_count returns None when unknown.
    row_count = traversals.row_count(node.child)
    start, stop, step = node.start, node.stop, node.step

    if start is None:
        start = 0 if step > 0 else -1
    if row_count and step > 0:
        # Clamp at 0, as python slicing does: on 5 rows, start=-7 means 0,
        # not -2 (which would later be read as "second from the end").
        if start < 0:
            start = max(row_count + start, 0)
        if (stop is not None) and (stop < 0):
            stop = max(row_count + stop, 0)
    return start, stop, step
410+
411+
412+
def convert_slice_to_filter(node: nodes.SliceNode):
    """Lower a SliceNode into concrete nodes.

    Depending on the simplified (start, stop, step) this returns:
      * the child unchanged, for a full forward slice (eg. df[::1]);
      * a ReversedNode, for a full reversed slice (eg. df[::-1]);
      * offsets + filter + selection on forward offsets, for a forward slice
        with non-negative bounds;
      * offsets + filter + selection on both forward and reverse offsets
        otherwise.
    """
    start, stop, step = get_simplified_slice(node)

    # no-op (eg. df[::1]). Only an absent stop qualifies: stop == -1 drops the
    # last row and so must not be treated as a no-op.
    if ((start == 0) or (start is None)) and (stop is None) and (step == 1):
        return node.child
    # No filtering, just reverse (eg. df[::-1]). stop == 0 excludes the first
    # row, so only an absent stop qualifies.
    if ((start is None) or (start == -1)) and (stop is None) and (step == -1):
        return nodes.ReversedNode(node.child)
    # if step is positive and start/stop are non-negative, do a simple
    # predicate on forward offsets
    if (
        (step > 0)
        and ((start is None) or (start >= 0))
        and ((stop is None) or (stop >= 0))
    ):
        node_w_offset = add_offsets(node.child)
        predicate = convert_simple_slice(
            scalar_exprs.DerefOp(node_w_offset.col_id), start or 0, stop, step
        )
        filtered = nodes.FilterNode(node_w_offset, predicate)
        return drop_cols(filtered, (node_w_offset.col_id,))

    # fallback cases, generate both forward and backward offsets
    if step < 0:
        # Negative step: the final node is the reversed one.
        forward_offsets = add_offsets(node.child)
        reversed_offsets = add_offsets(nodes.ReversedNode(forward_offsets))
        dual_indexed = reversed_offsets
    else:
        # Positive step: reverse twice so the final node is in forward order.
        reversed_offsets = add_offsets(nodes.ReversedNode(node.child))
        forward_offsets = add_offsets(nodes.ReversedNode(reversed_offsets))
        dual_indexed = forward_offsets
    predicate = convert_complex_slice(
        scalar_exprs.DerefOp(forward_offsets.col_id),
        scalar_exprs.DerefOp(reversed_offsets.col_id),
        start,
        stop,
        step,
    )
    filtered = nodes.FilterNode(dual_indexed, predicate)
    return drop_cols(filtered, (forward_offsets.col_id, reversed_offsets.col_id))


def add_offsets(node: nodes.BigFrameNode) -> nodes.PromoteOffsetsNode:
    """Wrap ``node`` in a PromoteOffsetsNode using a freshly generated column id."""
    # Allow providing custom id generator?
    offsets_id = ids.ColumnId(guids.generate_guid())
    return nodes.PromoteOffsetsNode(node, offsets_id)


def drop_cols(
    node: nodes.BigFrameNode, drop_cols: Tuple[ids.ColumnId, ...]
) -> nodes.SelectionNode:
    """Return a SelectionNode over ``node`` keeping every id not in ``drop_cols``."""
    # adding a whole node that redefines the schema is a lot of overhead, should do something more efficient
    selections = tuple(
        (scalar_exprs.DerefOp(col_id), col_id)
        for col_id in node.ids
        if col_id not in drop_cols
    )
    return nodes.SelectionNode(node, selections)


def convert_simple_slice(
    offsets: scalar_exprs.Expression,
    start: int = 0,
    stop: Optional[int] = None,
    step: int = 1,
) -> scalar_exprs.Expression:
    """Performs slice but only for positive step size.

    Builds a predicate over forward ``offsets`` selecting positions
    ``start, start + step, ...`` strictly below ``stop`` (if given).
    ``start`` and ``stop`` must be non-negative.
    """
    assert step > 0
    assert start >= 0
    assert (stop is None) or (stop >= 0)

    conditions = []
    if start > 0:
        conditions.append(ops.ge_op.as_expr(offsets, scalar_exprs.const(start)))
    if stop is not None:
        conditions.append(ops.lt_op.as_expr(offsets, scalar_exprs.const(stop)))
    if step > 1:
        # Keep rows whose distance from start is a multiple of step.
        start_diff = ops.sub_op.as_expr(offsets, scalar_exprs.const(start))
        step_cond = ops.eq_op.as_expr(
            ops.mod_op.as_expr(start_diff, scalar_exprs.const(step)),
            scalar_exprs.const(0),
        )
        conditions.append(step_cond)

    return merge_predicates(conditions) or scalar_exprs.const(True)


def convert_complex_slice(
    forward_offsets: scalar_exprs.Expression,
    reverse_offsets: scalar_exprs.Expression,
    start: int,
    stop: Optional[int],
    step: int = 1,
) -> scalar_exprs.Expression:
    """Build a slice predicate using both forward and reverse row offsets.

    Non-negative bounds are compared against ``forward_offsets``; negative
    bounds are compared against ``reverse_offsets``, where python index ``-k``
    corresponds to reverse offset ``k - 1``. ``step`` may be negative but must
    be non-zero.

    NOTE(review): unlike python slicing, a ``start`` beyond the end of the
    data is not clamped here (the row count is not available), so the stride
    anchor can be misaligned for out-of-range starts when ``abs(step) > 1``.
    """
    conditions = []
    assert step != 0
    if start or ((start is not None) and step < 0):
        if start > 0 and step > 0:
            start_cond = ops.ge_op.as_expr(forward_offsets, scalar_exprs.const(start))
        elif start >= 0 and step < 0:
            # Includes start == 0 (eg. df[0::-1] keeps only the first row).
            start_cond = ops.le_op.as_expr(forward_offsets, scalar_exprs.const(start))
        elif start < 0 and step > 0:
            start_cond = ops.le_op.as_expr(
                reverse_offsets, scalar_exprs.const(-start - 1)
            )
        else:
            assert start < 0 and step < 0
            start_cond = ops.ge_op.as_expr(
                reverse_offsets, scalar_exprs.const(-start - 1)
            )
        conditions.append(start_cond)
    if stop is not None:
        if stop >= 0 and step > 0:
            stop_cond = ops.lt_op.as_expr(forward_offsets, scalar_exprs.const(stop))
        elif stop >= 0 and step < 0:
            stop_cond = ops.gt_op.as_expr(forward_offsets, scalar_exprs.const(stop))
        elif stop < 0 and step > 0:
            stop_cond = ops.gt_op.as_expr(
                reverse_offsets, scalar_exprs.const(-stop - 1)
            )
        else:
            assert (stop < 0) and (step < 0)
            stop_cond = ops.lt_op.as_expr(
                reverse_offsets, scalar_exprs.const(-stop - 1)
            )
        conditions.append(stop_cond)
    if step != 1:
        # A row is selected when its distance from the starting row is a
        # multiple of step. The starting row has forward offset `start` when
        # start >= 0, and reverse offset `-start - 1` when start < 0; this
        # holds for either sign of step, and the == 0 test is insensitive to
        # the sign of the remainder.
        if start >= 0:
            start_diff = ops.sub_op.as_expr(forward_offsets, scalar_exprs.const(start))
        else:
            start_diff = ops.sub_op.as_expr(
                reverse_offsets, scalar_exprs.const(-start - 1)
            )
        step_cond = ops.eq_op.as_expr(
            ops.mod_op.as_expr(start_diff, scalar_exprs.const(step)),
            scalar_exprs.const(0),
        )
        conditions.append(step_cond)
    return merge_predicates(conditions) or scalar_exprs.const(True)

bigframes/dataframe.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3722,7 +3722,9 @@ def _slice(
37223722
stop: typing.Optional[int] = None,
37233723
step: typing.Optional[int] = None,
37243724
) -> DataFrame:
3725-
block = self._block.slice(start=start, stop=stop, step=step)
3725+
block = self._block.slice(
3726+
start=start, stop=stop, step=step if (step is not None) else 1
3727+
)
37263728
return DataFrame(block)
37273729

37283730
def __array_ufunc__(

bigframes/series.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1923,9 +1923,9 @@ def _slice(
19231923
step: typing.Optional[int] = None,
19241924
) -> bigframes.series.Series:
19251925
return bigframes.series.Series(
1926-
self._block.slice(start=start, stop=stop, step=step).select_column(
1927-
self._value_column
1928-
),
1926+
self._block.slice(
1927+
start=start, stop=stop, step=step if (step is not None) else 1
1928+
).select_column(self._value_column),
19291929
)
19301930

19311931
def cache(self):

0 commit comments

Comments
 (0)