Skip to content
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 1
"modification": 2
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 11
"modification": 12
}
1 change: 1 addition & 0 deletions sdks/python/apache_beam/typehints/row_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def __init__(
schema_id=schema_id,
schema_options=schema_options,
field_options=field_options,
field_descriptions=field_descriptions,
**kwargs)
user_type = named_tuple_from_schema(schema, **kwargs)

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/typehints/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def generate_new_id(self):
"schemas.")

def add(self, typing, schema):
if not schema.id:
if schema.id:
Copy link
Copy Markdown
Collaborator Author

@shunping shunping Jun 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why we change this line in #28169 to disable the look up of schema registry.

Let's see if this breaks anythong.

Copy link
Copy Markdown
Collaborator Author

@shunping shunping Jun 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Abacn, do you know which test workflow covers this test?

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Local tests passed with ./gradlew --info sdk:java:io:google-cloud-platform:test --tests 'org.apache.beam.sdk.io.gcp.bigquery.providers.*'.

self.by_id[schema.id] = (typing, schema)

def get_typing_by_id(self, unique_id):
Expand Down
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ def named_fields_to_schema(
schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY,
field_descriptions: Optional[Dict[str, str]] = None,
):
schema_options = schema_options or []
field_options = field_options or {}
field_descriptions = field_descriptions or {}

if isinstance(names_and_types, dict):
names_and_types = names_and_types.items()
Expand All @@ -158,6 +160,7 @@ def named_fields_to_schema(
option_to_runner_api(option_tuple)
for option_tuple in field_options.get(name, [])
],
description=field_descriptions.get(name, None)
) for (name, type) in names_and_types
],
options=[
Expand Down Expand Up @@ -616,6 +619,13 @@ def schema_from_element_type(element_type: type) -> schema_pb2.Schema:
if isinstance(element_type, row_type.RowTypeConstraint):
return named_fields_to_schema(element_type._fields)
elif match_is_named_tuple(element_type):
if hasattr(element_type, row_type._BEAM_SCHEMA_ID):
# if the named tuple's schema is in registry, we just use it instead of
# regenerating one.
schema_id = getattr(element_type, row_type._BEAM_SCHEMA_ID)
schema = SCHEMA_REGISTRY.get_schema_by_id(schema_id)
if schema is not None:
return schema
return named_tuple_to_schema(element_type)
else:
raise TypeError(
Expand Down
Loading