Skip to content

Commit ecbe1b6

Browse files
committed
add kafka parameter explicitly and uncover kafka test for a try
1 parent 6e4bad2 commit ecbe1b6

File tree

3 files changed

+30
-27
lines changed

3 files changed

+30
-27
lines changed

sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -50,33 +50,33 @@ pipelines:
5050
# Locally runs fine.
5151
# Kafka read pipeline
5252
# Need a separate read pipeline to make sure the write pipeline is flushed
53-
# - pipeline:
54-
# type: chain
55-
# transforms:
56-
# - type: ReadFromKafka
57-
# config:
58-
# format: "RAW"
59-
# topic: "silly_topic"
60-
# bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
61-
# consumer_config:
62-
# auto.offset.reset: "earliest"
63-
# group.id: "yaml-kafka-test-group"
64-
# max_read_time_seconds: 10 # will read forever if not set
65-
# - type: MapToFields
66-
# config:
67-
# language: python
68-
# fields:
69-
# value:
70-
# callable: |
71-
# # Kafka RAW format reads messages as bytes in the 'payload' field of a Row
72-
# lambda row: row.payload.decode('utf-8')
73-
# output_type: string
74-
# - type: AssertEqual
75-
# config:
76-
# elements:
77-
# - {value: "123"}
78-
# - {value: "456"}
79-
# - {value: "789"}
53+
- pipeline:
54+
type: chain
55+
transforms:
56+
- type: ReadFromKafka
57+
config:
58+
format: "RAW"
59+
topic: "silly_topic"
60+
bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
61+
consumer_config:
62+
auto.offset.reset: "earliest"
63+
group.id: "yaml-kafka-test-group"
64+
max_read_time_seconds: 10 # will read forever if not set
65+
- type: MapToFields
66+
config:
67+
language: python
68+
fields:
69+
value:
70+
callable: |
71+
# Kafka RAW format reads messages as bytes in the 'payload' field of a Row
72+
lambda row: row.payload.decode('utf-8')
73+
output_type: string
74+
- type: AssertEqual
75+
config:
76+
elements:
77+
- {value: "123"}
78+
- {value: "456"}
79+
- {value: "789"}
8080

8181
options:
8282
streaming: true

sdks/python/apache_beam/yaml/standard_io.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
'error_handling': 'error_handling'
7575
'file_descriptor_path': 'file_descriptor_path'
7676
'message_name': 'message_name'
77+
'max_read_time_seconds': 'max_read_time_seconds'
7778
'WriteToKafka':
7879
'format': 'format'
7980
'topic': 'topic'

sdks/python/apache_beam/yaml/yaml_provider.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,6 +1510,8 @@ def create_transform(
15101510
"""Creates a PTransform instance for the given transform type and arguments.
15111511
"""
15121512
mappings = self._mappings[typ]
1513+
# NOTE: If the `key` is not found in the mappings (e.g. standard_io.py), the
1514+
# `key` is passed down as is to the underlying transform.
15131515
remapped_args = {
15141516
mappings.get(key, key): value
15151517
for key, value in args.items()

0 commit comments

Comments
 (0)