Skip to content

Commit 08b0572

Browse files
authored
Fix bug when execute DataframeTransform a dictionary of Pcoll (#35893)
* Log Warning if process function return None * Fix get function without inner * check the first def_line also * rename variable * add strip function * reformat function * refactor code * fix bug in get function body * retrigger test * retrigger test * fix: unexpected error when transform two pcoll * revert redundant * fix test * reformat file * simply change test case * change test case * change test case * retrigger test * update change.md * update change.md * update change.md * update change.md * update format of change.md * update format of change.md * update format of change.md * update format of change.md
1 parent 42aed71 commit 08b0572

File tree

3 files changed

+23
-1
lines changed

3 files changed

+23
-1
lines changed

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@
8181
## Breaking Changes
8282

8383
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
84+
* (Python) Fixed transform naming conflict when executing DataTransform on a dictionary of PColls ([#30445](https://github.com/apache/beam/issues/30445)).
85+
This may break update compatibility if you don't provide a `--transform_name_mapping`.
8486

8587
## Deprecations
8688

sdks/python/apache_beam/dataframe/transforms.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def expand(self, input_pcolls):
108108
for tag in input_dict
109109
}
110110
input_frames: dict[Any, frame_base.DeferredFrame] = {
111-
k: convert.to_dataframe(pc, proxies[k])
111+
k: convert.to_dataframe(pc, proxies[k], str(k))
112112
for k, pc in input_dict.items()
113113
} # noqa: F821
114114

sdks/python/apache_beam/dataframe/transforms_test.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,26 @@ def check(actual):
317317
lambda x: {'res': 3 * x}, proxy, yield_elements='pandas')
318318
assert_that(res['res'], equal_to_series(three_series), 'CheckDictOut')
319319

320+
def test_multiple_dataframes_transforms(self):
321+
expected_output = ["Bryan", "DKER2"]
322+
323+
def transform_func(a, b):
324+
b["name"] = "DKER2"
325+
return a, b
326+
327+
with beam.Pipeline() as p:
328+
pcol1 = p | "Create1" >> beam.Create([beam.Row(name="Bryan")])
329+
pcol2 = p | "Create2" >> beam.Create([beam.Row(name="common")])
330+
331+
result = ({
332+
"a": pcol1, "b": pcol2
333+
}
334+
|
335+
"TransformDF" >> transforms.DataframeTransform(transform_func)
336+
| "Flatten" >> beam.Flatten()
337+
| transforms.DataframeTransform(lambda df: df.name))
338+
assert_that(result, equal_to(expected_output))
339+
320340
def test_cat(self):
321341
# verify that cat works with a List[Series] since this is
322342
# missing from doctests

0 commit comments

Comments
 (0)