Skip to content

Commit 38abfd8

Browse files
authored
refactor: move main merge logic from df to reshape package (#2217)
* refactor: move main merge logic from df to reshape package
* fix mypy and tests
1 parent 0ff1395 commit 38abfd8

File tree

2 files changed

+102
-101
lines changed

2 files changed

+102
-101
lines changed

bigframes/core/reshape/merge.py

Lines changed: 95 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,54 +18,93 @@
1818

1919
from __future__ import annotations
2020

21-
import typing
22-
from typing import Literal, Optional
21+
from typing import Literal, Sequence
2322

2423
import bigframes_vendored.pandas.core.reshape.merge as vendored_pandas_merge
2524

26-
# Avoid circular imports.
27-
if typing.TYPE_CHECKING:
28-
import bigframes.dataframe
29-
import bigframes.series
25+
from bigframes import dataframe, series
26+
from bigframes.core import blocks, utils
3027

3128

3229
def merge(
33-
left: bigframes.dataframe.DataFrame,
34-
right: bigframes.dataframe.DataFrame,
30+
left: dataframe.DataFrame,
31+
right: dataframe.DataFrame,
3532
how: Literal[
3633
"inner",
3734
"left",
3835
"outer",
3936
"right",
4037
"cross",
4138
] = "inner",
42-
on: Optional[str] = None,
39+
on: blocks.Label | Sequence[blocks.Label] | None = None,
4340
*,
44-
left_on: Optional[str] = None,
45-
right_on: Optional[str] = None,
41+
left_on: blocks.Label | Sequence[blocks.Label] | None = None,
42+
right_on: blocks.Label | Sequence[blocks.Label] | None = None,
4643
sort: bool = False,
4744
suffixes: tuple[str, str] = ("_x", "_y"),
48-
) -> bigframes.dataframe.DataFrame:
45+
) -> dataframe.DataFrame:
4946
left = _validate_operand(left)
5047
right = _validate_operand(right)
5148

52-
return left.merge(
53-
right,
54-
how=how,
55-
on=on,
56-
left_on=left_on,
57-
right_on=right_on,
49+
if how == "cross":
50+
if on is not None:
51+
raise ValueError("'on' is not supported for cross join.")
52+
result_block = left._block.merge(
53+
right._block,
54+
left_join_ids=[],
55+
right_join_ids=[],
56+
suffixes=suffixes,
57+
how=how,
58+
sort=True,
59+
)
60+
return dataframe.DataFrame(result_block)
61+
62+
left_on, right_on = _validate_left_right_on(
63+
left, right, on, left_on=left_on, right_on=right_on
64+
)
65+
66+
if utils.is_list_like(left_on):
67+
left_on = list(left_on) # type: ignore
68+
else:
69+
left_on = [left_on]
70+
71+
if utils.is_list_like(right_on):
72+
right_on = list(right_on) # type: ignore
73+
else:
74+
right_on = [right_on]
75+
76+
left_join_ids = []
77+
for label in left_on: # type: ignore
78+
left_col_id = left._resolve_label_exact(label)
79+
# 0 elements already throws an exception
80+
if not left_col_id:
81+
raise ValueError(f"No column {label} found in self.")
82+
left_join_ids.append(left_col_id)
83+
84+
right_join_ids = []
85+
for label in right_on: # type: ignore
86+
right_col_id = right._resolve_label_exact(label)
87+
if not right_col_id:
88+
raise ValueError(f"No column {label} found in other.")
89+
right_join_ids.append(right_col_id)
90+
91+
block = left._block.merge(
92+
right._block,
93+
how,
94+
left_join_ids,
95+
right_join_ids,
5896
sort=sort,
5997
suffixes=suffixes,
6098
)
99+
return dataframe.DataFrame(block)
61100

62101

63102
merge.__doc__ = vendored_pandas_merge.merge.__doc__
64103

65104

66105
def _validate_operand(
67-
obj: bigframes.dataframe.DataFrame | bigframes.series.Series,
68-
) -> bigframes.dataframe.DataFrame:
106+
obj: dataframe.DataFrame | series.Series,
107+
) -> dataframe.DataFrame:
69108
import bigframes.dataframe
70109
import bigframes.series
71110

@@ -79,3 +118,39 @@ def _validate_operand(
79118
raise TypeError(
80119
f"Can only merge bigframes.series.Series or bigframes.dataframe.DataFrame objects, a {type(obj)} was passed"
81120
)
121+
122+
123+
def _validate_left_right_on(
124+
left: dataframe.DataFrame,
125+
right: dataframe.DataFrame,
126+
on: blocks.Label | Sequence[blocks.Label] | None = None,
127+
*,
128+
left_on: blocks.Label | Sequence[blocks.Label] | None = None,
129+
right_on: blocks.Label | Sequence[blocks.Label] | None = None,
130+
):
131+
if on is not None:
132+
if left_on is not None or right_on is not None:
133+
raise ValueError(
134+
"Can not pass both `on` and `left_on` + `right_on` params."
135+
)
136+
return on, on
137+
138+
if left_on is not None and right_on is not None:
139+
return left_on, right_on
140+
141+
left_cols = left.columns
142+
right_cols = right.columns
143+
common_cols = left_cols.intersection(right_cols)
144+
if len(common_cols) == 0:
145+
raise ValueError(
146+
"No common columns to perform merge on."
147+
f"Merge options: left_on={left_on}, "
148+
f"right_on={right_on}, "
149+
)
150+
if (
151+
not left_cols.join(common_cols, how="inner").is_unique
152+
or not right_cols.join(common_cols, how="inner").is_unique
153+
):
154+
raise ValueError(f"Data columns not unique: {repr(common_cols)}")
155+
156+
return common_cols, common_cols

bigframes/dataframe.py

Lines changed: 7 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -3653,92 +3653,18 @@ def merge(
36533653
sort: bool = False,
36543654
suffixes: tuple[str, str] = ("_x", "_y"),
36553655
) -> DataFrame:
3656-
if how == "cross":
3657-
if on is not None:
3658-
raise ValueError("'on' is not supported for cross join.")
3659-
result_block = self._block.merge(
3660-
right._block,
3661-
left_join_ids=[],
3662-
right_join_ids=[],
3663-
suffixes=suffixes,
3664-
how=how,
3665-
sort=True,
3666-
)
3667-
return DataFrame(result_block)
3668-
3669-
left_on, right_on = self._validate_left_right_on(
3670-
right, on, left_on=left_on, right_on=right_on
3671-
)
3672-
3673-
if utils.is_list_like(left_on):
3674-
left_on = list(left_on) # type: ignore
3675-
else:
3676-
left_on = [left_on]
3656+
from bigframes.core.reshape import merge
36773657

3678-
if utils.is_list_like(right_on):
3679-
right_on = list(right_on) # type: ignore
3680-
else:
3681-
right_on = [right_on]
3682-
3683-
left_join_ids = []
3684-
for label in left_on: # type: ignore
3685-
left_col_id = self._resolve_label_exact(label)
3686-
# 0 elements already throws an exception
3687-
if not left_col_id:
3688-
raise ValueError(f"No column {label} found in self.")
3689-
left_join_ids.append(left_col_id)
3690-
3691-
right_join_ids = []
3692-
for label in right_on: # type: ignore
3693-
right_col_id = right._resolve_label_exact(label)
3694-
if not right_col_id:
3695-
raise ValueError(f"No column {label} found in other.")
3696-
right_join_ids.append(right_col_id)
3697-
3698-
block = self._block.merge(
3699-
right._block,
3658+
return merge.merge(
3659+
self,
3660+
right,
37003661
how,
3701-
left_join_ids,
3702-
right_join_ids,
3662+
on,
3663+
left_on=left_on,
3664+
right_on=right_on,
37033665
sort=sort,
37043666
suffixes=suffixes,
37053667
)
3706-
return DataFrame(block)
3707-
3708-
def _validate_left_right_on(
3709-
self,
3710-
right: DataFrame,
3711-
on: Union[blocks.Label, Sequence[blocks.Label], None] = None,
3712-
*,
3713-
left_on: Union[blocks.Label, Sequence[blocks.Label], None] = None,
3714-
right_on: Union[blocks.Label, Sequence[blocks.Label], None] = None,
3715-
):
3716-
if on is not None:
3717-
if left_on is not None or right_on is not None:
3718-
raise ValueError(
3719-
"Can not pass both `on` and `left_on` + `right_on` params."
3720-
)
3721-
return on, on
3722-
3723-
if left_on is not None and right_on is not None:
3724-
return left_on, right_on
3725-
3726-
left_cols = self.columns
3727-
right_cols = right.columns
3728-
common_cols = left_cols.intersection(right_cols)
3729-
if len(common_cols) == 0:
3730-
raise ValueError(
3731-
"No common columns to perform merge on."
3732-
f"Merge options: left_on={left_on}, "
3733-
f"right_on={right_on}, "
3734-
)
3735-
if (
3736-
not left_cols.join(common_cols, how="inner").is_unique
3737-
or not right_cols.join(common_cols, how="inner").is_unique
3738-
):
3739-
raise ValueError(f"Data columns not unique: {repr(common_cols)}")
3740-
3741-
return common_cols, common_cols
37423668

37433669
def join(
37443670
self,

0 commit comments

Comments
 (0)