Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f1d9e93
Log Warning if process function return None
Sep 24, 2024
3cdb629
Fix get function without inner
Sep 25, 2024
9170808
check the first def_line also
Sep 25, 2024
9370ed2
rename variable
Sep 25, 2024
6219664
add strip function
Sep 25, 2024
5a3c1a6
reformat function
Sep 25, 2024
3bfc43d
refactor code
Sep 25, 2024
278d186
fix bug in get function body
Sep 29, 2024
f07d2cd
Merge branch 'master' into master
DKER2 Sep 29, 2024
e819366
retrigger test
Sep 29, 2024
25d5431
retrigger test
Sep 29, 2024
52f59ff
fix: unexpected error when transform two pcoll
DKER2 Aug 17, 2025
69ed085
revert redundant
DKER2 Aug 17, 2025
e6b636c
Merge branch 'master' into fx-30445
DKER2 Aug 17, 2025
d194838
fix test
DKER2 Aug 20, 2025
4acae69
reformat file
DKER2 Aug 20, 2025
12c4973
simply change test case
DKER2 Aug 20, 2025
d5f052c
change test case
DKER2 Aug 20, 2025
9193339
change test case
DKER2 Aug 20, 2025
f1e5fd7
retrigger test
DKER2 Aug 23, 2025
faa4fd0
Merge branch 'master' into fx-30445
DKER2 Aug 23, 2025
f04d6c7
Merge branch 'master' into fx-30445
DKER2 Sep 4, 2025
d3ceae7
update change.md
DKER2 Sep 4, 2025
a966505
update change.md
DKER2 Sep 4, 2025
4338599
Merge branch 'master' into fx-30445
DKER2 Sep 21, 2025
06d0bda
update change.md
DKER2 Sep 21, 2025
5364c52
update change.md
DKER2 Sep 21, 2025
4539d04
update format of change.md
DKER2 Sep 21, 2025
422b3f7
update format of change.md
DKER2 Sep 21, 2025
e18b6be
update format of change.md
DKER2 Sep 21, 2025
c36572e
update format of change.md
DKER2 Sep 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sdks/python/apache_beam/dataframe/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ def expand(self, input_pcolls):
for tag in input_dict
}
input_frames = {
k: convert.to_dataframe(pc, proxies[k])
for k, pc in input_dict.items()
k: convert.to_dataframe(pc, proxies[k], str(k))
for k,
pc in input_dict.items()
} # type: Dict[Any, DeferredFrame] # noqa: F821

# Apply the function.
Expand Down
39 changes: 39 additions & 0 deletions sdks/python/apache_beam/dataframe/transforms_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,45 @@ def check(actual):
lambda x: {'res': 3 * x}, proxy, yield_elements='pandas')
assert_that(res['res'], equal_to_series(three_series), 'CheckDictOut')

def test_multiple_dataframe_transforms(self):
# Define test data
data1 = [
beam.Row(id=1, name="abc"),
beam.Row(id=2, name="def"),
beam.Row(id=3, name="ghi")
]
data2 = [
beam.Row(addr="addr1"), beam.Row(addr="addr2"), beam.Row(addr="addr3")
]

# Create a TestPipeline
with beam.Pipeline() as p:
# Create PCollections for testing
pcol1 = p | "Create1" >> beam.Create(data1)
pcol2 = p | "Create2" >> beam.Create(data2)

# Apply the DataframeTransform to the PCollections
pcol = ({
"a": pcol1, "b": pcol2
}
| "TransformedDF" >> transforms.DataframeTransform(
lambda a, b: a.assign(addr="addr-common")))

# Assert the expected output
expected_output = [
{
"id": 1, "name": "abc", "addr": "addr-common"
},
{
"id": 2, "name": "def", "addr": "addr-common"
},
{
"id": 3, "name": "ghi", "addr": "addr-common"
},
]
assert_that(pcol | "Map" >> beam.Map(lambda row: row.asdict())) \
.equal_to(expected_output)

def test_cat(self):
# verify that cat works with a List[Series] since this is
# missing from doctests
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1504,8 +1504,8 @@ def _check_fn_use_yield_and_return(fn):
has_yield = False
has_return = False
return_none_warning = (
"No iterator is returned by the process method in %s.",
fn.__self__.__class__)
f"No iterator is returned by the "
f"process method in {fn.__self__.__class__}.")
for line in source_code.split("\n"):
lstripped_line = line.lstrip()
if lstripped_line.startswith("yield ") or lstripped_line.startswith(
Expand Down
Loading