29
29
import bigframes .core .compile
30
30
import bigframes .core .expression as ex
31
31
import bigframes .core .guid
32
+ import bigframes .core .identifiers as ids
32
33
import bigframes .core .join_def as join_def
33
34
import bigframes .core .local_data as local_data
34
35
import bigframes .core .nodes as nodes
@@ -169,7 +170,7 @@ def row_count(self) -> ArrayValue:
169
170
# Operations
170
171
def filter_by_id (self , predicate_id : str , keep_null : bool = False ) -> ArrayValue :
171
172
"""Filter the table on a given expression, the predicate must be a boolean series aligned with the table expression."""
172
- predicate : ex .Expression = ex .free_var (predicate_id )
173
+ predicate : ex .Expression = ex .deref (predicate_id )
173
174
if keep_null :
174
175
predicate = ops .fillna_op .as_expr (predicate , ex .const (True ))
175
176
return self .filter (predicate )
@@ -200,7 +201,9 @@ def promote_offsets(self) -> Tuple[ArrayValue, str]:
200
201
)
201
202
202
203
return (
203
- ArrayValue (nodes .PromoteOffsetsNode (child = self .node , col_id = col_id )),
204
+ ArrayValue (
205
+ nodes .PromoteOffsetsNode (child = self .node , col_id = ids .ColumnId (col_id ))
206
+ ),
204
207
col_id ,
205
208
)
206
209
@@ -212,7 +215,9 @@ def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
212
215
213
216
def compute_values (self , assignments : Sequence [ex .Expression ]):
214
217
col_ids = self ._gen_namespaced_uids (len (assignments ))
215
- ex_id_pairs = tuple ((ex , id ) for ex , id in zip (assignments , col_ids ))
218
+ ex_id_pairs = tuple (
219
+ (ex , ids .ColumnId (id )) for ex , id in zip (assignments , col_ids )
220
+ )
216
221
return (
217
222
ArrayValue (nodes .ProjectionNode (child = self .node , assignments = ex_id_pairs )),
218
223
col_ids ,
@@ -228,14 +233,19 @@ def assign(self, source_id: str, destination_id: str) -> ArrayValue:
228
233
if destination_id in self .column_ids : # Mutate case
229
234
exprs = [
230
235
(
231
- (source_id if (col_id == destination_id ) else col_id ),
232
- col_id ,
236
+ ex . deref (source_id if (col_id == destination_id ) else col_id ),
237
+ ids . ColumnId ( col_id ) ,
233
238
)
234
239
for col_id in self .column_ids
235
240
]
236
241
else : # append case
237
- self_projection = ((col_id , col_id ) for col_id in self .column_ids )
238
- exprs = [* self_projection , (source_id , destination_id )]
242
+ self_projection = (
243
+ (ex .deref (col_id ), ids .ColumnId (col_id )) for col_id in self .column_ids
244
+ )
245
+ exprs = [
246
+ * self_projection ,
247
+ (ex .deref (source_id ), ids .ColumnId (destination_id )),
248
+ ]
239
249
return ArrayValue (
240
250
nodes .SelectionNode (
241
251
child = self .node ,
@@ -248,24 +258,15 @@ def create_constant(
248
258
value : typing .Any ,
249
259
dtype : typing .Optional [bigframes .dtypes .Dtype ],
250
260
) -> Tuple [ArrayValue , str ]:
251
- destination_id = self ._gen_namespaced_uid ()
252
261
if pandas .isna (value ):
253
262
# Need to assign a data type when value is NaN.
254
263
dtype = dtype or bigframes .dtypes .DEFAULT_DTYPE
255
264
256
- return (
257
- ArrayValue (
258
- nodes .ProjectionNode (
259
- child = self .node ,
260
- assignments = ((ex .const (value , dtype ), destination_id ),),
261
- )
262
- ),
263
- destination_id ,
264
- )
265
+ return self .project_to_id (ex .const (value , dtype ))
265
266
266
267
def select_columns (self , column_ids : typing .Sequence [str ]) -> ArrayValue :
267
268
# This basically just drops and reorders columns - logically a no-op except as a final step
268
- selections = ((col_id , col_id ) for col_id in column_ids )
269
+ selections = ((ex . deref ( col_id ), ids . ColumnId ( col_id ) ) for col_id in column_ids )
269
270
return ArrayValue (
270
271
nodes .SelectionNode (
271
272
child = self .node ,
@@ -274,14 +275,8 @@ def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
274
275
)
275
276
276
277
def drop_columns (self , columns : Iterable [str ]) -> ArrayValue :
277
- new_projection = (
278
- (col_id , col_id ) for col_id in self .column_ids if col_id not in columns
279
- )
280
- return ArrayValue (
281
- nodes .SelectionNode (
282
- child = self .node ,
283
- input_output_pairs = tuple (new_projection ),
284
- )
278
+ return self .select_columns (
279
+ [col_id for col_id in self .column_ids if col_id not in columns ]
285
280
)
286
281
287
282
def aggregate (
@@ -297,11 +292,12 @@ def aggregate(
297
292
by_column_id: column id of the aggregation key, this is preserved through the transform
298
293
dropna: whether null keys should be dropped
299
294
"""
295
+ agg_defs = tuple ((agg , ids .ColumnId (name )) for agg , name in aggregations )
300
296
return ArrayValue (
301
297
nodes .AggregateNode (
302
298
child = self .node ,
303
- aggregations = tuple ( aggregations ) ,
304
- by_column_ids = tuple (by_column_ids ),
299
+ aggregations = agg_defs ,
300
+ by_column_ids = tuple (map ( ex . deref , by_column_ids ) ),
305
301
dropna = dropna ,
306
302
)
307
303
)
@@ -342,10 +338,10 @@ def project_window_op(
342
338
ArrayValue (
343
339
nodes .WindowOpNode (
344
340
child = self .node ,
345
- column_name = column_name ,
341
+ column_name = ex . deref ( column_name ) ,
346
342
op = op ,
347
343
window_spec = window_spec ,
348
- output_name = output_name ,
344
+ output_name = ids . ColumnId ( output_name ) ,
349
345
never_skip_nulls = never_skip_nulls ,
350
346
skip_reproject_unsafe = skip_reproject_unsafe ,
351
347
)
@@ -376,7 +372,9 @@ def relational_join(
376
372
join_node = nodes .JoinNode (
377
373
left_child = self .node ,
378
374
right_child = other .node ,
379
- conditions = conditions ,
375
+ conditions = tuple (
376
+ (ex .deref (l_col ), ex .deref (r_col )) for l_col , r_col in conditions
377
+ ),
380
378
type = type ,
381
379
)
382
380
# Maps input ids to output ids for caller convenience
@@ -414,7 +412,7 @@ def explode(self, column_ids: typing.Sequence[str]) -> ArrayValue:
414
412
for column_id in column_ids :
415
413
assert bigframes .dtypes .is_array_like (self .get_column_type (column_id ))
416
414
417
- offsets = tuple (self . get_offset_for_name (id ) for id in column_ids )
415
+ offsets = tuple (ex . deref (id ) for id in column_ids )
418
416
return ArrayValue (nodes .ExplodeNode (child = self .node , column_ids = offsets ))
419
417
420
418
def _uniform_sampling (self , fraction : float ) -> ArrayValue :
@@ -425,9 +423,6 @@ def _uniform_sampling(self, fraction: float) -> ArrayValue:
425
423
"""
426
424
return ArrayValue (nodes .RandomSampleNode (self .node , fraction ))
427
425
428
- def get_offset_for_name (self , name : str ):
429
- return self .schema .names .index (name )
430
-
431
426
# Deterministically generate namespaced ids for new variables
432
427
# These new ids are only unique within the current namespace.
433
428
# Many operations, such as joins, create new namespaces. See: BigFrameNode.defines_namespace
0 commit comments