1010import com .provectus .kafka .ui .model .MessageSchemaDTO ;
1111import com .provectus .kafka .ui .model .TopicMessageSchemaDTO ;
1212import com .provectus .kafka .ui .serde .RecordSerDe ;
13- import com .provectus .kafka .ui .util .ConsumerRecordUtil ;
1413import com .provectus .kafka .ui .util .jsonschema .AvroJsonSchemaConverter ;
1514import com .provectus .kafka .ui .util .jsonschema .JsonSchema ;
1615import com .provectus .kafka .ui .util .jsonschema .ProtobufSchemaConverter ;
2726import io .confluent .kafka .schemaregistry .protobuf .ProtobufSchemaProvider ;
2827import java .net .URI ;
2928import java .nio .ByteBuffer ;
30- import java .util .Collections ;
3129import java .util .HashMap ;
3230import java .util .List ;
3331import java .util .Map ;
@@ -47,31 +45,32 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
4745
4846 private static final int CLIENT_IDENTITY_MAP_CAPACITY = 100 ;
4947
48+ private static final StringMessageFormatter stringFormatter = new StringMessageFormatter ();
49+ private static final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter ();
50+ private static final AvroJsonSchemaConverter avroSchemaConverter = new AvroJsonSchemaConverter ();
51+
5052 private final KafkaCluster cluster ;
5153 private final Map <String , MessageFormatter > valueFormatMap = new ConcurrentHashMap <>();
5254 private final Map <String , MessageFormatter > keyFormatMap = new ConcurrentHashMap <>();
5355
5456 @ Nullable
5557 private final SchemaRegistryClient schemaRegistryClient ;
56-
5758 @ Nullable
5859 private final AvroMessageFormatter avroFormatter ;
59-
6060 @ Nullable
6161 private final ProtobufMessageFormatter protobufFormatter ;
62-
6362 @ Nullable
6463 private final JsonSchemaMessageFormatter jsonSchemaMessageFormatter ;
6564
66- private final StringMessageFormatter stringFormatter = new StringMessageFormatter ();
67- private final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter ();
68- private final AvroJsonSchemaConverter avroSchemaConverter = new AvroJsonSchemaConverter ();
69- private final ObjectMapper objectMapper = new ObjectMapper ();
65+ private ObjectMapper objectMapper ;
7066
71- private static SchemaRegistryClient createSchemaRegistryClient (KafkaCluster cluster ) {
67+ private SchemaRegistryClient createSchemaRegistryClient (KafkaCluster cluster ,
68+ ObjectMapper objectMapper ) {
7269 if (cluster .getSchemaRegistry () == null ) {
7370 throw new ValidationException ("schemaRegistry is not specified" );
7471 }
72+ this .objectMapper = objectMapper ;
73+
7574 List <SchemaProvider > schemaProviders =
7675 List .of (new AvroSchemaProvider (), new ProtobufSchemaProvider (), new JsonSchemaProvider ());
7776
@@ -97,10 +96,10 @@ private static SchemaRegistryClient createSchemaRegistryClient(KafkaCluster clus
9796 );
9897 }
9998
100- public SchemaRegistryAwareRecordSerDe (KafkaCluster cluster ) {
99+ public SchemaRegistryAwareRecordSerDe (KafkaCluster cluster , ObjectMapper objectMapper ) {
101100 this .cluster = cluster ;
102101 this .schemaRegistryClient = cluster .getSchemaRegistry () != null
103- ? createSchemaRegistryClient (cluster )
102+ ? createSchemaRegistryClient (cluster , objectMapper )
104103 : null ;
105104 if (schemaRegistryClient != null ) {
106105 this .avroFormatter = new AvroMessageFormatter (schemaRegistryClient );
@@ -147,41 +146,45 @@ public ProducerRecord<byte[], byte[]> serialize(String topic,
147146 @ Nullable String key ,
148147 @ Nullable String data ,
149148 @ Nullable Integer partition ) {
150- final Optional <SchemaMetadata > maybeValueSchema = getSchemaBySubject (topic , false );
151149 final Optional <SchemaMetadata > maybeKeySchema = getSchemaBySubject (topic , true );
150+ final Optional <SchemaMetadata > maybeValueSchema = getSchemaBySubject (topic , false );
152151
153- final byte [] serializedValue = data != null
154- ? serialize (maybeValueSchema , topic , data , false )
155- : null ;
156- final byte [] serializedKey = key != null
157- ? serialize (maybeKeySchema , topic , key , true )
158- : null ;
152+ final byte [] serializedKey = maybeKeySchema .isPresent ()
153+ ? serialize (maybeKeySchema .get (), topic , key , true )
154+ : serialize (key );
155+
156+ final byte [] serializedValue = maybeValueSchema .isPresent ()
157+ ? serialize (maybeValueSchema .get (), topic , data , false )
158+ : serialize (data );
159159
160160 return new ProducerRecord <>(topic , partition , serializedKey , serializedValue );
161161 }
162162
163163 @ SneakyThrows
164- private byte [] serialize (
165- Optional <SchemaMetadata > maybeSchema , String topic , String value , boolean isKey ) {
166- if (maybeSchema .isPresent ()) {
167- final SchemaMetadata schema = maybeSchema .get ();
168-
169- MessageReader <?> reader ;
170- if (schema .getSchemaType ().equals (MessageFormat .PROTOBUF .name ())) {
171- reader = new ProtobufMessageReader (topic , isKey , schemaRegistryClient , schema );
172- } else if (schema .getSchemaType ().equals (MessageFormat .AVRO .name ())) {
173- reader = new AvroMessageReader (topic , isKey , schemaRegistryClient , schema );
174- } else if (schema .getSchemaType ().equals (MessageFormat .JSON .name ())) {
175- reader = new JsonSchemaMessageReader (topic , isKey , schemaRegistryClient , schema );
176- } else {
177- throw new IllegalStateException ("Unsupported schema type: " + schema .getSchemaType ());
178- }
179-
180- return reader .read (value );
164+ private byte [] serialize (SchemaMetadata schema , String topic , String value , boolean isKey ) {
165+ if (value == null ) {
166+ return null ;
167+ }
168+ MessageReader <?> reader ;
169+ if (schema .getSchemaType ().equals (MessageFormat .PROTOBUF .name ())) {
170+ reader = new ProtobufMessageReader (topic , isKey , schemaRegistryClient , schema );
171+ } else if (schema .getSchemaType ().equals (MessageFormat .AVRO .name ())) {
172+ reader = new AvroMessageReader (topic , isKey , schemaRegistryClient , schema );
173+ } else if (schema .getSchemaType ().equals (MessageFormat .JSON .name ())) {
174+ reader = new JsonSchemaMessageReader (topic , isKey , schemaRegistryClient , schema );
181175 } else {
182- // if no schema provided serialize input as raw string
183- return value .getBytes ();
176+ throw new IllegalStateException ("Unsupported schema type: " + schema .getSchemaType ());
177+ }
178+
179+ return reader .read (value );
180+ }
181+
182+ private byte [] serialize (String value ) {
183+ if (value == null ) {
184+ return null ;
184185 }
186+ // if no schema provided serialize input as raw string
187+ return value .getBytes ();
185188 }
186189
187190 @ Override
0 commit comments