Skip to content

Commit ac2706a

Browse files
authored
Fix timestamp issue in a yaml pipeline that calls SQL transform (#35789)
1 parent 720f961 commit ac2706a

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,21 @@ def named_fields_to_schema(
151151
if isinstance(names_and_types, dict):
152152
names_and_types = names_and_types.items()
153153

154+
_, cached_schema = schema_registry.by_id.get(schema_id, (None, None))
155+
if cached_schema:
156+
type_by_name_from_schema = {
157+
field.name: field.type
158+
for field in cached_schema.fields
159+
}
160+
else:
161+
type_by_name_from_schema = {}
162+
154163
schema = schema_pb2.Schema(
155164
fields=[
156165
schema_pb2.Field(
157166
name=name,
158-
type=typing_to_runner_api(type),
167+
type=type_by_name_from_schema.get(
168+
name, typing_to_runner_api(type)),
159169
options=[
160170
option_to_runner_api(option_tuple)
161171
for option_tuple in field_options.get(name, [])

sdks/python/apache_beam/yaml/tests/sql.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,21 @@ pipelines:
7575
- {a: "x", s: "2"}
7676
- {a: "x", s: "3"}
7777
- {a: "y", s: "10"}
78+
79+
- pipeline:
80+
type: chain
81+
transforms:
82+
- type: Create
83+
name: CreateSampleData
84+
config:
85+
elements:
86+
- { id: 1, name: "John" }
87+
- { id: 2, name: "Jane" }
88+
- type: Sql
89+
name: sql
90+
config:
91+
query: >
92+
SELECT *, CURRENT_TIMESTAMP AS ingest_timestamp FROM PCOLLECTION
93+
- type: PyTransform
94+
config:
95+
constructor: apache_beam.transforms.util.LogElements

0 commit comments

Comments
 (0)