Skip to content

Commit 996fed9

Browse files
committed
Another fix for jdbc timestamp logical type.
- Used schema registry directly if it exists. - Fixed a bug on schema registry. - Triggered post commit tests for jdbcio. - Fixed failed tests and trigger dataflow test on xlang. - Fix schema test by adding field description in named_fields_to_schema
1 parent 4d58e6d commit 996fed9

File tree

5 files changed

+14
-3
lines changed

5 files changed

+14
-3
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 1
3+
"modification": 2
44
}
55

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 11
3+
"modification": 12
44
}

sdks/python/apache_beam/typehints/row_type.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ def __init__(
228228
schema_id=schema_id,
229229
schema_options=schema_options,
230230
field_options=field_options,
231+
field_descriptions=field_descriptions,
231232
**kwargs)
232233
user_type = named_tuple_from_schema(schema, **kwargs)
233234

sdks/python/apache_beam/typehints/schema_registry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def generate_new_id(self):
4040
"schemas.")
4141

4242
def add(self, typing, schema):
43-
if not schema.id:
43+
if schema.id:
4444
self.by_id[schema.id] = (typing, schema)
4545

4646
def get_typing_by_id(self, unique_id):

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,11 @@ def named_fields_to_schema(
142142
schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
143143
field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
144144
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY,
145+
field_descriptions: Optional[Dict[str, str]] = None,
145146
):
146147
schema_options = schema_options or []
147148
field_options = field_options or {}
149+
field_descriptions = field_descriptions or {}
148150

149151
if isinstance(names_and_types, dict):
150152
names_and_types = names_and_types.items()
@@ -158,6 +160,7 @@ def named_fields_to_schema(
158160
option_to_runner_api(option_tuple)
159161
for option_tuple in field_options.get(name, [])
160162
],
163+
description=field_descriptions.get(name, None)
161164
) for (name, type) in names_and_types
162165
],
163166
options=[
@@ -616,6 +619,13 @@ def schema_from_element_type(element_type: type) -> schema_pb2.Schema:
616619
if isinstance(element_type, row_type.RowTypeConstraint):
617620
return named_fields_to_schema(element_type._fields)
618621
elif match_is_named_tuple(element_type):
622+
if hasattr(element_type, row_type._BEAM_SCHEMA_ID):
623+
# if the named tuple's schema is in registry, we just use it instead of
624+
# regenerating one.
625+
schema_id = getattr(element_type, row_type._BEAM_SCHEMA_ID)
626+
schema = SCHEMA_REGISTRY.get_schema_by_id(schema_id)
627+
if schema is not None:
628+
return schema
619629
return named_tuple_to_schema(element_type)
620630
else:
621631
raise TypeError(

0 commit comments

Comments
 (0)