Skip to content

Commit ff3bb89

Browse files
refactor: simplify ArrayValue public interface (#82)
* refactor: simplify ArrayValue public interface --------- Co-authored-by: Tim Swast <[email protected]>
1 parent 740c451 commit ff3bb89

File tree

10 files changed

+212
-255
lines changed

10 files changed

+212
-255
lines changed

bigframes/core/__init__.py

Lines changed: 90 additions & 155 deletions
Large diffs are not rendered by default.

bigframes/core/blocks.py

Lines changed: 86 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def value_columns(self) -> Sequence[str]:
152152
"""All value columns, mutually exclusive with index columns."""
153153
return [
154154
column
155-
for column in self._expr.column_names
155+
for column in self._expr.column_ids
156156
if column not in self.index_columns
157157
]
158158

@@ -444,9 +444,7 @@ def _compute_and_count(
444444
# TODO(swast): Allow for dry run and timeout.
445445
expr = self._apply_value_keys_to_expr(value_keys=value_keys)
446446

447-
results_iterator, query_job = expr.start_query(
448-
max_results=max_results, expose_extra_columns=True
449-
)
447+
results_iterator, query_job = expr.start_query(max_results=max_results)
450448

451449
table_size = expr._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES
452450
fraction = (
@@ -483,12 +481,6 @@ def _compute_and_count(
483481
if self.index_columns:
484482
df.set_index(list(self.index_columns), inplace=True)
485483
df.index.names = self.index.names # type: ignore
486-
487-
df.drop(
488-
[col for col in df.columns if col not in self.value_columns],
489-
axis=1,
490-
inplace=True,
491-
)
492484
elif (sampling_method == _UNIFORM) and (random_state is None):
493485
filtered_expr = self.expr._uniform_sampling(fraction)
494486
block = Block(
@@ -520,12 +512,6 @@ def _compute_and_count(
520512
df.set_index(list(self.index_columns), inplace=True)
521513
df.index.names = self.index.names # type: ignore
522514

523-
df.drop(
524-
[col for col in df.columns if col not in self.value_columns],
525-
axis=1,
526-
inplace=True,
527-
)
528-
529515
return df, total_rows, query_job
530516

531517
def _split(
@@ -1087,7 +1073,7 @@ def _normalize_expression(
10871073
):
10881074
"""Normalizes expression by moving index columns to left."""
10891075
value_columns = [
1090-
col_id for col_id in expr.column_names.keys() if col_id not in index_columns
1076+
col_id for col_id in expr.column_ids if col_id not in index_columns
10911077
]
10921078
if (assert_value_size is not None) and (
10931079
len(value_columns) != assert_value_size
@@ -1096,20 +1082,92 @@ def _normalize_expression(
10961082
return expr.select_columns([*index_columns, *value_columns])
10971083

10981084
def slice(
1099-
self: bigframes.core.blocks.Block,
1085+
self,
11001086
start: typing.Optional[int] = None,
11011087
stop: typing.Optional[int] = None,
11021088
step: typing.Optional[int] = None,
11031089
) -> bigframes.core.blocks.Block:
1104-
sliced_expr = self.expr.slice(start=start, stop=stop, step=step)
1105-
# since this is slice, return a copy even if unchanged
1106-
block = Block(
1107-
sliced_expr,
1108-
index_columns=self.index_columns,
1109-
column_labels=self.column_labels,
1110-
index_labels=self._index_labels,
1090+
if step is None:
1091+
step = 1
1092+
if step == 0:
1093+
raise ValueError("slice step cannot be zero")
1094+
if step < 0:
1095+
reverse_start = (-start - 1) if start else 0
1096+
reverse_stop = (-stop - 1) if stop else None
1097+
reverse_step = -step
1098+
return self.reversed()._forward_slice(
1099+
reverse_start, reverse_stop, reverse_step
1100+
)
1101+
return self._forward_slice(start or 0, stop, step)
1102+
1103+
def _forward_slice(self, start: int = 0, stop=None, step: int = 1):
1104+
"""Performs slice but only for positive step size."""
1105+
if step <= 0:
1106+
raise ValueError("forward_slice only supports positive step size")
1107+
1108+
use_postive_offsets = (
1109+
(start > 0)
1110+
or ((stop is not None) and (stop >= 0))
1111+
or ((step > 1) and (start >= 0))
11111112
)
1112-
return block
1113+
use_negative_offsets = (
1114+
(start < 0) or (stop and (stop < 0)) or ((step > 1) and (start < 0))
1115+
)
1116+
1117+
block = self
1118+
1119+
# only generate offsets that are used
1120+
positive_offsets = None
1121+
negative_offsets = None
1122+
1123+
if use_postive_offsets:
1124+
block, positive_offsets = self.promote_offsets()
1125+
if use_negative_offsets:
1126+
block, negative_offsets = block.reversed().promote_offsets()
1127+
block = block.reversed()
1128+
1129+
conditions = []
1130+
if start != 0:
1131+
if start > 0:
1132+
op = ops.partial_right(ops.ge_op, start)
1133+
assert positive_offsets
1134+
block, start_cond = block.apply_unary_op(positive_offsets, op)
1135+
else:
1136+
op = ops.partial_right(ops.le_op, -start - 1)
1137+
assert negative_offsets
1138+
block, start_cond = block.apply_unary_op(negative_offsets, op)
1139+
conditions.append(start_cond)
1140+
if stop is not None:
1141+
if stop >= 0:
1142+
op = ops.partial_right(ops.lt_op, stop)
1143+
assert positive_offsets
1144+
block, stop_cond = block.apply_unary_op(positive_offsets, op)
1145+
else:
1146+
op = ops.partial_right(ops.gt_op, -stop - 1)
1147+
assert negative_offsets
1148+
block, stop_cond = block.apply_unary_op(negative_offsets, op)
1149+
conditions.append(stop_cond)
1150+
1151+
if step > 1:
1152+
op = ops.partial_right(ops.mod_op, step)
1153+
if start >= 0:
1154+
op = ops.partial_right(ops.sub_op, start)
1155+
assert positive_offsets
1156+
block, start_diff = block.apply_unary_op(positive_offsets, op)
1157+
else:
1158+
op = ops.partial_right(ops.sub_op, -start + 1)
1159+
assert negative_offsets
1160+
block, start_diff = block.apply_unary_op(negative_offsets, op)
1161+
modulo_op = ops.partial_right(ops.mod_op, step)
1162+
block, mod = block.apply_unary_op(start_diff, modulo_op)
1163+
is_zero_op = ops.partial_right(ops.eq_op, 0)
1164+
block, step_cond = block.apply_unary_op(mod, is_zero_op)
1165+
conditions.append(step_cond)
1166+
1167+
for cond in conditions:
1168+
block = block.filter(cond)
1169+
1170+
return block.select_columns(self.value_columns)
11131171

11141172
# Using cache to optimize for Jupyter Notebook's behavior where both '__repr__'
11151173
# and '__repr_html__' are called in a single display action, reducing redundant
@@ -1396,7 +1454,7 @@ def concat(
13961454
)
13971455
result_block = Block(
13981456
result_expr,
1399-
index_columns=list(result_expr.column_names.keys())[:index_nlevels],
1457+
index_columns=list(result_expr.column_ids)[:index_nlevels],
14001458
column_labels=aligned_blocks[0].column_labels,
14011459
index_labels=result_labels,
14021460
)
@@ -1530,9 +1588,7 @@ def to_sql_query(
15301588
# the BigQuery unicode column name feature?
15311589
substitutions[old_id] = new_id
15321590

1533-
sql = array_value.to_sql(
1534-
ordering_mode="unordered", col_id_overrides=substitutions
1535-
)
1591+
sql = array_value.to_sql(col_id_overrides=substitutions)
15361592
return (
15371593
sql,
15381594
new_ids[: len(idx_labels)],

bigframes/core/groupby/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -426,10 +426,6 @@ def __init__(
426426
self._value_name = value_name
427427
self._dropna = dropna # Applies to aggregations but not windowing
428428

429-
@property
430-
def _value(self):
431-
return self._block.expr.get_column(self._value_column)
432-
433429
def all(self) -> series.Series:
434430
return self._aggregate(agg_ops.all_op)
435431

bigframes/core/indexes/index.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -398,9 +398,7 @@ def to_pandas(self) -> pandas.Index:
398398
"""Executes deferred operations and downloads the results."""
399399
# Project down to only the index column. So the query can be cached to visualize other data.
400400
index_columns = list(self._block.index_columns)
401-
expr = self._expr.projection(
402-
[self._expr.get_any_column(col) for col in index_columns]
403-
)
401+
expr = self._expr.select_columns(index_columns)
404402
results, _ = expr.start_query()
405403
df = expr._session._rows_to_dataframe(results)
406404
df = df.set_index(index_columns)

bigframes/core/joins/row_identity.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ def join_by_row_identity(
3838
f"Only how='outer','left','inner' currently supported. {constants.FEEDBACK_LINK}"
3939
)
4040

41-
if not left.table.equals(right.table):
41+
if not left._table.equals(right._table):
4242
raise ValueError(
4343
"Cannot combine objects without an explicit join/merge key. "
44-
f"Left based on: {left.table.compile()}, but "
45-
f"right based on: {right.table.compile()}"
44+
f"Left based on: {left._table.compile()}, but "
45+
f"right based on: {right._table.compile()}"
4646
)
4747

4848
left_predicates = left._predicates
@@ -63,11 +63,11 @@ def join_by_row_identity(
6363
left_mask = left_relative_predicates if how in ["right", "outer"] else None
6464
right_mask = right_relative_predicates if how in ["left", "outer"] else None
6565
joined_columns = [
66-
_mask_value(left.get_column(key), left_mask).name(map_left_id(key))
67-
for key in left.column_names.keys()
66+
_mask_value(left._get_ibis_column(key), left_mask).name(map_left_id(key))
67+
for key in left.column_ids
6868
] + [
69-
_mask_value(right.get_column(key), right_mask).name(map_right_id(key))
70-
for key in right.column_names.keys()
69+
_mask_value(right._get_ibis_column(key), right_mask).name(map_right_id(key))
70+
for key in right.column_ids
7171
]
7272

7373
# If left isn't being masked, can just use left ordering
@@ -108,7 +108,7 @@ def join_by_row_identity(
108108

109109
joined_expr = core.ArrayValue(
110110
left._session,
111-
left.table,
111+
left._table,
112112
columns=joined_columns,
113113
hidden_ordering_columns=hidden_ordering_columns,
114114
ordering=new_ordering,

bigframes/core/joins/single_column.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ def join_by_column(
7474
if (
7575
allow_row_identity_join
7676
and how in bigframes.core.joins.row_identity.SUPPORTED_ROW_IDENTITY_HOW
77-
and left.table.equals(right.table)
77+
and left._table.equals(right._table)
7878
# Make sure we're joining on exactly the same column(s), at least with
7979
# regards to value its possible that they both have the same names but
8080
# were modified in different ways. Ignore differences in the names.
8181
and all(
82-
left.get_any_column(lcol)
82+
left._get_any_column(lcol)
8383
.name("index")
84-
.equals(right.get_any_column(rcol).name("index"))
84+
.equals(right._get_any_column(rcol).name("index"))
8585
for lcol, rcol in zip(left_column_ids, right_column_ids)
8686
)
8787
):
@@ -90,14 +90,16 @@ def join_by_column(
9090
get_column_right,
9191
) = bigframes.core.joins.row_identity.join_by_row_identity(left, right, how=how)
9292
left_join_keys = [
93-
combined_expr.get_column(get_column_left(col)) for col in left_column_ids
93+
combined_expr._get_ibis_column(get_column_left(col))
94+
for col in left_column_ids
9495
]
9596
right_join_keys = [
96-
combined_expr.get_column(get_column_right(col)) for col in right_column_ids
97+
combined_expr._get_ibis_column(get_column_right(col))
98+
for col in right_column_ids
9799
]
98100
join_key_cols = get_coalesced_join_cols(left_join_keys, right_join_keys, how)
99101
join_key_ids = [col.get_name() for col in join_key_cols]
100-
combined_expr = combined_expr.projection(
102+
combined_expr = combined_expr._projection(
101103
[*join_key_cols, *combined_expr.columns]
102104
)
103105
if sort:
@@ -119,13 +121,13 @@ def join_by_column(
119121
lmapping = {
120122
col_id: guid.generate_guid()
121123
for col_id in itertools.chain(
122-
left.column_names, left._hidden_ordering_column_names
124+
left.column_ids, left._hidden_ordering_column_names
123125
)
124126
}
125127
rmapping = {
126128
col_id: guid.generate_guid()
127129
for col_id in itertools.chain(
128-
right.column_names, right._hidden_ordering_column_names
130+
right.column_ids, right._hidden_ordering_column_names
129131
)
130132
}
131133

@@ -136,12 +138,12 @@ def get_column_right(col_id):
136138
return rmapping[col_id]
137139

138140
left_table = left._to_ibis_expr(
139-
ordering_mode="unordered",
141+
"unordered",
140142
expose_hidden_cols=True,
141143
col_id_overrides=lmapping,
142144
)
143145
right_table = right._to_ibis_expr(
144-
ordering_mode="unordered",
146+
"unordered",
145147
expose_hidden_cols=True,
146148
col_id_overrides=rmapping,
147149
)

bigframes/dataframe.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ def _apply_series_binop(
554554
other._block.index, how=how
555555
)
556556

557-
series_column_id = other._value.get_name()
557+
series_column_id = other._value_column
558558
series_col = get_column_right(series_column_id)
559559
block = joined_index._block
560560
for column_id, label in zip(
@@ -2382,13 +2382,11 @@ def _create_io_query(self, index: bool, ordering_id: Optional[str]) -> str:
23822382

23832383
if ordering_id is not None:
23842384
return array_value.to_sql(
2385-
ordering_mode="offset_col",
2385+
offset_column=ordering_id,
23862386
col_id_overrides=id_overrides,
2387-
order_col_name=ordering_id,
23882387
)
23892388
else:
23902389
return array_value.to_sql(
2391-
ordering_mode="unordered",
23922390
col_id_overrides=id_overrides,
23932391
)
23942392

bigframes/operations/base.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import typing
1818

19-
import ibis.expr.types as ibis_types
2019
import pandas as pd
2120

2221
import bigframes.constants as constants
@@ -106,11 +105,6 @@ def __init__(
106105
if pd_series.name is None:
107106
self._block = self._block.with_column_labels([None])
108107

109-
@property
110-
def _value(self) -> ibis_types.Value:
111-
"""Private property to get Ibis expression for the value column."""
112-
return self._block.expr.get_column(self._value_column)
113-
114108
@property
115109
def _value_column(self) -> str:
116110
return self._block.value_columns[0]

bigframes/series.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1150,7 +1150,11 @@ def _groupby_values(
11501150
def apply(self, func) -> Series:
11511151
# TODO(shobs, b/274645634): Support convert_dtype, args, **kwargs
11521152
# is actually a ternary op
1153-
return self._apply_unary_op(ops.RemoteFunctionOp(func))
1153+
# Reproject as workaround to applying filter too late. This forces the filter
1154+
# to be applied before passing data to remote function, protecting from bad
1155+
# inputs causing errors.
1156+
reprojected_series = Series(self._block._force_reproject())
1157+
return reprojected_series._apply_unary_op(ops.RemoteFunctionOp(func))
11541158

11551159
def add_prefix(self, prefix: str, axis: int | str | None = None) -> Series:
11561160
return Series(self._get_block().add_prefix(prefix))

0 commit comments

Comments
 (0)