Skip to content

Commit b00db7e

Browse files
refactor: Make column id namespaces explicit (#982)
1 parent f7c03dc commit b00db7e

File tree

9 files changed

+165
-109
lines changed

9 files changed

+165
-109
lines changed

bigframes/core/__init__.py

Lines changed: 62 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import functools
1919
import io
2020
import typing
21-
from typing import Iterable, Optional, Sequence, Tuple
21+
from typing import Iterable, List, Optional, Sequence, Tuple
2222
import warnings
2323

2424
import google.cloud.bigquery
@@ -106,6 +106,7 @@ def from_table(
106106

107107
@property
108108
def column_ids(self) -> typing.Sequence[str]:
109+
"""Returns column ids as strings."""
109110
return self.schema.names
110111

111112
@property
@@ -182,10 +183,11 @@ def order_by(self, by: Sequence[OrderingExpression]) -> ArrayValue:
182183
def reversed(self) -> ArrayValue:
183184
return ArrayValue(nodes.ReversedNode(child=self.node))
184185

185-
def promote_offsets(self, col_id: str) -> ArrayValue:
186+
def promote_offsets(self) -> Tuple[ArrayValue, str]:
186187
"""
187188
Convenience function to promote copy of column offsets to a value column. Can be used to reset index.
188189
"""
190+
col_id = self._gen_namespaced_uid()
189191
if self.node.order_ambiguous and not (self.session._strictly_ordered):
190192
if not self.session._allows_ambiguity:
191193
raise ValueError(
@@ -197,21 +199,30 @@ def promote_offsets(self, col_id: str) -> ArrayValue:
197199
bigframes.exceptions.AmbiguousWindowWarning,
198200
)
199201

200-
return ArrayValue(nodes.PromoteOffsetsNode(child=self.node, col_id=col_id))
202+
return (
203+
ArrayValue(nodes.PromoteOffsetsNode(child=self.node, col_id=col_id)),
204+
col_id,
205+
)
201206

202207
def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
203208
"""Append together multiple ArrayValue objects."""
204209
return ArrayValue(
205210
nodes.ConcatNode(children=tuple([self.node, *[val.node for val in other]]))
206211
)
207212

208-
def compute_values(self, assignments: Sequence[Tuple[ex.Expression, str]]):
209-
return ArrayValue(
210-
nodes.ProjectionNode(child=self.node, assignments=tuple(assignments))
213+
def compute_values(self, assignments: Sequence[ex.Expression]):
214+
col_ids = self._gen_namespaced_uids(len(assignments))
215+
ex_id_pairs = tuple((ex, id) for ex, id in zip(assignments, col_ids))
216+
return (
217+
ArrayValue(nodes.ProjectionNode(child=self.node, assignments=ex_id_pairs)),
218+
col_ids,
211219
)
212220

213-
def project_to_id(self, expression: ex.Expression, output_id: str):
214-
return self.compute_values(((expression, output_id),))
221+
def project_to_id(self, expression: ex.Expression):
222+
array_val, ids = self.compute_values(
223+
[expression],
224+
)
225+
return array_val, ids[0]
215226

216227
def assign(self, source_id: str, destination_id: str) -> ArrayValue:
217228
if destination_id in self.column_ids: # Mutate case
@@ -234,19 +245,22 @@ def assign(self, source_id: str, destination_id: str) -> ArrayValue:
234245

235246
def create_constant(
236247
self,
237-
destination_id: str,
238248
value: typing.Any,
239249
dtype: typing.Optional[bigframes.dtypes.Dtype],
240-
) -> ArrayValue:
250+
) -> Tuple[ArrayValue, str]:
251+
destination_id = self._gen_namespaced_uid()
241252
if pandas.isna(value):
242253
# Need to assign a data type when value is NaN.
243254
dtype = dtype or bigframes.dtypes.DEFAULT_DTYPE
244255

245-
return ArrayValue(
246-
nodes.ProjectionNode(
247-
child=self.node,
248-
assignments=((ex.const(value, dtype), destination_id),),
249-
)
256+
return (
257+
ArrayValue(
258+
nodes.ProjectionNode(
259+
child=self.node,
260+
assignments=((ex.const(value, dtype), destination_id),),
261+
)
262+
),
263+
destination_id,
250264
)
251265

252266
def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
@@ -297,11 +311,10 @@ def project_window_op(
297311
column_name: str,
298312
op: agg_ops.UnaryWindowOp,
299313
window_spec: WindowSpec,
300-
output_name=None,
301314
*,
302315
never_skip_nulls=False,
303316
skip_reproject_unsafe: bool = False,
304-
) -> ArrayValue:
317+
) -> Tuple[ArrayValue, str]:
305318
"""
306319
Creates a new expression based on this expression with unary operation applied to one column.
307320
column_name: the id of the input column present in the expression
@@ -324,16 +337,20 @@ def project_window_op(
324337
bigframes.exceptions.AmbiguousWindowWarning,
325338
)
326339

327-
return ArrayValue(
328-
nodes.WindowOpNode(
329-
child=self.node,
330-
column_name=column_name,
331-
op=op,
332-
window_spec=window_spec,
333-
output_name=output_name,
334-
never_skip_nulls=never_skip_nulls,
335-
skip_reproject_unsafe=skip_reproject_unsafe,
336-
)
340+
output_name = self._gen_namespaced_uid()
341+
return (
342+
ArrayValue(
343+
nodes.WindowOpNode(
344+
child=self.node,
345+
column_name=column_name,
346+
op=op,
347+
window_spec=window_spec,
348+
output_name=output_name,
349+
never_skip_nulls=never_skip_nulls,
350+
skip_reproject_unsafe=skip_reproject_unsafe,
351+
)
352+
),
353+
output_name,
337354
)
338355

339356
def _reproject_to_table(self) -> ArrayValue:
@@ -410,3 +427,21 @@ def _uniform_sampling(self, fraction: float) -> ArrayValue:
410427

411428
def get_offset_for_name(self, name: str):
412429
return self.schema.names.index(name)
430+
431+
# Deterministically generate namespaced ids for new variables
432+
# These new ids are only unique within the current namespace.
433+
# Many operations, such as joins, create new namespaces. See: BigFrameNode.defines_namespace
434+
# When migrating to integer ids, these will generate the next available integer, in order to densely pack ids
435+
# this will help represent variables sets as compact bitsets
436+
def _gen_namespaced_uid(self) -> str:
437+
return self._gen_namespaced_uids(1)[0]
438+
439+
def _gen_namespaced_uids(self, n: int) -> List[str]:
440+
i = len(self.node.defined_variables)
441+
genned_ids: List[str] = []
442+
while len(genned_ids) < n:
443+
attempted_id = f"col_{i}"
444+
if attempted_id not in self.node.defined_variables:
445+
genned_ids.append(attempted_id)
446+
i = i + 1
447+
return genned_ids

bigframes/core/blocks.py

Lines changed: 26 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,7 @@ def reset_index(self, drop: bool = True) -> Block:
336336
self.session._default_index_type
337337
== bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
338338
):
339-
new_index_col_id = guid.generate_guid()
340-
expr = expr.promote_offsets(new_index_col_id)
339+
expr, new_index_col_id = expr.promote_offsets()
341340
new_index_cols = [new_index_col_id]
342341
elif self.session._default_index_type == bigframes.enums.DefaultIndexKind.NULL:
343342
new_index_cols = []
@@ -846,9 +845,7 @@ def project_expr(
846845
"""
847846
Apply a scalar expression to the block. Creates a new column to store the result.
848847
"""
849-
# TODO(tbergeron): handle labels safely so callers don't need to
850-
result_id = guid.generate_guid()
851-
array_val = self._expr.project_to_id(expr, result_id)
848+
array_val, result_id = self._expr.project_to_id(expr)
852849
block = Block(
853850
array_val,
854851
index_columns=self.index_columns,
@@ -971,12 +968,10 @@ def apply_window_op(
971968
for key in window_spec.grouping_keys:
972969
block, not_null_id = block.apply_unary_op(key, ops.notnull_op)
973970
block = block.filter_by_id(not_null_id).drop_columns([not_null_id])
974-
result_id = guid.generate_guid()
975-
expr = block._expr.project_window_op(
971+
expr, result_id = block._expr.project_window_op(
976972
column,
977973
op,
978974
window_spec,
979-
result_id,
980975
skip_reproject_unsafe=skip_reproject_unsafe,
981976
never_skip_nulls=never_skip_nulls,
982977
)
@@ -1005,8 +1000,7 @@ def create_constant(
10051000
label: Label = None,
10061001
dtype: typing.Optional[bigframes.dtypes.Dtype] = None,
10071002
) -> typing.Tuple[Block, str]:
1008-
result_id = guid.generate_guid()
1009-
expr = self.expr.create_constant(result_id, scalar_constant, dtype=dtype)
1003+
expr, result_id = self.expr.create_constant(scalar_constant, dtype=dtype)
10101004
# Create index copy with label inserted
10111005
# See: https://pandas.pydata.org/docs/reference/api/pandas.Index.insert.html
10121006
labels = self.column_labels.insert(len(self.column_labels), label)
@@ -1063,10 +1057,9 @@ def aggregate_all_and_stack(
10631057
)
10641058
for col_id in self.value_columns
10651059
]
1066-
index_id = guid.generate_guid()
1067-
result_expr = self.expr.aggregate(
1060+
result_expr, index_id = self.expr.aggregate(
10681061
aggregations, dropna=dropna
1069-
).create_constant(index_id, None, None)
1062+
).create_constant(None, None)
10701063
# Transpose as last operation so that final block has valid transpose cache
10711064
return Block(
10721065
result_expr,
@@ -1077,8 +1070,7 @@ def aggregate_all_and_stack(
10771070
else: # axis_n == 1
10781071
# using offsets as identity to group on.
10791072
# TODO: Allow to promote identity/total_order columns instead for better perf
1080-
offset_col = guid.generate_guid()
1081-
expr_with_offsets = self.expr.promote_offsets(offset_col)
1073+
expr_with_offsets, offset_col = self.expr.promote_offsets()
10821074
stacked_expr, (_, value_col_ids, passthrough_cols,) = unpivot(
10831075
expr_with_offsets,
10841076
row_labels=self.column_labels,
@@ -1224,8 +1216,7 @@ def aggregate(
12241216

12251217
names: typing.List[Label] = []
12261218
if len(by_column_ids) == 0:
1227-
label_id = guid.generate_guid()
1228-
result_expr = result_expr.create_constant(label_id, 0, pd.Int64Dtype())
1219+
result_expr, label_id = result_expr.create_constant(0, pd.Int64Dtype())
12291220
index_columns = (label_id,)
12301221
names = [None]
12311222
else:
@@ -1275,8 +1266,7 @@ def get_stat(
12751266
for stat in stats_to_fetch
12761267
]
12771268
expr = self.expr.aggregate(aggregations)
1278-
offset_index_id = guid.generate_guid()
1279-
expr = expr.promote_offsets(offset_index_id)
1269+
expr, offset_index_id = expr.promote_offsets()
12801270
block = Block(
12811271
expr,
12821272
index_columns=[offset_index_id],
@@ -1303,8 +1293,7 @@ def get_binary_stat(
13031293
)
13041294
]
13051295
expr = self.expr.aggregate(aggregations)
1306-
offset_index_id = guid.generate_guid()
1307-
expr = expr.promote_offsets(offset_index_id)
1296+
expr, offset_index_id = expr.promote_offsets()
13081297
block = Block(
13091298
expr,
13101299
index_columns=[offset_index_id],
@@ -1406,9 +1395,10 @@ def explode(
14061395
expr = self.expr.explode(column_ids)
14071396

14081397
if ignore_index:
1409-
new_index_ids = guid.generate_guid()
1398+
expr = expr.drop_columns(self.index_columns)
1399+
expr, new_index_ids = expr.promote_offsets()
14101400
return Block(
1411-
expr.drop_columns(self.index_columns).promote_offsets(new_index_ids),
1401+
expr,
14121402
column_labels=self.column_labels,
14131403
# Initiates default index creation using the block constructor.
14141404
index_columns=[new_index_ids],
@@ -1593,8 +1583,7 @@ def retrieve_repr_request_results(
15931583
return computed_df, count, query_job
15941584

15951585
def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
1596-
result_id = guid.generate_guid()
1597-
expr = self._expr.promote_offsets(result_id)
1586+
expr, result_id = self._expr.promote_offsets()
15981587
return (
15991588
Block(
16001589
expr,
@@ -1611,13 +1600,11 @@ def add_prefix(self, prefix: str, axis: str | int | None = None) -> Block:
16111600
expr = self._expr
16121601
new_index_cols = []
16131602
for index_col in self._index_columns:
1614-
new_col = guid.generate_guid()
1615-
expr = expr.project_to_id(
1603+
expr, new_col = expr.project_to_id(
16161604
expression=ops.add_op.as_expr(
16171605
ex.const(prefix),
16181606
ops.AsTypeOp(to_type="string").as_expr(index_col),
16191607
),
1620-
output_id=new_col,
16211608
)
16221609
new_index_cols.append(new_col)
16231610
expr = expr.select_columns((*new_index_cols, *self.value_columns))
@@ -1637,13 +1624,11 @@ def add_suffix(self, suffix: str, axis: str | int | None = None) -> Block:
16371624
expr = self._expr
16381625
new_index_cols = []
16391626
for index_col in self._index_columns:
1640-
new_col = guid.generate_guid()
1641-
expr = expr.project_to_id(
1627+
expr, new_col = expr.project_to_id(
16421628
expression=ops.add_op.as_expr(
16431629
ops.AsTypeOp(to_type="string").as_expr(index_col),
16441630
ex.const(suffix),
16451631
),
1646-
output_id=new_col,
16471632
)
16481633
new_index_cols.append(new_col)
16491634
expr = expr.select_columns((*new_index_cols, *self.value_columns))
@@ -1785,8 +1770,7 @@ def melt(
17851770
)
17861771

17871772
if create_offsets_index:
1788-
index_id = guid.generate_guid()
1789-
unpivot_expr = unpivot_expr.promote_offsets(index_id)
1773+
unpivot_expr, index_id = unpivot_expr.promote_offsets()
17901774
index_cols = [index_id]
17911775
else:
17921776
index_cols = []
@@ -2012,12 +1996,10 @@ def merge(
20121996

20131997
coalesced_ids = []
20141998
for left_id, right_id in zip(left_join_ids, right_join_ids):
2015-
coalesced_id = guid.generate_guid()
2016-
joined_expr = joined_expr.project_to_id(
1999+
joined_expr, coalesced_id = joined_expr.project_to_id(
20172000
ops.coalesce_op.as_expr(
20182001
get_column_left[left_id], get_column_right[right_id]
20192002
),
2020-
coalesced_id,
20212003
)
20222004
coalesced_ids.append(coalesced_id)
20232005

@@ -2076,8 +2058,7 @@ def merge(
20762058
expr = joined_expr
20772059
index_columns = []
20782060
else:
2079-
offset_index_id = guid.generate_guid()
2080-
expr = joined_expr.promote_offsets(offset_index_id)
2061+
expr, offset_index_id = joined_expr.promote_offsets()
20812062
index_columns = [offset_index_id]
20822063

20832064
return Block(expr, index_columns=index_columns, column_labels=labels)
@@ -2442,8 +2423,7 @@ def _get_rows_as_json_values(self) -> Block:
24422423
# expression.
24432424
# TODO(shobs): Replace direct SQL manipulation by structured expression
24442425
# manipulation
2445-
ordering_column_name = guid.generate_guid()
2446-
expr = self.expr.promote_offsets(ordering_column_name)
2426+
expr, ordering_column_name = self.expr.promote_offsets()
24472427
expr_sql = self.session._to_sql(expr)
24482428

24492429
# Names of the columns to serialize for the row.
@@ -2869,8 +2849,8 @@ def coalesce_columns(
28692849
expr = expr.drop_columns([left_id])
28702850
elif how == "outer":
28712851
coalesced_id = guid.generate_guid()
2872-
expr = expr.project_to_id(
2873-
ops.coalesce_op.as_expr(left_id, right_id), coalesced_id
2852+
expr, coalesced_id = expr.project_to_id(
2853+
ops.coalesce_op.as_expr(left_id, right_id)
28742854
)
28752855
expr = expr.drop_columns([left_id, right_id])
28762856
result_ids.append(coalesced_id)
@@ -3047,7 +3027,7 @@ def unpivot(
30473027
explode_offsets_id = labels_mapping[labels_array.column_ids[-1]]
30483028

30493029
# Build the output rows as a case statement that selects between the N input columns
3050-
unpivot_exprs: List[Tuple[ex.Expression, str]] = []
3030+
unpivot_exprs: List[ex.Expression] = []
30513031
# Supports producing multiple stacked output columns for stacking only part of hierarchical index
30523032
for input_ids in unpivot_columns:
30533033
# row explode offset used to choose the input column
@@ -3064,11 +3044,11 @@ def unpivot(
30643044
)
30653045
)
30663046
col_expr = ops.case_when_op.as_expr(*cases)
3067-
unpivot_exprs.append((col_expr, guid.generate_guid()))
3047+
unpivot_exprs.append(col_expr)
30683048

3069-
unpivot_col_ids = [id for _, id in unpivot_exprs]
3049+
joined_array, unpivot_col_ids = joined_array.compute_values(unpivot_exprs)
30703050

3071-
return joined_array.compute_values(unpivot_exprs).select_columns(
3051+
return joined_array.select_columns(
30723052
[*index_col_ids, *unpivot_col_ids, *new_passthrough_cols]
30733053
), (tuple(index_col_ids), tuple(unpivot_col_ids), tuple(new_passthrough_cols))
30743054

0 commit comments

Comments
 (0)