Skip to content

Commit adc399a

Browse files
committed
Use schema registry directly.
1 parent 622e053 commit adc399a

File tree

2 files changed

+5
-9
lines changed

2 files changed

+5
-9
lines changed

sdks/python/apache_beam/typehints/schema_registry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def generate_new_id(self):
4040
"schemas.")
4141

4242
def add(self, typing, schema):
43-
if not schema.id:
43+
if schema.id:
4444
self.by_id[schema.id] = (typing, schema)
4545

4646
def get_typing_by_id(self, unique_id):

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,6 @@
135135
# Serialized schema_pb2.Schema w/o id to id.
136136
_SCHEMA_ID_CACHE = {}
137137

138-
# Schema id to Schema
139-
_SCHEMA_CACHE = {}
140138

141139
def named_fields_to_schema(
142140
names_and_types: Union[Dict[str, type], Sequence[Tuple[str, type]]],
@@ -171,7 +169,6 @@ def named_fields_to_schema(
171169
if key not in _SCHEMA_ID_CACHE:
172170
_SCHEMA_ID_CACHE[key] = schema_registry.generate_new_id()
173171
schema_id = _SCHEMA_ID_CACHE[key]
174-
_SCHEMA_CACHE[schema_id] = schema
175172

176173
schema.id = schema_id
177174
return schema
@@ -581,8 +578,6 @@ def named_tuple_from_schema(self, schema: schema_pb2.Schema) -> type:
581578
_named_tuple_reduce_method(schema.SerializeToString()))
582579
setattr(user_type, "_field_descriptions", descriptions)
583580
setattr(user_type, row_type._BEAM_SCHEMA_ID, schema.id)
584-
if schema.id not in _SCHEMA_CACHE:
585-
_SCHEMA_CACHE[schema.id] = schema
586581

587582
self.schema_registry.add(user_type, schema)
588583
coders.registry.register_coder(user_type, coders.RowCoder)
@@ -622,11 +617,12 @@ def schema_from_element_type(element_type: type) -> schema_pb2.Schema:
622617
return named_fields_to_schema(element_type._fields)
623618
elif match_is_named_tuple(element_type):
624619
if hasattr(element_type, row_type._BEAM_SCHEMA_ID):
625-
# if the named tuple's schema is in cache, we just use it instead of
620+
# if the named tuple's schema is in registry, we just use it instead of
626621
# regenerating one.
627622
schema_id = getattr(element_type, row_type._BEAM_SCHEMA_ID)
628-
if schema_id in _SCHEMA_CACHE:
629-
return _SCHEMA_CACHE[schema_id]
623+
schema = SCHEMA_REGISTRY.get_schema_by_id(schema_id)
624+
if schema is not None:
625+
return schema
630626
return named_tuple_to_schema(element_type)
631627
else:
632628
raise TypeError(

0 commit comments

Comments
 (0)