@@ -57,12 +57,13 @@ async def _resolve_named_schema(
5757 named_schemas = {}
5858 if schema .references is not None :
5959 for ref in schema .references :
60- referenced_schema = await schema_registry_client .get_version (ref .subject , ref .version , True )
61- ref_named_schemas = await _resolve_named_schema (referenced_schema .schema , schema_registry_client )
60+ # References in registered schemas are validated by server to be complete
61+ referenced_schema = await schema_registry_client .get_version (ref .subject , ref .version , True ) # type: ignore[arg-type]
62+ ref_named_schemas = await _resolve_named_schema (referenced_schema .schema , schema_registry_client ) # type: ignore[arg-type]
6263 parsed_schema = parse_schema_with_repo (
63- referenced_schema .schema .schema_str , named_schemas = ref_named_schemas )
64+ referenced_schema .schema .schema_str , named_schemas = ref_named_schemas ) # type: ignore[union-attr,arg-type]
6465 named_schemas .update (ref_named_schemas )
65- named_schemas [ref .name ] = parsed_schema
66+ named_schemas [ref .name ] = parsed_schema # type: ignore[index]
6667 return named_schemas
6768
6869
@@ -204,7 +205,7 @@ async def __init_impl(
204205 schema = None
205206
206207 self ._registry = schema_registry_client
207- self ._schema_id = None
208+ self ._schema_id : Optional [ SchemaId ] = None
208209 self ._rule_registry = rule_registry if rule_registry else RuleRegistry .get_global_instance ()
209210 self ._known_subjects : set [str ] = set ()
210211 self ._parsed_schemas = ParsedSchemaCache ()
@@ -219,26 +220,26 @@ async def __init_impl(
219220 if conf is not None :
220221 conf_copy .update (conf )
221222
222- self ._auto_register = conf_copy .pop ('auto.register.schemas' )
223+ self ._auto_register = cast ( bool , conf_copy .pop ('auto.register.schemas' ) )
223224 if not isinstance (self ._auto_register , bool ):
224225 raise ValueError ("auto.register.schemas must be a boolean value" )
225226
226- self ._normalize_schemas = conf_copy .pop ('normalize.schemas' )
227+ self ._normalize_schemas = cast ( bool , conf_copy .pop ('normalize.schemas' ) )
227228 if not isinstance (self ._normalize_schemas , bool ):
228229 raise ValueError ("normalize.schemas must be a boolean value" )
229230
230- self ._use_schema_id = conf_copy .pop ('use.schema.id' )
231+ self ._use_schema_id = cast ( Optional [ int ], conf_copy .pop ('use.schema.id' ) )
231232 if (self ._use_schema_id is not None and
232233 not isinstance (self ._use_schema_id , int )):
233234 raise ValueError ("use.schema.id must be an int value" )
234235
235- self ._use_latest_version = conf_copy .pop ('use.latest.version' )
236+ self ._use_latest_version = cast ( bool , conf_copy .pop ('use.latest.version' ) )
236237 if not isinstance (self ._use_latest_version , bool ):
237238 raise ValueError ("use.latest.version must be a boolean value" )
238239 if self ._use_latest_version and self ._auto_register :
239240 raise ValueError ("cannot enable both use.latest.version and auto.register.schemas" )
240241
241- self ._use_latest_with_metadata = conf_copy .pop ('use.latest.with.metadata' )
242+ self ._use_latest_with_metadata = cast ( Optional [ dict ], conf_copy .pop ('use.latest.with.metadata' ) )
242243 if (self ._use_latest_with_metadata is not None and
243244 not isinstance (self ._use_latest_with_metadata , dict )):
244245 raise ValueError ("use.latest.with.metadata must be a dict value" )
@@ -276,8 +277,11 @@ async def __init_impl(
276277 # i.e. {"type": "string"} has a name of string.
277278 # This function does not comply.
278279 # https://github.com/fastavro/fastavro/issues/415
279- schema_dict = json .loads (schema .schema_str )
280- schema_name = parsed_schema .get ("name" , schema_dict .get ("type" ))
280+ if schema .schema_str is not None :
281+ schema_dict = json .loads (schema .schema_str )
282+ schema_name = parsed_schema .get ("name" , schema_dict .get ("type" )) # type: ignore[union-attr]
283+ else :
284+ schema_name = None
281285 else :
282286 schema_name = None
283287 parsed_schema = None
@@ -292,7 +296,7 @@ async def __init_impl(
292296
293297 __init__ = __init_impl
294298
295- def __call__ (self , obj : object , ctx : Optional [SerializationContext ] = None ) -> Coroutine [Any , Any , Optional [bytes ]]:
299+ def __call__ (self , obj : object , ctx : Optional [SerializationContext ] = None ) -> Coroutine [Any , Any , Optional [bytes ]]: # type: ignore[override]
296300 return self .__serialize (obj , ctx )
297301
298302 async def __serialize (self , obj : object , ctx : Optional [SerializationContext ] = None ) -> Optional [bytes ]:
@@ -319,10 +323,10 @@ async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = N
319323 return None
320324
321325 subject = self ._subject_name_func (ctx , self ._schema_name )
322- latest_schema = await self ._get_reader_schema (subject )
326+ latest_schema = await self ._get_reader_schema (subject ) if subject else None # type: ignore[arg-type]
323327 if latest_schema is not None :
324328 self ._schema_id = SchemaId (AVRO_TYPE , latest_schema .schema_id , latest_schema .guid )
325- elif subject not in self ._known_subjects :
329+ elif subject is not None and subject not in self ._known_subjects :
326330 # Check to ensure this schema has been registered under subject_name.
327331 if self ._auto_register :
328332 # The schema name will always be the same. We can't however register
@@ -339,26 +343,26 @@ async def __serialize(self, obj: object, ctx: Optional[SerializationContext] = N
339343 self ._known_subjects .add (subject )
340344
341345 if self ._to_dict is not None :
342- value = self ._to_dict (obj , ctx )
346+ value = self ._to_dict (obj , ctx ) # type: ignore[arg-type]
343347 else :
344- value = obj
348+ value = obj # type: ignore[assignment]
345349
346350 if latest_schema is not None :
347- parsed_schema = await self ._get_parsed_schema (latest_schema .schema )
351+ parsed_schema = await self ._get_parsed_schema (latest_schema .schema ) # type: ignore[arg-type]
348352 def field_transformer (rule_ctx , field_transform , msg ): return ( # noqa: E731
349353 transform (rule_ctx , parsed_schema , msg , field_transform ))
350- value = self ._execute_rules (ctx , subject , RuleMode .WRITE , None ,
354+ value = self ._execute_rules (ctx , subject , RuleMode .WRITE , None , # type: ignore[arg-type]
351355 latest_schema .schema , value , get_inline_tags (parsed_schema ),
352356 field_transformer )
353357 else :
354- parsed_schema = self ._parsed_schema
358+ parsed_schema = self ._parsed_schema # type: ignore[assignment]
355359
356360 with _ContextStringIO () as fo :
357361 # write the record to the rest of the buffer
358362 schemaless_writer (fo , parsed_schema , value )
359363 buffer = fo .getvalue ()
360364
361- if latest_schema is not None :
365+ if latest_schema is not None and ctx is not None and subject is not None :
362366 buffer = self ._execute_rules_with_phase (
363367 ctx , subject , RulePhase .ENCODING , RuleMode .WRITE ,
364368 None , latest_schema .schema , buffer , None , None )
@@ -371,7 +375,11 @@ async def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
371375 return parsed_schema
372376
373377 named_schemas = await _resolve_named_schema (schema , self ._registry )
378+ if schema .schema_str is None :
379+ raise ValueError ("Schema string cannot be None" )
374380 prepared_schema = _schema_loads (schema .schema_str )
381+ if prepared_schema .schema_str is None :
382+ raise ValueError ("Prepared schema string cannot be None" )
375383 parsed_schema = parse_schema_with_repo (
376384 prepared_schema .schema_str , named_schemas = named_schemas )
377385
@@ -483,11 +491,11 @@ async def __init_impl(
483491 if conf is not None :
484492 conf_copy .update (conf )
485493
486- self ._use_latest_version = conf_copy .pop ('use.latest.version' )
494+ self ._use_latest_version = cast ( bool , conf_copy .pop ('use.latest.version' ) )
487495 if not isinstance (self ._use_latest_version , bool ):
488496 raise ValueError ("use.latest.version must be a boolean value" )
489497
490- self ._use_latest_with_metadata = conf_copy .pop ('use.latest.with.metadata' )
498+ self ._use_latest_with_metadata = cast ( Optional [ dict ], conf_copy .pop ('use.latest.with.metadata' ) )
491499 if (self ._use_latest_with_metadata is not None and
492500 not isinstance (self ._use_latest_with_metadata , dict )):
493501 raise ValueError ("use.latest.with.metadata must be a dict value" )
@@ -511,9 +519,9 @@ async def __init_impl(
511519 .format (", " .join (conf_copy .keys ())))
512520
513521 if schema :
514- self ._reader_schema = await self ._get_parsed_schema (self ._schema )
522+ self ._reader_schema = await self ._get_parsed_schema (self ._schema ) # type: ignore[arg-type]
515523 else :
516- self ._reader_schema = None
524+ self ._reader_schema = None # type: ignore[assignment]
517525
518526 if from_dict is not None and not callable (from_dict ):
519527 raise ValueError ("from_dict must be callable with the signature "
@@ -571,23 +579,24 @@ async def __deserialize(
571579 payload = self ._schema_id_deserializer (data , ctx , schema_id )
572580
573581 writer_schema_raw = await self ._get_writer_schema (schema_id , subject )
574- writer_schema = await self ._get_parsed_schema (writer_schema_raw )
582+ writer_schema = await self ._get_parsed_schema (writer_schema_raw ) # type: ignore[arg-type]
575583
576584 if subject is None :
577- subject = self ._subject_name_func (ctx , writer_schema .get ("name" )) if ctx else None
585+ subject = self ._subject_name_func (ctx , writer_schema .get ("name" )) if ctx else None # type: ignore[union-attr]
578586 if subject is not None :
579587 latest_schema = await self ._get_reader_schema (subject )
580588
581- payload = self ._execute_rules_with_phase (
582- ctx , subject , RulePhase .ENCODING , RuleMode .READ ,
583- None , writer_schema_raw , payload , None , None )
589+ if ctx is not None and subject is not None :
590+ payload = self ._execute_rules_with_phase (
591+ ctx , subject , RulePhase .ENCODING , RuleMode .READ ,
592+ None , writer_schema_raw , payload , None , None )
584593 if isinstance (payload , bytes ):
585594 payload = io .BytesIO (payload )
586595
587- if latest_schema is not None :
588- migrations = await self ._get_migrations (subject , writer_schema_raw , latest_schema , None )
596+ if latest_schema is not None and subject is not None :
597+ migrations = await self ._get_migrations (subject , writer_schema_raw , latest_schema , None ) # type: ignore[arg-type]
589598 reader_schema_raw = latest_schema .schema
590- reader_schema = await self ._get_parsed_schema (latest_schema .schema )
599+ reader_schema = await self ._get_parsed_schema (latest_schema .schema ) # type: ignore[arg-type]
591600 elif self ._schema is not None :
592601 migrations = None
593602 reader_schema_raw = self ._schema
@@ -597,7 +606,7 @@ async def __deserialize(
597606 reader_schema_raw = writer_schema_raw
598607 reader_schema = writer_schema
599608
600- if migrations :
609+ if migrations and ctx is not None and subject is not None :
601610 obj_dict = schemaless_reader (payload ,
602611 writer_schema ,
603612 None ,
@@ -611,12 +620,13 @@ async def __deserialize(
611620
612621 def field_transformer (rule_ctx , field_transform , message ): return ( # noqa: E731
613622 transform (rule_ctx , reader_schema , message , field_transform ))
614- obj_dict = self ._execute_rules (ctx , subject , RuleMode .READ , None ,
615- reader_schema_raw , obj_dict , get_inline_tags (reader_schema ),
616- field_transformer )
623+ if ctx is not None and subject is not None :
624+ obj_dict = self ._execute_rules (ctx , subject , RuleMode .READ , None , # type: ignore[arg-type]
625+ reader_schema_raw , obj_dict , get_inline_tags (reader_schema ),
626+ field_transformer )
617627
618628 if self ._from_dict is not None :
619- return self ._from_dict (obj_dict , ctx )
629+ return self ._from_dict (obj_dict , ctx ) # type: ignore[arg-type]
620630
621631 return obj_dict
622632
@@ -626,7 +636,11 @@ async def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
626636 return parsed_schema
627637
628638 named_schemas = await _resolve_named_schema (schema , self ._registry )
639+ if schema .schema_str is None :
640+ raise ValueError ("Schema string cannot be None" )
629641 prepared_schema = _schema_loads (schema .schema_str )
642+ if prepared_schema .schema_str is None :
643+ raise ValueError ("Prepared schema string cannot be None" )
630644 parsed_schema = parse_schema_with_repo (
631645 prepared_schema .schema_str , named_schemas = named_schemas )
632646
0 commit comments