Commit eceeb22
refactor: ArrayValue is now a tree that defers conversion to ibis (#110)
1 parent 39df43e commit eceeb22

26 files changed: +1996 −1172 lines
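The core change in this commit is architectural: ArrayValue no longer wraps an ibis expression directly. Instead it holds an immutable tree of operation nodes, and conversion to ibis is deferred until the tree is compiled (see the new bigframes/core/compile package below). A minimal sketch of that pattern, using illustrative node classes (Node, LeafNode, FilterNode) that are assumptions rather than the exact types this commit adds:

from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    """Base class for immutable expression-tree nodes."""


@dataclass(frozen=True)
class LeafNode(Node):
    # Frozen dataclasses need hashable fields, which is why the diffs
    # below convert lists to tuples at many call sites.
    table_id: str


@dataclass(frozen=True)
class FilterNode(Node):
    child: Node
    predicate_col: str


class ArrayValue:
    """Wraps a node tree; each operation only grows the tree."""

    def __init__(self, node: Node):
        self.node = node

    def filter(self, predicate_col: str) -> ArrayValue:
        # No ibis objects are built here; a compiler walks the tree later.
        return ArrayValue(FilterNode(self.node, predicate_col))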

bigframes/core/__init__.py

Lines changed: 216 additions & 1005 deletions
Large diffs are not rendered by default.

bigframes/core/block_transforms.py

Lines changed: 13 additions & 12 deletions
@@ -21,6 +21,7 @@
 import bigframes.core as core
 import bigframes.core.blocks as blocks
 import bigframes.core.ordering as ordering
+import bigframes.core.window_spec as windows
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops

@@ -68,21 +69,21 @@ def indicate_duplicates(
     if keep == "first":
         # Count how many copies occur up to current copy of value
         # Discard this value if there are copies BEFORE
-        window_spec = core.WindowSpec(
+        window_spec = windows.WindowSpec(
             grouping_keys=tuple(columns),
             following=0,
         )
     elif keep == "last":
         # Count how many copies occur up to current copy of values
         # Discard this value if there are copies AFTER
-        window_spec = core.WindowSpec(
+        window_spec = windows.WindowSpec(
             grouping_keys=tuple(columns),
             preceding=0,
         )
     else:  # keep == False
         # Count how many copies of the value occur in entire series.
         # Discard this value if there are copies ANYWHERE
-        window_spec = core.WindowSpec(grouping_keys=tuple(columns))
+        window_spec = windows.WindowSpec(grouping_keys=tuple(columns))
     block, dummy = block.create_constant(1)
     block, val_count_col_id = block.apply_window_op(
         dummy,

@@ -131,7 +132,7 @@ def value_counts(
     )
     count_id = agg_ids[0]
     if normalize:
-        unbound_window = core.WindowSpec()
+        unbound_window = windows.WindowSpec()
         block, total_count_id = block.apply_window_op(
             count_id, agg_ops.sum_op, unbound_window
         )

@@ -153,7 +154,7 @@ def value_counts(

 def pct_change(block: blocks.Block, periods: int = 1) -> blocks.Block:
     column_labels = block.column_labels
-    window_spec = core.WindowSpec(
+    window_spec = windows.WindowSpec(
         preceding=periods if periods > 0 else None,
         following=-periods if periods < 0 else None,
     )

@@ -195,7 +196,7 @@ def rank(
             ops.isnull_op,
         )
         nullity_col_ids.append(nullity_col_id)
-        window = core.WindowSpec(
+        window = windows.WindowSpec(
             # BigQuery has syntax to reorder nulls with "NULLS FIRST/LAST", but that is unavailable through ibis presently, so must order on a separate nullity expression first.
             ordering=(
                 ordering.OrderingColumnReference(

@@ -229,7 +230,7 @@ def rank(
         block, result_id = block.apply_window_op(
             rownum_col_ids[i],
             agg_op,
-            window_spec=core.WindowSpec(grouping_keys=[columns[i]]),
+            window_spec=windows.WindowSpec(grouping_keys=(columns[i],)),
             skip_reproject_unsafe=(i < (len(columns) - 1)),
         )
         post_agg_rownum_col_ids.append(result_id)

@@ -311,7 +312,7 @@ def nsmallest(
     block, counter = block.apply_window_op(
         column_ids[0],
         agg_ops.rank_op,
-        window_spec=core.WindowSpec(ordering=order_refs),
+        window_spec=windows.WindowSpec(ordering=tuple(order_refs)),
     )
     block, condition = block.apply_unary_op(
         counter, ops.partial_right(ops.le_op, n)

@@ -343,7 +344,7 @@ def nlargest(
     block, counter = block.apply_window_op(
         column_ids[0],
         agg_ops.rank_op,
-        window_spec=core.WindowSpec(ordering=order_refs),
+        window_spec=windows.WindowSpec(ordering=tuple(order_refs)),
     )
     block, condition = block.apply_unary_op(
         counter, ops.partial_right(ops.le_op, n)

@@ -440,14 +441,14 @@ def _mean_delta_to_power(
     grouping_column_ids: typing.Sequence[str],
 ) -> typing.Tuple[blocks.Block, typing.Sequence[str]]:
     """Calculate (x-mean(x))^n. Useful for calculating moment statistics such as skew and kurtosis."""
-    window = core.WindowSpec(grouping_keys=grouping_column_ids)
+    window = windows.WindowSpec(grouping_keys=tuple(grouping_column_ids))
     block, mean_ids = block.multi_apply_window_op(column_ids, agg_ops.mean_op, window)
     delta_ids = []
     cube_op = ops.partial_right(ops.pow_op, n_power)
     for val_id, mean_val_id in zip(column_ids, mean_ids):
         block, delta_id = block.apply_binary_op(val_id, mean_val_id, ops.sub_op)
         block, delta_power_id = block.apply_unary_op(delta_id, cube_op)
-        block = block.drop_columns(delta_id)
+        block = block.drop_columns([delta_id])
         delta_ids.append(delta_power_id)
     return block, delta_ids

@@ -645,7 +646,7 @@ def _idx_extrema(
             for idx_col in original_block.index_columns
         ],
     ]
-    window_spec = core.WindowSpec(ordering=order_refs)
+    window_spec = windows.WindowSpec(ordering=tuple(order_refs))
     idx_col = original_block.index_columns[0]
     block, result_col = block.apply_window_op(
         idx_col, agg_ops.first_op, window_spec
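The repeated list-to-tuple changes in this file follow from the tree refactor: WindowSpec values (now defined in bigframes/core/window_spec) end up embedded in immutable tree nodes, so their fields must be hashable, and lists are not. A sketch under that assumption; the actual class likely carries more fields, such as the ordering seen above:

import typing
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowSpec:
    # Tuples rather than lists keep instances hashable.
    grouping_keys: typing.Tuple[str, ...] = ()
    preceding: typing.Optional[int] = None
    following: typing.Optional[int] = None


# Hashable, so it can sit inside a frozen expression-tree node:
spec = WindowSpec(grouping_keys=("col_a",), following=0)
assert hash(spec) == hash(WindowSpec(grouping_keys=("col_a",), following=0))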

bigframes/core/blocks.py

Lines changed: 18 additions & 16 deletions
@@ -35,7 +35,6 @@
 import bigframes.core as core
 import bigframes.core.guid as guid
 import bigframes.core.indexes as indexes
-import bigframes.core.joins as joins
 import bigframes.core.joins.name_resolution as join_names
 import bigframes.core.ordering as ordering
 import bigframes.core.utils

@@ -378,7 +377,7 @@ def _to_dataframe(self, result) -> pd.DataFrame:
         """Convert BigQuery data to pandas DataFrame with specific dtypes."""
         dtypes = dict(zip(self.index_columns, self.index_dtypes))
         dtypes.update(zip(self.value_columns, self.dtypes))
-        return self._expr._session._rows_to_dataframe(result, dtypes)
+        return self._expr.session._rows_to_dataframe(result, dtypes)

     def to_pandas(
         self,

@@ -422,7 +421,7 @@ def to_pandas_batches(self):
         dtypes.update(zip(self.value_columns, self.dtypes))
         results_iterator, _ = self._expr.start_query()
         for arrow_table in results_iterator.to_arrow_iterable(
-            bqstorage_client=self._expr._session.bqstoragereadclient
+            bqstorage_client=self._expr.session.bqstoragereadclient
         ):
             df = bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)
             self._copy_index_to_pandas(df)

@@ -454,7 +453,9 @@ def _compute_and_count(

         results_iterator, query_job = expr.start_query(max_results=max_results)

-        table_size = expr._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES
+        table_size = (
+            expr.session._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES
+        )
         fraction = (
             max_download_size / table_size
             if (max_download_size is not None) and (table_size != 0)

@@ -819,7 +820,9 @@ def aggregate_all_and_stack(
         axis: int | str = 0,
         value_col_id: str = "values",
         dropna: bool = True,
-        dtype=pd.Float64Dtype(),
+        dtype: typing.Union[
+            bigframes.dtypes.Dtype, typing.Tuple[bigframes.dtypes.Dtype, ...]
+        ] = pd.Float64Dtype(),
     ) -> Block:
         axis_n = utils.get_axis_number(axis)
         if axis_n == 0:

@@ -829,7 +832,7 @@ def aggregate_all_and_stack(
             result_expr = self.expr.aggregate(aggregations, dropna=dropna).unpivot(
                 row_labels=self.column_labels.to_list(),
                 index_col_ids=["index"],
-                unpivot_columns=[(value_col_id, self.value_columns)],
+                unpivot_columns=tuple([(value_col_id, tuple(self.value_columns))]),
                 dtype=dtype,
             )
             return Block(result_expr, index_columns=["index"], column_labels=[None])

@@ -841,7 +844,7 @@ def aggregate_all_and_stack(
             stacked_expr = expr_with_offsets.unpivot(
                 row_labels=self.column_labels.to_list(),
                 index_col_ids=[guid.generate_guid()],
-                unpivot_columns=[(value_col_id, self.value_columns)],
+                unpivot_columns=[(value_col_id, tuple(self.value_columns))],
                 passthrough_columns=[*self.index_columns, offset_col],
                 dtype=dtype,
             )

@@ -1029,13 +1032,13 @@ def summarize(
             for col_id in column_ids
         ]
         columns = [
-            (col_id, [f"{col_id}-{stat.name}" for stat in stats])
+            (col_id, tuple(f"{col_id}-{stat.name}" for stat in stats))
             for col_id in column_ids
         ]
         expr = self.expr.aggregate(aggregations).unpivot(
             labels,
-            unpivot_columns=columns,
-            index_col_ids=[label_col_id],
+            unpivot_columns=tuple(columns),
+            index_col_ids=tuple([label_col_id]),
         )
         labels = self._get_labels_for_columns(column_ids)
         return Block(expr, column_labels=labels, index_columns=[label_col_id])

@@ -1342,7 +1345,7 @@ def stack(self, how="left", levels: int = 1):
             passthrough_columns=self.index_columns,
             unpivot_columns=unpivot_columns,
             index_col_ids=added_index_columns,
-            dtype=dtypes,
+            dtype=tuple(dtypes),
             how=how,
         )
         new_index_level_names = self.column_labels.names[-levels:]

@@ -1382,7 +1385,7 @@ def _create_stack_column(
             dtype = self._column_type(input_id)
             input_columns.append(input_id)
             # Input column i is the first one that
-        return input_columns, dtype or pd.Float64Dtype()
+        return tuple(input_columns), dtype or pd.Float64Dtype()

     def _column_type(self, col_id: str) -> bigframes.dtypes.Dtype:
         col_offset = self.value_columns.index(col_id)

@@ -1497,8 +1500,7 @@ def merge(
         sort: bool,
         suffixes: tuple[str, str] = ("_x", "_y"),
     ) -> Block:
-        joined_expr = joins.join_by_column(
-            self.expr,
+        joined_expr = self.expr.join(
             left_join_ids,
             other.expr,
             right_join_ids,

@@ -1708,7 +1710,7 @@ def _is_monotonic(
         return result


-def block_from_local(data, session=None) -> Block:
+def block_from_local(data) -> Block:
     pd_data = pd.DataFrame(data)
     columns = pd_data.columns

@@ -1730,7 +1732,7 @@ def block_from_local(data, session=None) -> Block:
     )
     index_ids = pd_data.columns[: len(index_labels)]

-    keys_expr = core.ArrayValue.mem_expr_from_pandas(pd_data, session)
+    keys_expr = core.ArrayValue.from_pandas(pd_data)
     return Block(
         keys_expr,
         column_labels=columns,
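Two related API shifts appear in this file: callers use a public expr.session property instead of reaching into _expr._session, and block_from_local no longer takes a session, since ArrayValue.from_pandas can build a local-data tree without one. A sketch of the property side, assuming a simplified constructor:

class ArrayValue:
    """Simplified; the real class also carries the node tree."""

    def __init__(self, session, node):
        self._session = session
        self.node = node

    @property
    def session(self):
        # Read-only public accessor; callers no longer touch `_session`.
        return self._session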

bigframes/core/compile/__init__.py

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from bigframes.core.compile.compiled import CompiledArrayValue
+from bigframes.core.compile.compiler import compile_node
+
+__all__ = [
+    "compile_node",
+    "CompiledArrayValue",
+]
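This new package is the seam between tree definition and ibis compilation: compile_node turns an ArrayValue node tree into a CompiledArrayValue, which is where ibis state now lives. One plausible shape for such a compiler is single dispatch over node types; this sketch is an assumption, not the actual code in compiler.py:

import functools
from dataclasses import dataclass


@dataclass
class CompiledArrayValue:
    """Stand-in for the ibis-backed representation."""

    description: str


@functools.singledispatch
def compile_node(node) -> CompiledArrayValue:
    raise NotImplementedError(f"no compiler registered for {type(node).__name__}")


# Handlers registered per node type keep backend-specific logic out of
# the tree classes, e.g.:
#
# @compile_node.register
# def _(node: FilterNode) -> CompiledArrayValue:
#     child = compile_node(node.child)
#     return CompiledArrayValue(f"filter({child.description})")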
