Skip to content

Commit 24df28a

Browse files
committed
Fix debezium port type, fix gemini post processor fn
1 parent a097680 commit 24df28a

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

sdks/python/apache_beam/examples/inference/gemini_text_classification.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,18 @@ def parse_known_args(argv):
6767

6868
class PostProcessor(beam.DoFn):
6969
def process(self, element: PredictionResult) -> Iterable[str]:
70-
yield "Input: " + str(element.example) + " Output: " + str(
71-
element.inference[1][0].content.parts[0].text)
70+
71+
inference = getattr(element, "inference", None)
72+
73+
if hasattr(inference[1], "content"):
74+
yield inference[1].content.parts[0].text
75+
return
76+
77+
if isinstance(inference[1], (tuple, list)) and len(inference) > 1:
78+
yield "Input: " + str(element.example) + " Output: " + str(
79+
inference[1][0].content.parts[0].text)
80+
else:
81+
yield "Can't decode inference for element: " + str(element.example)
7282

7383

7484
def run(

sdks/python/apache_beam/io/debezium.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def __init__(
155155
username=username,
156156
password=password,
157157
host=host,
158-
port=port,
158+
port=str(port),
159159
max_number_of_records=max_number_of_records,
160160
connection_properties=connection_properties)
161161
self.expansion_service = expansion_service or default_io_expansion_service()

0 commit comments

Comments
 (0)