Skip to content

Commit 26515c3

Browse files
authored
feat: df.join lsuffix and rsuffix support (#1857)
* feat: df.join lsuffix and rsuffix support * raise error when on is duplicated. * rename * error update. * test fix. * add doc and test fixes * skip pandas 1.x test * test fixes * create join on key helper function * test fix * test fix * update join to avoid inplace changes. * add assertion for columns not changed * add assertion for columns not changed
1 parent feb3ff4 commit 26515c3

File tree

4 files changed

+313
-45
lines changed

4 files changed

+313
-45
lines changed

bigframes/dataframe.py

Lines changed: 155 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3520,70 +3520,138 @@ def join(
35203520
*,
35213521
on: Optional[str] = None,
35223522
how: str = "left",
3523+
lsuffix: str = "",
3524+
rsuffix: str = "",
35233525
) -> DataFrame:
35243526
if isinstance(other, bigframes.series.Series):
35253527
other = other.to_frame()
35263528

35273529
left, right = self, other
35283530

3529-
if not left.columns.intersection(right.columns).empty:
3530-
raise NotImplementedError(
3531-
f"Deduping column names is not implemented. {constants.FEEDBACK_LINK}"
3532-
)
3531+
col_intersection = left.columns.intersection(right.columns)
3532+
3533+
if not col_intersection.empty:
3534+
if lsuffix == rsuffix == "":
3535+
raise ValueError(
3536+
f"columns overlap but no suffix specified: {col_intersection}"
3537+
)
3538+
35333539
if how == "cross":
35343540
if on is not None:
35353541
raise ValueError("'on' is not supported for cross join.")
35363542
result_block = left._block.merge(
35373543
right._block,
35383544
left_join_ids=[],
35393545
right_join_ids=[],
3540-
suffixes=("", ""),
3546+
suffixes=(lsuffix, rsuffix),
35413547
how="cross",
35423548
sort=True,
35433549
)
35443550
return DataFrame(result_block)
35453551

35463552
# Join left columns with right index
35473553
if on is not None:
3554+
if left._has_index and (on in left.index.names):
3555+
if on in left.columns:
3556+
raise ValueError(
3557+
f"'{on}' is both an index level and a column label, which is ambiguous."
3558+
)
3559+
else:
3560+
raise NotImplementedError(
3561+
f"Joining on index level '{on}' is not yet supported. {constants.FEEDBACK_LINK}"
3562+
)
3563+
if (left.columns == on).sum() > 1:
3564+
raise ValueError(f"The column label '{on}' is not unique.")
3565+
35483566
if other._block.index.nlevels != 1:
35493567
raise ValueError(
35503568
"Join on columns must match the index level of the other DataFrame. Join on column with multi-index haven't been supported."
35513569
)
3552-
# Switch left index with on column
3553-
left_columns = left.columns
3554-
left_idx_original_names = left.index.names if left._has_index else ()
3555-
left_idx_names_in_cols = [
3556-
f"bigframes_left_idx_name_{i}"
3557-
for i in range(len(left_idx_original_names))
3558-
]
3559-
if left._has_index:
3560-
left.index.names = left_idx_names_in_cols
3561-
left = left.reset_index(drop=False)
3562-
left = left.set_index(on)
3563-
3564-
# Join on index and switch back
3565-
combined_df = left._perform_join_by_index(right, how=how)
3566-
combined_df.index.name = on
3567-
combined_df = combined_df.reset_index(drop=False)
3568-
combined_df = combined_df.set_index(left_idx_names_in_cols)
3569-
3570-
# To be consistent with Pandas
3571-
if combined_df._has_index:
3572-
combined_df.index.names = (
3573-
left_idx_original_names
3574-
if how in ("inner", "left")
3575-
else ([None] * len(combined_df.index.names))
3576-
)
35773570

3578-
# Reorder columns
3579-
combined_df = combined_df[list(left_columns) + list(right.columns)]
3580-
return combined_df
3571+
return self._join_on_key(
3572+
other,
3573+
on=on,
3574+
how=how,
3575+
lsuffix=lsuffix,
3576+
rsuffix=rsuffix,
3577+
should_duplicate_on_key=(on in col_intersection),
3578+
)
35813579

35823580
# Join left index with right index
35833581
if left._block.index.nlevels != right._block.index.nlevels:
35843582
raise ValueError("Index to join on must have the same number of levels.")
35853583

3586-
return left._perform_join_by_index(right, how=how)
3584+
return left._perform_join_by_index(right, how=how)._add_join_suffix(
3585+
left.columns, right.columns, lsuffix=lsuffix, rsuffix=rsuffix
3586+
)
3587+
3588+
def _join_on_key(
3589+
self,
3590+
other: DataFrame,
3591+
on: str,
3592+
how: str,
3593+
lsuffix: str,
3594+
rsuffix: str,
3595+
should_duplicate_on_key: bool,
3596+
) -> DataFrame:
3597+
left, right = self.copy(), other
3598+
# Replace all columns names with unique names for reordering.
3599+
left_col_original_names = left.columns
3600+
on_col_name = "bigframes_left_col_on"
3601+
dup_on_col_name = "bigframes_left_col_on_dup"
3602+
left_col_temp_names = [
3603+
f"bigframes_left_col_name_{i}" if col_name != on else on_col_name
3604+
for i, col_name in enumerate(left_col_original_names)
3605+
]
3606+
left.columns = pandas.Index(left_col_temp_names)
3607+
# if on column is also in right df, we need to duplicate the column
3608+
# and set it to be the first column
3609+
if should_duplicate_on_key:
3610+
left[dup_on_col_name] = left[on_col_name]
3611+
on_col_name = dup_on_col_name
3612+
left_col_temp_names = [on_col_name] + left_col_temp_names
3613+
left = left[left_col_temp_names]
3614+
3615+
# Switch left index with on column
3616+
left_idx_original_names = left.index.names if left._has_index else ()
3617+
left_idx_names_in_cols = [
3618+
f"bigframes_left_idx_name_{i}" for i in range(len(left_idx_original_names))
3619+
]
3620+
if left._has_index:
3621+
left.index.names = left_idx_names_in_cols
3622+
left = left.reset_index(drop=False)
3623+
left = left.set_index(on_col_name)
3624+
3625+
right_col_original_names = right.columns
3626+
right_col_temp_names = [
3627+
f"bigframes_right_col_name_{i}"
3628+
for i in range(len(right_col_original_names))
3629+
]
3630+
right.columns = pandas.Index(right_col_temp_names)
3631+
3632+
# Join on index and switch back
3633+
combined_df = left._perform_join_by_index(right, how=how)
3634+
combined_df.index.name = on_col_name
3635+
combined_df = combined_df.reset_index(drop=False)
3636+
combined_df = combined_df.set_index(left_idx_names_in_cols)
3637+
3638+
# To be consistent with Pandas
3639+
if combined_df._has_index:
3640+
combined_df.index.names = (
3641+
left_idx_original_names
3642+
if how in ("inner", "left")
3643+
else ([None] * len(combined_df.index.names))
3644+
)
3645+
3646+
# Reorder columns
3647+
combined_df = combined_df[left_col_temp_names + right_col_temp_names]
3648+
return combined_df._add_join_suffix(
3649+
left_col_original_names,
3650+
right_col_original_names,
3651+
lsuffix=lsuffix,
3652+
rsuffix=rsuffix,
3653+
extra_col=on if on_col_name == dup_on_col_name else None,
3654+
)
35873655

35883656
def _perform_join_by_index(
35893657
self,
@@ -3597,6 +3665,59 @@ def _perform_join_by_index(
35973665
)
35983666
return DataFrame(block)
35993667

3668+
def _add_join_suffix(
3669+
self,
3670+
left_columns,
3671+
right_columns,
3672+
lsuffix: str = "",
3673+
rsuffix: str = "",
3674+
extra_col: typing.Optional[str] = None,
3675+
):
3676+
"""Applies suffixes to overlapping column names to mimic a pandas join.
3677+
3678+
This method identifies columns that are common to both a "left" and "right"
3679+
set of columns and renames them using the provided suffixes. Columns that
3680+
are not in the intersection are kept with their original names.
3681+
3682+
Args:
3683+
left_columns (pandas.Index):
3684+
The column labels from the left DataFrame.
3685+
right_columns (pandas.Index):
3686+
The column labels from the right DataFrame.
3687+
lsuffix (str):
3688+
The suffix to apply to overlapping column names from the left side.
3689+
rsuffix (str):
3690+
The suffix to apply to overlapping column names from the right side.
3691+
extra_col (typing.Optional[str]):
3692+
An optional column name to prepend to the final list of columns.
3693+
This argument is used specifically to match the behavior of a
3694+
pandas join. When a join key (i.e., the 'on' column) exists
3695+
in both the left and right DataFrames, pandas creates two versions
3696+
of that column: one copy keeps its original name and is placed as
3697+
the first column, while the other instances receive the normal
3698+
suffix. Passing the join key's name here replicates that behavior.
3699+
3700+
Returns:
3701+
DataFrame:
3702+
A new DataFrame with the columns renamed to resolve overlaps.
3703+
"""
3704+
combined_df = self.copy()
3705+
col_intersection = left_columns.intersection(right_columns)
3706+
final_col_names = [] if extra_col is None else [extra_col]
3707+
for col_name in left_columns:
3708+
if col_name in col_intersection:
3709+
final_col_names.append(f"{col_name}{lsuffix}")
3710+
else:
3711+
final_col_names.append(col_name)
3712+
3713+
for col_name in right_columns:
3714+
if col_name in col_intersection:
3715+
final_col_names.append(f"{col_name}{rsuffix}")
3716+
else:
3717+
final_col_names.append(col_name)
3718+
combined_df.columns = pandas.Index(final_col_names)
3719+
return combined_df
3720+
36003721
@validations.requires_ordering()
36013722
def rolling(
36023723
self,

tests/system/small/test_dataframe.py

Lines changed: 96 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2981,12 +2981,102 @@ def test_join_different_table(
29812981
assert_pandas_df_equal(bf_result, pd_result, ignore_order=True)
29822982

29832983

2984-
def test_join_duplicate_columns_raises_not_implemented(scalars_dfs):
2985-
scalars_df, _ = scalars_dfs
2986-
df_a = scalars_df[["string_col", "float64_col"]]
2987-
df_b = scalars_df[["float64_col"]]
2988-
with pytest.raises(NotImplementedError):
2989-
df_a.join(df_b, how="outer").to_pandas()
2984+
@all_joins
2985+
def test_join_different_table_with_duplicate_column_name(
2986+
scalars_df_index, scalars_pandas_df_index, how
2987+
):
2988+
bf_df_a = scalars_df_index[["string_col", "int64_col", "int64_too"]].rename(
2989+
columns={"int64_too": "int64_col"}
2990+
)
2991+
bf_df_b = scalars_df_index.dropna()[
2992+
["string_col", "int64_col", "int64_too"]
2993+
].rename(columns={"int64_too": "int64_col"})
2994+
bf_result = bf_df_a.join(bf_df_b, how=how, lsuffix="_l", rsuffix="_r").to_pandas()
2995+
pd_df_a = scalars_pandas_df_index[["string_col", "int64_col", "int64_too"]].rename(
2996+
columns={"int64_too": "int64_col"}
2997+
)
2998+
pd_df_b = scalars_pandas_df_index.dropna()[
2999+
["string_col", "int64_col", "int64_too"]
3000+
].rename(columns={"int64_too": "int64_col"})
3001+
pd_result = pd_df_a.join(pd_df_b, how=how, lsuffix="_l", rsuffix="_r")
3002+
3003+
# Ensure no inplace changes
3004+
pd.testing.assert_index_equal(bf_df_a.columns, pd_df_a.columns)
3005+
pd.testing.assert_index_equal(bf_df_b.index.to_pandas(), pd_df_b.index)
3006+
pd.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False)
3007+
3008+
3009+
@all_joins
3010+
def test_join_param_on_with_duplicate_column_name_not_on_col(
3011+
scalars_df_index, scalars_pandas_df_index, how
3012+
):
3013+
# This test is for duplicate column names, but the 'on' column is not duplicated.
3014+
if how == "cross":
3015+
return
3016+
bf_df_a = scalars_df_index[
3017+
["string_col", "datetime_col", "timestamp_col", "int64_too"]
3018+
].rename(columns={"timestamp_col": "datetime_col"})
3019+
bf_df_b = scalars_df_index.dropna()[
3020+
["string_col", "datetime_col", "timestamp_col"]
3021+
].rename(columns={"timestamp_col": "datetime_col"})
3022+
bf_result = bf_df_a.join(
3023+
bf_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r"
3024+
).to_pandas()
3025+
pd_df_a = scalars_pandas_df_index[
3026+
["string_col", "datetime_col", "timestamp_col", "int64_too"]
3027+
].rename(columns={"timestamp_col": "datetime_col"})
3028+
pd_df_b = scalars_pandas_df_index.dropna()[
3029+
["string_col", "datetime_col", "timestamp_col"]
3030+
].rename(columns={"timestamp_col": "datetime_col"})
3031+
pd_result = pd_df_a.join(
3032+
pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r"
3033+
)
3034+
pd.testing.assert_frame_equal(
3035+
bf_result.sort_index(),
3036+
pd_result.sort_index(),
3037+
check_like=True,
3038+
check_index_type=False,
3039+
check_names=False,
3040+
)
3041+
pd.testing.assert_index_equal(bf_result.columns, pd_result.columns)
3042+
3043+
3044+
@pytest.mark.skipif(
3045+
pandas.__version__.startswith("1."), reason="bad left join in pandas 1.x"
3046+
)
3047+
@all_joins
3048+
def test_join_param_on_with_duplicate_column_name_on_col(
3049+
scalars_df_index, scalars_pandas_df_index, how
3050+
):
3051+
# This test is for duplicate column names, and the 'on' column is duplicated.
3052+
if how == "cross":
3053+
return
3054+
bf_df_a = scalars_df_index[
3055+
["string_col", "datetime_col", "timestamp_col", "int64_too"]
3056+
].rename(columns={"timestamp_col": "datetime_col"})
3057+
bf_df_b = scalars_df_index.dropna()[
3058+
["string_col", "datetime_col", "timestamp_col", "int64_too"]
3059+
].rename(columns={"timestamp_col": "datetime_col"})
3060+
bf_result = bf_df_a.join(
3061+
bf_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r"
3062+
).to_pandas()
3063+
pd_df_a = scalars_pandas_df_index[
3064+
["string_col", "datetime_col", "timestamp_col", "int64_too"]
3065+
].rename(columns={"timestamp_col": "datetime_col"})
3066+
pd_df_b = scalars_pandas_df_index.dropna()[
3067+
["string_col", "datetime_col", "timestamp_col", "int64_too"]
3068+
].rename(columns={"timestamp_col": "datetime_col"})
3069+
pd_result = pd_df_a.join(
3070+
pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r"
3071+
)
3072+
pd.testing.assert_frame_equal(
3073+
bf_result.sort_index(),
3074+
pd_result.sort_index(),
3075+
check_like=True,
3076+
check_index_type=False,
3077+
check_names=False,
3078+
)
3079+
pd.testing.assert_index_equal(bf_result.columns, pd_result.columns)
29903080

29913081

29923082
@all_joins

tests/unit/test_dataframe_polars.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2445,12 +2445,40 @@ def test_join_different_table(
24452445
assert_pandas_df_equal(bf_result, pd_result, ignore_order=True)
24462446

24472447

2448-
def test_join_duplicate_columns_raises_not_implemented(scalars_dfs):
2448+
@all_joins
2449+
def test_join_raise_when_param_on_duplicate_with_column(scalars_df_index, how):
2450+
if how == "cross":
2451+
return
2452+
bf_df_a = scalars_df_index[["string_col", "int64_col"]].rename(
2453+
columns={"int64_col": "string_col"}
2454+
)
2455+
bf_df_b = scalars_df_index.dropna()["string_col"]
2456+
with pytest.raises(
2457+
ValueError, match="The column label 'string_col' is not unique."
2458+
):
2459+
bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r")
2460+
2461+
2462+
def test_join_duplicate_columns_raises_value_error(scalars_dfs):
24492463
scalars_df, _ = scalars_dfs
24502464
df_a = scalars_df[["string_col", "float64_col"]]
24512465
df_b = scalars_df[["float64_col"]]
2452-
with pytest.raises(NotImplementedError):
2453-
df_a.join(df_b, how="outer").to_pandas()
2466+
with pytest.raises(ValueError, match="columns overlap but no suffix specified"):
2467+
df_a.join(df_b, how="outer")
2468+
2469+
2470+
@all_joins
2471+
def test_join_param_on_duplicate_with_index_raises_value_error(scalars_df_index, how):
2472+
if how == "cross":
2473+
return
2474+
bf_df_a = scalars_df_index[["string_col"]]
2475+
bf_df_a.index.name = "string_col"
2476+
bf_df_b = scalars_df_index.dropna()["string_col"]
2477+
with pytest.raises(
2478+
ValueError,
2479+
match="'string_col' is both an index level and a column label, which is ambiguous.",
2480+
):
2481+
bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r")
24542482

24552483

24562484
@all_joins
@@ -2462,7 +2490,7 @@ def test_join_param_on(scalars_dfs, how):
24622490
bf_df_b = bf_df[["float64_col"]]
24632491

24642492
if how == "cross":
2465-
with pytest.raises(ValueError):
2493+
with pytest.raises(ValueError, match="'on' is not supported for cross join."):
24662494
bf_df_a.join(bf_df_b, on="rowindex_2", how=how)
24672495
else:
24682496
bf_result = bf_df_a.join(bf_df_b, on="rowindex_2", how=how).to_pandas()

0 commit comments

Comments
 (0)