Skip to content

Commit 109ee24

Browse files
refactor: remove ibis references outside of arrayvalue code. (#37)
Change-Id: I1386355446e90f89a43cee8a9f447f0775639902
1 parent edabdbb commit 109ee24

File tree

9 files changed

+160
-189
lines changed

9 files changed

+160
-189
lines changed

bigframes/core/__init__.py

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ def _get_hidden_ordering_column(self, key: str) -> ibis_types.Column:
269269
return typing.cast(ibis_types.Column, self._hidden_ordering_column_names[key])
270270

271271
def apply_limit(self, max_results: int) -> ArrayValue:
272-
table = self.to_ibis_expr(
272+
table = self._to_ibis_expr(
273273
ordering_mode="order_by",
274274
expose_hidden_cols=True,
275275
).limit(max_results)
@@ -285,11 +285,23 @@ def apply_limit(self, max_results: int) -> ArrayValue:
285285
ordering=self._ordering,
286286
)
287287

288-
def filter(self, predicate: ibis_types.BooleanValue) -> ArrayValue:
288+
def filter(self, predicate_id: str, keep_null: bool = False) -> ArrayValue:
289+
"""Filter rows using the boolean column identified by predicate_id; when keep_null is True, rows whose predicate value is null are kept as well."""
290+
condition = typing.cast(ibis_types.BooleanValue, self.get_column(predicate_id))
291+
if keep_null:
292+
condition = typing.cast(
293+
ibis_types.BooleanValue,
294+
condition.fillna(
295+
typing.cast(ibis_types.BooleanScalar, ibis_types.literal(True))
296+
),
297+
)
298+
return self._filter(condition)
299+
300+
def _filter(self, predicate_value: ibis_types.BooleanValue) -> ArrayValue:
289301
"""Filter the table on a given expression, the predicate must be a boolean series aligned with the table expression."""
290302
expr = self.builder()
291303
expr.ordering = expr.ordering.with_non_sequential()
292-
expr.predicates = [*self._predicates, predicate]
304+
expr.predicates = [*self._predicates, predicate_value]
293305
return expr.build()
294306

295307
def order_by(
@@ -310,7 +322,7 @@ def _uniform_sampling(self, fraction: float) -> ArrayValue:
310322
.. warning::
311323
The row numbers of the result are non-deterministic; avoid relying on them.
312324
"""
313-
table = self.to_ibis_expr(
325+
table = self._to_ibis_expr(
314326
ordering_mode="order_by", expose_hidden_cols=True, fraction=fraction
315327
)
316328
columns = [table[column_name] for column_name in self._column_names]
@@ -342,7 +354,7 @@ def project_offsets(self) -> ArrayValue:
342354
if self._ordering.is_sequential:
343355
return self
344356
# TODO(tbergeron): Enforce total ordering
345-
table = self.to_ibis_expr(
357+
table = self._to_ibis_expr(
346358
ordering_mode="offset_col", order_col_name=ORDER_ID_COLUMN
347359
)
348360
columns = [table[column_name] for column_name in self._column_names]
@@ -412,7 +424,7 @@ def projection(self, columns: Iterable[ibis_types.Value]) -> ArrayValue:
412424
def shape(self) -> typing.Tuple[int, int]:
413425
"""Returns dimensions as (length, width) tuple."""
414426
width = len(self.columns)
415-
count_expr = self.to_ibis_expr(ordering_mode="unordered").count()
427+
count_expr = self._to_ibis_expr(ordering_mode="unordered").count()
416428
sql = self._session.ibis_client.compile(count_expr)
417429
row_iterator, _ = self._session._start_query(
418430
sql=sql,
@@ -435,7 +447,7 @@ def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
435447
)
436448
for i, expr in enumerate([self, *other]):
437449
ordering_prefix = str(i).zfill(prefix_size)
438-
table = expr.to_ibis_expr(
450+
table = expr._to_ibis_expr(
439451
ordering_mode="string_encoded", order_col_name=ORDER_ID_COLUMN
440452
)
441453
# Rename the value columns based on horizontal offset before applying union.
@@ -522,7 +534,7 @@ def aggregate(
522534
by_column_id: column id of the aggregation key, this is preserved through the transform
523535
dropna: whether null keys should be dropped
524536
"""
525-
table = self.to_ibis_expr(ordering_mode="unordered")
537+
table = self._to_ibis_expr(ordering_mode="unordered")
526538
stats = {
527539
col_out: agg_op._as_ibis(table[col_in])
528540
for col_in, agg_op, col_out in aggregations
@@ -541,7 +553,7 @@ def aggregate(
541553
expr = ArrayValue(self._session, result, columns=columns, ordering=ordering)
542554
if dropna:
543555
for column_id in by_column_ids:
544-
expr = expr.filter(
556+
expr = expr._filter(
545557
ops.notnull_op._as_ibis(expr.get_column(column_id))
546558
)
547559
# Can maybe remove this as Ordering id is redundant as by_column is unique after aggregation
@@ -572,7 +584,7 @@ def corr_aggregate(
572584
Arguments:
573585
corr_aggregations: left_column_id, right_column_id, output_column_id tuples
574586
"""
575-
table = self.to_ibis_expr(ordering_mode="unordered")
587+
table = self._to_ibis_expr(ordering_mode="unordered")
576588
stats = {
577589
col_out: table[col_left].corr(table[col_right], how="pop")
578590
for col_left, col_right, col_out in corr_aggregations
@@ -646,7 +658,24 @@ def project_window_op(
646658
# TODO(tbergeron): Automatically track analytic expression usage and defer reprojection until required for valid query generation.
647659
return result._reproject_to_table() if not skip_reproject_unsafe else result
648660

649-
def to_ibis_expr(
661+
def to_sql(
662+
self,
663+
ordering_mode: Literal[
664+
"order_by", "string_encoded", "offset_col", "unordered"
665+
] = "order_by",
666+
order_col_name: Optional[str] = ORDER_ID_COLUMN,
667+
col_id_overrides: typing.Mapping[str, str] = {},
668+
) -> str:
669+
sql = self._session.ibis_client.compile(
670+
self._to_ibis_expr(
671+
ordering_mode=ordering_mode,
672+
order_col_name=order_col_name,
673+
col_id_overrides=col_id_overrides,
674+
)
675+
)
676+
return typing.cast(str, sql)
677+
678+
def _to_ibis_expr(
650679
self,
651680
ordering_mode: Literal[
652681
"order_by", "string_encoded", "offset_col", "unordered"
@@ -814,7 +843,7 @@ def start_query(
814843
# a LocalSession for unit testing.
815844
# TODO(swast): Add a timeout here? If the query is taking a long time,
816845
# maybe we just print the job metadata that we have so far?
817-
table = self.to_ibis_expr(expose_hidden_cols=expose_extra_columns)
846+
table = self._to_ibis_expr(expose_hidden_cols=expose_extra_columns)
818847
sql = self._session.ibis_client.compile(table) # type:ignore
819848
return self._session._start_query(
820849
sql=sql,
@@ -833,7 +862,7 @@ def _reproject_to_table(self) -> ArrayValue:
833862
some operations such as window operations that cannot be used
834863
recursively in projections.
835864
"""
836-
table = self.to_ibis_expr(
865+
table = self._to_ibis_expr(
837866
ordering_mode="unordered",
838867
expose_hidden_cols=True,
839868
)
@@ -912,7 +941,7 @@ def unpivot(
912941
Returns:
913942
ArrayValue: The unpivoted ArrayValue
914943
"""
915-
table = self.to_ibis_expr(ordering_mode="offset_col")
944+
table = self._to_ibis_expr(ordering_mode="offset_col")
916945
sub_expressions = []
917946

918947
# Use ibis memtable to infer type of rowlabels (if possible)
@@ -1054,7 +1083,7 @@ def slice(
10541083
start = start if (start is not None) else last_offset
10551084
cond_list.append((start - expr_with_offsets.offsets) % (-step) == 0)
10561085

1057-
sliced_expr = expr_with_offsets.filter(
1086+
sliced_expr = expr_with_offsets._filter(
10581087
functools.reduce(lambda x, y: x & y, cond_list)
10591088
)
10601089
return sliced_expr if step > 0 else sliced_expr.reversed()

bigframes/core/blocks.py

Lines changed: 57 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030

3131
import geopandas as gpd # type: ignore
3232
import google.cloud.bigquery as bigquery
33-
import ibis.expr.schema as ibis_schema
34-
import ibis.expr.types as ibis_types
3533
import numpy
3634
import pandas as pd
3735
import pyarrow as pa # type: ignore
@@ -42,6 +40,7 @@
4240
import bigframes.core.indexes as indexes
4341
import bigframes.core.ordering as ordering
4442
import bigframes.core.utils
43+
import bigframes.core.utils as utils
4544
import bigframes.dtypes
4645
import bigframes.operations as ops
4746
import bigframes.operations.aggregations as agg_ops
@@ -368,7 +367,10 @@ def reorder_levels(self, ids: typing.Sequence[str]):
368367
level_names = [self.col_id_to_index_name[index_id] for index_id in ids]
369368
return Block(self.expr, ids, self.column_labels, level_names)
370369

371-
def _to_dataframe(self, result, schema: ibis_schema.Schema) -> pd.DataFrame:
370+
@classmethod
371+
def _to_dataframe(
372+
cls, result, schema: typing.Mapping[str, bigframes.dtypes.Dtype]
373+
) -> pd.DataFrame:
372374
"""Convert BigQuery data to pandas DataFrame with specific dtypes."""
373375
df = result.to_dataframe(
374376
bool_dtype=pd.BooleanDtype(),
@@ -382,8 +384,8 @@ def _to_dataframe(self, result, schema: ibis_schema.Schema) -> pd.DataFrame:
382384
)
383385

384386
# Convert Geography column from StringDType to GeometryDtype.
385-
for column_name, ibis_dtype in schema.items():
386-
if ibis_dtype.is_geospatial():
387+
for column_name, dtype in schema.items():
388+
if dtype == gpd.array.GeometryDtype():
387389
df[column_name] = gpd.GeoSeries.from_wkt(
388390
# https://github.com/geopandas/geopandas/issues/1879
389391
df[column_name].replace({numpy.nan: None}),
@@ -473,7 +475,8 @@ def _compute_and_count(
473475
if sampling_method == _HEAD:
474476
total_rows = int(results_iterator.total_rows * fraction)
475477
results_iterator.max_results = total_rows
476-
df = self._to_dataframe(results_iterator, expr.to_ibis_expr().schema())
478+
schema = dict(zip(self.value_columns, self.dtypes))
479+
df = self._to_dataframe(results_iterator, schema)
477480

478481
if self.index_columns:
479482
df.set_index(list(self.index_columns), inplace=True)
@@ -508,7 +511,8 @@ def _compute_and_count(
508511
)
509512
else:
510513
total_rows = results_iterator.total_rows
511-
df = self._to_dataframe(results_iterator, expr.to_ibis_expr().schema())
514+
schema = dict(zip(self.value_columns, self.dtypes))
515+
df = self._to_dataframe(results_iterator, schema)
512516

513517
if self.index_columns:
514518
df.set_index(list(self.index_columns), inplace=True)
@@ -639,13 +643,6 @@ def with_index_labels(self, value: typing.Sequence[Label]) -> Block:
639643
index_labels=tuple(value),
640644
)
641645

642-
def get_value_col_exprs(
643-
self, column_names: Optional[Sequence[str]] = None
644-
) -> List[ibis_types.Value]:
645-
"""Retrive value column expressions."""
646-
column_names = self.value_columns if column_names is None else column_names
647-
return [self._expr.get_column(column_name) for column_name in column_names]
648-
649646
def apply_unary_op(
650647
self, column: str, op: ops.UnaryOp, result_label: Label = None
651648
) -> typing.Tuple[Block, str]:
@@ -816,20 +813,9 @@ def assign_label(self, column_id: str, new_label: Label) -> Block:
816813
)
817814
return self.with_column_labels(new_labels)
818815

819-
def filter(self, column_name: str, keep_null: bool = False):
820-
condition = typing.cast(
821-
ibis_types.BooleanValue, self._expr.get_column(column_name)
822-
)
823-
if keep_null:
824-
condition = typing.cast(
825-
ibis_types.BooleanValue,
826-
condition.fillna(
827-
typing.cast(ibis_types.BooleanScalar, ibis_types.literal(True))
828-
),
829-
)
830-
filtered_expr = self.expr.filter(condition)
816+
def filter(self, column_id: str, keep_null: bool = False):
831817
return Block(
832-
filtered_expr,
818+
self._expr.filter(column_id, keep_null),
833819
index_columns=self.index_columns,
834820
column_labels=self.column_labels,
835821
index_labels=self.index.names,
@@ -1436,6 +1422,50 @@ def is_monotonic_decreasing(
14361422
) -> bool:
14371423
return self._is_monotonic(column_id, increasing=False)
14381424

1425+
def to_sql_query(
1426+
self, include_index: bool
1427+
) -> typing.Tuple[str, list[str], list[Label]]:
1428+
"""
1429+
Compiles this DataFrame's expression tree to SQL, optionally
1430+
including index columns.
1431+
1432+
Args:
1433+
include_index (bool):
1434+
whether to include index columns.
1435+
1436+
Returns:
1437+
a tuple of (sql_string, index_column_id_list, index_column_label_list).
1438+
If include_index is set to False, index_column_id_list and index_column_label_list
1439+
return empty lists.
1440+
"""
1441+
array_value = self._expr
1442+
col_labels, idx_labels = list(self.column_labels), list(self.index_labels)
1443+
old_col_ids, old_idx_ids = list(self.value_columns), list(self.index_columns)
1444+
1445+
if not include_index:
1446+
idx_labels, old_idx_ids = [], []
1447+
array_value = array_value.drop_columns(self.index_columns)
1448+
1449+
old_ids = old_idx_ids + old_col_ids
1450+
1451+
new_col_ids, new_idx_ids = utils.get_standardized_ids(col_labels, idx_labels)
1452+
new_ids = new_idx_ids + new_col_ids
1453+
1454+
substitutions = {}
1455+
for old_id, new_id in zip(old_ids, new_ids):
1456+
# TODO(swast): Do we need to further escape this, or can we rely on
1457+
# the BigQuery unicode column name feature?
1458+
substitutions[old_id] = new_id
1459+
1460+
sql = array_value.to_sql(
1461+
ordering_mode="unordered", col_id_overrides=substitutions
1462+
)
1463+
return (
1464+
sql,
1465+
new_ids[: len(idx_labels)],
1466+
idx_labels,
1467+
)
1468+
14391469
def _is_monotonic(
14401470
self, column_ids: typing.Union[str, Sequence[str]], increasing: bool
14411471
) -> bool:

bigframes/core/indexers.py

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import pandas as pd
2222

2323
import bigframes.constants as constants
24-
import bigframes.core as core
2524
import bigframes.core.guid as guid
2625
import bigframes.core.indexes as indexes
2726
import bigframes.core.scalar
2827
import bigframes.dataframe
28+
import bigframes.operations as ops
2929
import bigframes.series
3030

3131
if typing.TYPE_CHECKING:
@@ -59,35 +59,23 @@ def __setitem__(self, key, value) -> None:
5959

6060
# Assume the key is for the index label.
6161
block = self._series._block
62-
value_column = self._series._value
63-
index_column = block.expr.get_column(block.index_columns[0])
64-
new_value = (
65-
ibis.case()
66-
.when(
67-
index_column == ibis.literal(key, index_column.type()),
68-
ibis.literal(value, value_column.type()),
69-
)
70-
.else_(value_column)
71-
.end()
72-
.name(value_column.get_name())
62+
value_column = self._series._value_column
63+
index_column = block.index_columns[0]
64+
65+
# if index == key return value else value_column
66+
block, insert_cond = block.apply_unary_op(
67+
index_column, ops.partial_right(ops.eq_op, key)
7368
)
74-
all_columns = []
75-
for column in block.expr.columns:
76-
if column.get_name() != value_column.get_name():
77-
all_columns.append(column)
78-
else:
79-
all_columns.append(new_value)
80-
new_expr = block.expr.projection(all_columns)
81-
82-
# TODO(tbergeron): Use block operators rather than directly building desired ibis expressions.
83-
self._series._set_block(
84-
core.blocks.Block(
85-
new_expr,
86-
self._series._block.index_columns,
87-
self._series._block.column_labels,
88-
self._series._block.index.names,
89-
)
69+
block, result_id = block.apply_binary_op(
70+
insert_cond,
71+
self._series._value_column,
72+
ops.partial_arg1(ops.where_op, value),
9073
)
74+
block = block.copy_values(result_id, value_column).drop_columns(
75+
[insert_cond, result_id]
76+
)
77+
78+
self._series._set_block(block)
9179

9280

9381
class IlocSeriesIndexer:

bigframes/core/joins/single_column.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,11 @@ def join_by_column(
122122
),
123123
)
124124
else:
125-
left_table = left.to_ibis_expr(
125+
left_table = left._to_ibis_expr(
126126
ordering_mode="unordered",
127127
expose_hidden_cols=True,
128128
)
129-
right_table = right.to_ibis_expr(
129+
right_table = right._to_ibis_expr(
130130
ordering_mode="unordered",
131131
expose_hidden_cols=True,
132132
)

0 commit comments

Comments
 (0)