@@ -279,7 +279,7 @@ public Serializer serializer(String topic, Target type) {
279279 @ Override
280280 public Deserializer deserializer (String topic , Target type ) {
281281 return (headers , data ) -> {
282- var schemaId = extractSchemaIdFromMsg (data );
282+ int schemaId = getSchemaIdFromMessageOrTopic (data , topic , type );
283283 SchemaType format = getMessageFormatBySchemaId (schemaId );
284284 MessageFormatter formatter = schemaRegistryFormatters .get (format );
285285 return new DeserializeResult (
@@ -293,22 +293,30 @@ public Deserializer deserializer(String topic, Target type) {
293293 };
294294 }
295295
296+ private int getSchemaIdFromMessageOrTopic (byte [] data , String topic , Target type ) {
297+ return extractSchemaIdFromMsg (data ).orElseGet (
298+ () -> {
299+ String subject = schemaSubject (topic , type );
300+ return getSchemaBySubject (subject )
301+ .map (SchemaMetadata ::getId )
302+ .orElseThrow (() -> new ValidationException (
303+ String .format ("No schema for subject '%s' found and no magic byte in avro data" , subject )));
304+ }
305+ );
306+ }
307+
296308 private SchemaType getMessageFormatBySchemaId (int schemaId ) {
297309 return getSchemaById (schemaId )
298310 .map (ParsedSchema ::schemaType )
299311 .flatMap (SchemaType ::fromString )
300312 .orElseThrow (() -> new ValidationException (String .format ("Schema for id '%d' not found " , schemaId )));
301313 }
302314
303- private int extractSchemaIdFromMsg (byte [] data ) {
315+ private Optional < Integer > extractSchemaIdFromMsg (byte [] data ) {
304316 ByteBuffer buffer = ByteBuffer .wrap (data );
305317 if (buffer .remaining () >= SR_PAYLOAD_PREFIX_LENGTH && buffer .get () == SR_PAYLOAD_MAGIC_BYTE ) {
306- return buffer .getInt ();
318+ return Optional . of ( buffer .getInt () );
307319 }
308- throw new ValidationException (
309- String .format (
310- "Data doesn't contain magic byte and schema id prefix, so it can't be deserialized with %s serde" ,
311- name ())
312- );
320+ return Optional .empty ();
313321 }
314322}
0 commit comments