16
16
import functools
17
17
import itertools
18
18
import typing
19
- from typing import Collection , Optional , Sequence
19
+ from typing import Optional , Sequence
20
20
21
21
import bigframes_vendored .ibis
22
22
import bigframes_vendored .ibis .backends .bigquery .backend as ibis_bigquery
38
38
import bigframes .dtypes
39
39
import bigframes .operations .aggregations as agg_ops
40
40
41
- PREDICATE_COLUMN = "bigframes_predicate"
42
-
43
-
44
41
op_compiler = op_compilers .scalar_op_compiler
45
42
46
43
@@ -50,11 +47,8 @@ def __init__(
50
47
self ,
51
48
table : ibis_types .Table ,
52
49
columns : Sequence [ibis_types .Value ],
53
- predicates : Optional [Collection [ibis_types .BooleanValue ]] = None ,
54
50
):
55
51
self ._table = table
56
- # Deferred predicates probably no longer needed?
57
- self ._predicates = tuple (predicates ) if predicates is not None else ()
58
52
# Allow creating a DataFrame directly from an Ibis table expression.
59
53
# TODO(swast): Validate that each column references the same table (or
60
54
# no table for literal values).
@@ -69,17 +63,6 @@ def __init__(
69
63
# dictionary mapping names to column values.
70
64
self ._column_names = {column .get_name (): column for column in self ._columns }
71
65
72
- def builder (self ):
73
- """Creates a mutable builder for expressions."""
74
- # Since ArrayValue is intended to be immutable (immutability offers
75
- # potential opportunities for caching, though we might need to introduce
76
- # more node types for that to be useful), we create a builder class.
77
- return UnorderedIR .Builder (
78
- self ._table ,
79
- columns = self ._columns ,
80
- predicates = self ._predicates ,
81
- )
82
-
83
66
def to_sql (
84
67
self ,
85
68
* ,
@@ -118,15 +101,6 @@ def columns(self) -> typing.Tuple[ibis_types.Value, ...]:
118
101
def column_ids (self ) -> typing .Sequence [str ]:
119
102
return tuple (self ._column_names .keys ())
120
103
121
- @property
122
- def _reduced_predicate (self ) -> typing .Optional [ibis_types .BooleanValue ]:
123
- """Returns the frame's predicates as an equivalent boolean value, useful where a single predicate value is preferred."""
124
- return (
125
- _reduce_predicate_list (self ._predicates ).name (PREDICATE_COLUMN )
126
- if self ._predicates
127
- else None
128
- )
129
-
130
104
@property
131
105
def _ibis_bindings (self ) -> dict [str , ibis_types .Value ]:
132
106
return {col : self ._get_ibis_column (col ) for col in self .column_ids }
@@ -141,9 +115,7 @@ def projection(
141
115
op_compiler .compile_expression (expression , bindings ).name (id )
142
116
for expression , id in expression_id_pairs
143
117
]
144
- builder = self .builder ()
145
- builder .columns = tuple ([* self ._columns , * new_values ])
146
- return builder .build ()
118
+ return UnorderedIR (self ._table , (* self ._columns , * new_values ))
147
119
148
120
def selection (
149
121
self ,
@@ -155,9 +127,7 @@ def selection(
155
127
op_compiler .compile_expression (input , bindings ).name (id )
156
128
for input , id in input_output_pairs
157
129
]
158
- builder = self .builder ()
159
- builder .columns = tuple (values )
160
- return builder .build ()
130
+ return UnorderedIR (self ._table , tuple (values ))
161
131
162
132
def _get_ibis_column (self , key : str ) -> ibis_types .Value :
163
133
"""Gets the Ibis expression for a given column."""
@@ -192,7 +162,6 @@ def row_count(self, name: str) -> UnorderedIR:
192
162
def _to_ibis_expr (
193
163
self ,
194
164
* ,
195
- expose_hidden_cols : bool = False ,
196
165
fraction : Optional [float ] = None ,
197
166
):
198
167
"""
@@ -206,49 +175,25 @@ def _to_ibis_expr(
206
175
An ibis expression representing the data held by the ArrayValue object.
207
176
"""
208
177
columns = list (self ._columns )
209
- columns_to_drop : list [
210
- str
211
- ] = [] # Ordering/Filtering columns that will be dropped at end
212
-
213
- if self ._reduced_predicate is not None :
214
- columns .append (self ._reduced_predicate )
215
- # Usually drop predicate as it is will be all TRUE after filtering
216
- if not expose_hidden_cols :
217
- columns_to_drop .append (self ._reduced_predicate .get_name ())
218
-
219
178
# Special case for empty tables, since we can't create an empty
220
179
# projection.
221
180
if not columns :
222
181
return bigframes_vendored .ibis .memtable ([])
223
182
224
183
table = self ._table .select (columns )
225
- base_table = table
226
- if self ._reduced_predicate is not None :
227
- table = table .filter (base_table [PREDICATE_COLUMN ])
228
- table = table .drop (* columns_to_drop )
229
184
if fraction is not None :
230
185
table = table .filter (
231
186
bigframes_vendored .ibis .random () < ibis_types .literal (fraction )
232
187
)
233
188
return table
234
189
235
190
def filter (self , predicate : ex .Expression ) -> UnorderedIR :
236
- for ref in predicate .column_references :
237
- ibis_value = self ._get_ibis_column (ref .sql )
238
- if is_window (ibis_value ):
239
- # ibis doesn't support qualify syntax, so create CTE if filtering over window expression
240
- # https://github.com/ibis-project/ibis/issues/9775
241
- return self ._reproject_to_table ().filter (predicate )
242
-
243
- bindings = {col : self ._get_ibis_column (col ) for col in self .column_ids }
244
- condition = op_compiler .compile_expression (predicate , bindings )
245
- return self ._filter (condition ) # type:ignore
246
-
247
- def _filter (self , predicate_value : ibis_types .BooleanValue ) -> UnorderedIR :
248
- """Filter the table on a given expression, the predicate must be a boolean series aligned with the table expression."""
249
- expr = self .builder ()
250
- expr .predicates = [* self ._predicates , predicate_value ]
251
- return expr .build ()
191
+ table = self ._to_ibis_expr ()
192
+ condition = op_compiler .compile_expression (predicate , table )
193
+ table = table .filter (condition )
194
+ return UnorderedIR (
195
+ table , tuple (table [column_name ] for column_name in self ._column_names )
196
+ )
252
197
253
198
def aggregate (
254
199
self ,
@@ -279,18 +224,18 @@ def aggregate(
279
224
for aggregate , col_out in aggregations
280
225
}
281
226
if by_column_ids :
227
+ if dropna :
228
+ table = table .filter (
229
+ [table [ref .id .sql ].notnull () for ref in by_column_ids ]
230
+ )
282
231
result = table .group_by ((ref .id .sql for ref in by_column_ids )).aggregate (
283
232
** stats
284
233
)
285
- columns = tuple (result [key ] for key in result .columns )
286
- expr = UnorderedIR (result , columns = columns )
287
- if dropna :
288
- for ref in by_column_ids :
289
- expr = expr ._filter (expr ._compile_expression (ref ).notnull ())
290
- return expr
234
+ return UnorderedIR (
235
+ result , columns = tuple (result [key ] for key in result .columns )
236
+ )
291
237
else :
292
- aggregates = {** stats }
293
- result = table .aggregate (** aggregates )
238
+ result = table .aggregate (** stats )
294
239
return UnorderedIR (
295
240
result ,
296
241
columns = [result [col_id ] for col_id in [* stats .keys ()]],
@@ -310,19 +255,6 @@ def _uniform_sampling(self, fraction: float) -> UnorderedIR:
310
255
)
311
256
312
257
## Helpers
313
- def _set_or_replace_by_id (
314
- self , id : str , new_value : ibis_types .Value
315
- ) -> UnorderedIR :
316
- builder = self .builder ()
317
- if id in self .column_ids :
318
- builder .columns = [
319
- val if (col_id != id ) else new_value .name (id )
320
- for col_id , val in zip (self .column_ids , self ._columns )
321
- ]
322
- else :
323
- builder .columns = [* self .columns , new_value .name (id )]
324
- return builder .build ()
325
-
326
258
def _reproject_to_table (self ) -> UnorderedIR :
327
259
"""
328
260
Internal operator that projects the internal representation into a
@@ -338,24 +270,6 @@ def _reproject_to_table(self) -> UnorderedIR:
338
270
columns = columns ,
339
271
)
340
272
341
- class Builder :
342
- def __init__ (
343
- self ,
344
- table : ibis_types .Table ,
345
- columns : Collection [ibis_types .Value ] = (),
346
- predicates : Optional [Collection [ibis_types .BooleanValue ]] = None ,
347
- ):
348
- self .table = table
349
- self .columns = list (columns )
350
- self .predicates = list (predicates ) if predicates is not None else None
351
-
352
- def build (self ) -> UnorderedIR :
353
- return UnorderedIR (
354
- table = self .table ,
355
- columns = self .columns ,
356
- predicates = self .predicates ,
357
- )
358
-
359
273
@classmethod
360
274
def from_pandas (
361
275
cls ,
@@ -500,8 +414,7 @@ def project_window_op(
500
414
case_statement = case_statement .else_ (window_op ).end () # type: ignore
501
415
window_op = case_statement # type: ignore
502
416
503
- result = self ._set_or_replace_by_id (output_name , window_op )
504
- return result
417
+ return UnorderedIR (self ._table , (* self .columns , window_op .name (output_name )))
505
418
506
419
def _compile_expression (self , expr : ex .Expression ):
507
420
return op_compiler .compile_expression (expr , self ._ibis_bindings )
@@ -517,8 +430,6 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec):
517
430
if window_spec .grouping_keys
518
431
else []
519
432
)
520
- if self ._reduced_predicate is not None :
521
- group_by .append (self ._reduced_predicate )
522
433
523
434
# Construct ordering. There are basically 3 main cases
524
435
# 1. Order-independent op (aggregation, cut, rank) with unbound window - no ordering clause needed
@@ -569,18 +480,6 @@ def is_window(column: ibis_types.Value) -> bool:
569
480
return any (isinstance (op , ibis_ops .WindowFunction ) for op in matches )
570
481
571
482
572
- def _reduce_predicate_list (
573
- predicate_list : typing .Collection [ibis_types .BooleanValue ],
574
- ) -> ibis_types .BooleanValue :
575
- """Converts a list of predicates BooleanValues into a single BooleanValue."""
576
- if len (predicate_list ) == 0 :
577
- raise ValueError ("Cannot reduce empty list of predicates" )
578
- if len (predicate_list ) == 1 :
579
- (item ,) = predicate_list
580
- return item
581
- return functools .reduce (lambda acc , pred : acc .__and__ (pred ), predicate_list )
582
-
583
-
584
483
def _convert_ordering_to_table_values (
585
484
value_lookup : typing .Mapping [str , ibis_types .Value ],
586
485
ordering_columns : typing .Sequence [OrderingExpression ],
0 commit comments