4040HAS_FAST = False
4141try :
4242 from fastavro import schemaless_reader , schemaless_writer
43+ from fastavro .schema import parse_schema
4344
4445 HAS_FAST = True
4546except ImportError :
@@ -79,7 +80,8 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=
7980 def _get_encoder_func (self , writer_schema ):
8081 if HAS_FAST :
8182 schema = writer_schema .to_json ()
82- return lambda record , fp : schemaless_writer (fp , schema , record )
83+ parsed_schema = parse_schema (schema )
84+ return lambda record , fp : schemaless_writer (fp , parsed_schema , record )
8385 writer = avro .io .DatumWriter (writer_schema )
8486 return lambda record , fp : writer .write (record , avro .io .BinaryEncoder (fp ))
8587
@@ -169,9 +171,9 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
169171 if HAS_FAST :
170172 # try to use fast avro
171173 try :
172- writer_schema = writer_schema_obj .to_json ()
173- reader_schema = reader_schema_obj .to_json ()
174- schemaless_reader (payload , writer_schema )
174+ fast_avro_writer_schema = parse_schema ( writer_schema_obj .to_json () )
175+ fast_avro_reader_schema = parse_schema ( reader_schema_obj .to_json () )
176+ schemaless_reader (payload , fast_avro_writer_schema )
175177
176178 # If we reach this point, this means we have fastavro and it can
177179 # do this deserialization. Rewind since this method just determines
@@ -180,7 +182,7 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
180182 payload .seek (curr_pos )
181183
182184 self .id_to_decoder_func [schema_id ] = lambda p : schemaless_reader (
183- p , writer_schema , reader_schema )
185+ p , fast_avro_writer_schema , fast_avro_reader_schema )
184186 return self .id_to_decoder_func [schema_id ]
185187 except Exception :
186188 # Fast avro failed, fall thru to standard avro below.
0 commit comments