 import bigframes.core.guid as guid
 import bigframes.core.indexes as indexes
 import bigframes.core.joins as joins
+import bigframes.core.joins.name_resolution as join_names
 import bigframes.core.ordering as ordering
 import bigframes.core.utils
 import bigframes.core.utils as utils
@@ -97,7 +98,8 @@ def __init__(
                 "'index_columns' and 'index_labels' must have equal length"
             )
         if len(index_columns) == 0:
-            expr, new_index_col_id = expr.promote_offsets()
+            new_index_col_id = guid.generate_guid()
+            expr = expr.promote_offsets(new_index_col_id)
             index_columns = [new_index_col_id]
         self._index_columns = tuple(index_columns)
         # Index labels don't need complicated hierarchical access so can store as tuple
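Note: the hunk above shows the pattern repeated throughout this change. The expression-level promote_offsets no longer generates and returns the id of the new offsets column; the caller now creates an id with guid.generate_guid() and passes it in. A minimal before/after sketch (variable names are illustrative, not tied to any single call site):

# Before: promote_offsets() generated the offsets column id and returned it.
expr, offsets_id = expr.promote_offsets()

# After: the caller owns id generation and tells promote_offsets which column to create.
offsets_id = guid.generate_guid()
expr = expr.promote_offsets(offsets_id)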
@@ -260,7 +262,8 @@ def reset_index(self, drop: bool = True) -> Block:
             from Index classes that point to this block.
         """
         block = self
-        expr, new_index_col_id = self._expr.promote_offsets()
+        new_index_col_id = guid.generate_guid()
+        expr = self._expr.promote_offsets(new_index_col_id)
         if drop:
             # Even though the index might be part of the ordering, keep that
             # ordering expression as reset_index shouldn't change the row
@@ -833,7 +836,8 @@ def aggregate_all_and_stack(
         else:  # axis_n == 1
             # using offsets as identity to group on.
             # TODO: Allow to promote identity/total_order columns instead for better perf
-            expr_with_offsets, offset_col = self.expr.promote_offsets()
+            offset_col = guid.generate_guid()
+            expr_with_offsets = self.expr.promote_offsets(offset_col)
             stacked_expr = expr_with_offsets.unpivot(
                 row_labels=self.column_labels.to_list(),
                 index_col_ids=[guid.generate_guid()],
@@ -952,9 +956,10 @@ def aggregate(
             ]
             by_column_labels = self._get_labels_for_columns(by_value_columns)
             labels = (*by_column_labels, *aggregate_labels)
-            result_expr_pruned, offsets_id = result_expr.select_columns(
+            offsets_id = guid.generate_guid()
+            result_expr_pruned = result_expr.select_columns(
                 [*by_value_columns, *output_col_ids]
-            ).promote_offsets()
+            ).promote_offsets(offsets_id)
 
             return (
                 Block(
@@ -975,7 +980,8 @@ def get_stat(self, column_id: str, stat: agg_ops.AggregateOp):
 
         aggregations = [(column_id, stat, stat.name) for stat in stats_to_fetch]
         expr = self.expr.aggregate(aggregations)
-        expr, offset_index_id = expr.promote_offsets()
+        offset_index_id = guid.generate_guid()
+        expr = expr.promote_offsets(offset_index_id)
         block = Block(
             expr,
             index_columns=[offset_index_id],
@@ -999,7 +1005,8 @@ def get_corr_stat(self, column_id_left: str, column_id_right: str):
             )
         ]
         expr = self.expr.corr_aggregate(corr_aggregations)
-        expr, offset_index_id = expr.promote_offsets()
+        offset_index_id = guid.generate_guid()
+        expr = expr.promote_offsets(offset_index_id)
         block = Block(
             expr,
             index_columns=[offset_index_id],
@@ -1197,7 +1204,8 @@ def retrieve_repr_request_results(
         return formatted_df, count, query_job
 
     def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
-        expr, result_id = self._expr.promote_offsets()
+        result_id = guid.generate_guid()
+        expr = self._expr.promote_offsets(result_id)
         return (
             Block(
                 expr,
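Note: the Block-level promote_offsets wrapper above keeps its tuple-returning signature; only the underlying expression API changes. A hedged usage sketch (the block variable and label value are illustrative):

# Block.promote_offsets still returns both the new Block and the generated column id.
block, offsets_col_id = block.promote_offsets(label="offsets")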
@@ -1471,67 +1479,76 @@ def merge(
             "outer",
             "right",
         ],
-        left_col_ids: typing.Sequence[str],
-        right_col_ids: typing.Sequence[str],
+        left_join_ids: typing.Sequence[str],
+        right_join_ids: typing.Sequence[str],
         sort: bool,
         suffixes: tuple[str, str] = ("_x", "_y"),
     ) -> Block:
-        (
-            joined_expr,
-            coalesced_join_cols,
-            (get_column_left, get_column_right),
-        ) = joins.join_by_column(
+        joined_expr = joins.join_by_column(
             self.expr,
-            left_col_ids,
+            left_join_ids,
             other.expr,
-            right_col_ids,
+            right_join_ids,
             how=how,
-            sort=sort,
         )
+        get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER(
+            self.expr.column_ids, other.expr.column_ids
+        )
+        result_columns = []
+        matching_join_labels = []
+
+        coalesced_ids = []
+        for left_id, right_id in zip(left_join_ids, right_join_ids):
+            coalesced_id = guid.generate_guid()
+            joined_expr = joined_expr.project_binary_op(
+                get_column_left[left_id],
+                get_column_right[right_id],
+                ops.coalesce_op,
+                coalesced_id,
+            )
+            coalesced_ids.append(coalesced_id)
+
+        for col_id in self.value_columns:
+            if col_id in left_join_ids:
+                key_part = left_join_ids.index(col_id)
+                matching_right_id = right_join_ids[key_part]
+                if (
+                    self.col_id_to_label[col_id]
+                    == other.col_id_to_label[matching_right_id]
+                ):
+                    matching_join_labels.append(self.col_id_to_label[col_id])
+                    result_columns.append(coalesced_ids[key_part])
+                else:
+                    result_columns.append(get_column_left[col_id])
+            else:
+                result_columns.append(get_column_left[col_id])
+        for col_id in other.value_columns:
+            if col_id in right_join_ids:
+                key_part = right_join_ids.index(col_id)
+                if other.col_id_to_label[matching_right_id] in matching_join_labels:
+                    pass
+                else:
+                    result_columns.append(get_column_right[col_id])
+            else:
+                result_columns.append(get_column_right[col_id])
 
-        # which join key parts should be coalesced
-        merge_join_key_mask = [
-            str(self.col_id_to_label[left_id]) == str(other.col_id_to_label[right_id])
-            for left_id, right_id in zip(left_col_ids, right_col_ids)
-        ]
-        labels_to_coalesce = [
-            self.col_id_to_label[col_id]
-            for i, col_id in enumerate(left_col_ids)
-            if merge_join_key_mask[i]
-        ]
-
-        def left_col_mapping(col_id: str) -> str:
-            if col_id in left_col_ids:
-                join_key_part = left_col_ids.index(col_id)
-                if merge_join_key_mask[join_key_part]:
-                    return coalesced_join_cols[join_key_part]
-            return get_column_left(col_id)
-
-        def right_col_mapping(col_id: str) -> typing.Optional[str]:
-            if col_id in right_col_ids:
-                join_key_part = right_col_ids.index(col_id)
-                if merge_join_key_mask[join_key_part]:
-                    return None
-            return get_column_right(col_id)
-
-        left_columns = [left_col_mapping(col_id) for col_id in self.value_columns]
-
-        right_columns = [
-            typing.cast(str, right_col_mapping(col_id))
-            for col_id in other.value_columns
-            if right_col_mapping(col_id)
-        ]
+        if sort:
+            # sort uses coalesced join keys always
+            joined_expr = joined_expr.order_by(
+                [ordering.OrderingColumnReference(col_id) for col_id in coalesced_ids],
+                stable=True,
+            )
 
-        expr = joined_expr.select_columns([*left_columns, *right_columns])
+        joined_expr = joined_expr.select_columns(result_columns)
         labels = utils.merge_column_labels(
             self.column_labels,
             other.column_labels,
-            coalesce_labels=labels_to_coalesce,
+            coalesce_labels=matching_join_labels,
             suffixes=suffixes,
         )
-
         # Constructs default index
-        expr, offset_index_id = expr.promote_offsets()
+        offset_index_id = guid.generate_guid()
+        expr = joined_expr.promote_offsets(offset_index_id)
         return Block(expr, index_columns=[offset_index_id], column_labels=labels)
 
     def _force_reproject(self) -> Block:
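Note: in the rewritten merge above, join keys are now coalesced explicitly. For each left/right key pair whose labels match, a single output column is produced via ops.coalesce_op, so outer joins fill the key from whichever side has a value, and sorting (when requested) orders by these coalesced key columns. A small self-contained sketch of the coalesce semantics in plain Python (illustrative only, not the compiled BigQuery expression):

def coalesce(left, right):
    # Keep the left key when present; otherwise fall back to the right side.
    return left if left is not None else right

# Hypothetical key pairs from an outer join; unmatched rows have None on one side.
key_pairs = [(1, 1), (None, 2), (3, None)]
merged_keys = [coalesce(left, right) for left, right in key_pairs]
assert merged_keys == [1, 2, 3]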