File tree Expand file tree Collapse file tree 2 files changed +29
-1
lines changed
Expand file tree Collapse file tree 2 files changed +29
-1
lines changed Original file line number Diff line number Diff line change @@ -151,11 +151,21 @@ def named_fields_to_schema(
151151 if isinstance (names_and_types , dict ):
152152 names_and_types = names_and_types .items ()
153153
154+ _ , cached_schema = schema_registry .by_id .get (schema_id , (None , None ))
155+ if cached_schema :
156+ type_by_name_from_schema = {
157+ field .name : field .type
158+ for field in cached_schema .fields
159+ }
160+ else :
161+ type_by_name_from_schema = {}
162+
154163 schema = schema_pb2 .Schema (
155164 fields = [
156165 schema_pb2 .Field (
157166 name = name ,
158- type = typing_to_runner_api (type ),
167+ type = type_by_name_from_schema .get (
168+ name , typing_to_runner_api (type )),
159169 options = [
160170 option_to_runner_api (option_tuple )
161171 for option_tuple in field_options .get (name , [])
Original file line number Diff line number Diff line change @@ -76,3 +76,21 @@ pipelines:
7676 - {a: "x", s: "2"}
7777 - {a: "x", s: "3"}
7878 - {a: "y", s: "10"}
79+
80+ - pipeline :
81+ type : chain
82+ transforms :
83+ - type : Create
84+ name : CreateSampleData
85+ config :
86+ elements :
87+ - { id: 1, name: "John" }
88+ - { id: 2, name: "Jane" }
89+ - type : Sql
90+ name : sql
91+ config :
92+ query : >
93+ SELECT *, CURRENT_TIMESTAMP AS ingest_timestamp FROM PCOLLECTION
94+ - type : PyTransform
95+ config :
96+ constructor : apache_beam.transforms.util.LogElements
You can’t perform that action at this time.
0 commit comments