@@ -2,14 +2,16 @@ package io.kaizensolutions.jsonschema
22
33import cats .effect .{Ref , Sync }
44import cats .syntax .all ._
5+ import com .fasterxml .jackson .databind .JsonNode
56import com .github .andyglow .jsonschema ._
67import fs2 .kafka .{RecordSerializer , Serializer }
8+ import io .circe .Encoder
79import io .circe .jackson .circeToJackson
810import io .circe .syntax ._
9- import io .circe .{Encoder , Json }
1011import io .confluent .kafka .schemaregistry .ParsedSchema
1112import io .confluent .kafka .schemaregistry .client .SchemaRegistryClient
1213import io .confluent .kafka .schemaregistry .json .JsonSchema
14+ import io .confluent .kafka .schemaregistry .json .jackson .Jackson
1315import io .kaizensolutions .jsonschema .JsonSchemaSerializer .SubjectSchema
1416
1517import java .io .{ByteArrayOutputStream , IOException }
@@ -40,73 +42,85 @@ object JsonSchemaSerializer {
4042 schema : ParsedSchema
4143 )
4244
43- def apply [F [_]: Sync , A : Encoder : json. Schema : ClassTag ](
45+ def apply [F [_]: Sync , A : Encoder ](
4446 settings : JsonSchemaSerializerSettings ,
4547 client : SchemaRegistryClient
46- ): F [RecordSerializer [F , A ]] =
47- Ref .of[F , Map [SubjectSchema , ParsedSchema ]](Map .empty).map { cache =>
48- val serializer = JsonSchemaSerializer [F , A ](client, settings, cache)
48+ )(implicit jsonSchema : json.Schema [A ], tag : ClassTag [A ]): F [RecordSerializer [F , A ]] = {
49+ val fSchema =
50+ Sync [F ].delay {
51+ val instance = new JsonSchema (
52+ jsonSchema.draft07(
53+ settings.jsonSchemaId.getOrElse(tag.runtimeClass.getSimpleName.toLowerCase + " schema.json" )
54+ )
55+ )
56+ instance.validate()
57+ instance
58+ }
59+
60+ val fCache = Ref .of[F , Map [SubjectSchema , ParsedSchema ]](Map .empty)
61+
62+ (fSchema, fCache).mapN { case (schema, cache) =>
63+ val serializer = JsonSchemaSerializer [F , A ](client, settings, cache, schema)
4964 RecordSerializer .instance(
5065 forKey = Sync [F ].pure(serializer.jsonSchemaSerializer(true )),
5166 forValue = Sync [F ].pure(serializer.jsonSchemaSerializer(false ))
5267 )
5368 }
69+ }
5470}
5571
5672private [jsonschema] final case class JsonSchemaSerializer [F [_]: Sync , A : Encoder ] private (
5773 client : SchemaRegistryClient ,
5874 settings : JsonSchemaSerializerSettings ,
59- cache : Ref [F , Map [SubjectSchema , ParsedSchema ]]
60- )(implicit jsonSchema : json.Schema [A ], tag : ClassTag [A ]) {
75+ cache : Ref [F , Map [SubjectSchema , ParsedSchema ]],
76+ clientSchema : JsonSchema
77+ ) {
6178 private val MagicByte : Byte = 0x0
6279 private val IdSize : Int = 4
6380
64- private val schema = new JsonSchema (
65- jsonSchema
66- .draft07(
67- settings.jsonSchemaId
68- .getOrElse(tag.runtimeClass.getSimpleName.toLowerCase + " .schema.json" )
69- )
70- )
81+ private val objectMapper = Jackson .newObjectMapper()
82+ private val objectWriter = objectMapper.writer()
7183
7284 def jsonSchemaSerializer (isKey : Boolean ): Serializer [F , A ] = {
7385 val mkSubject = subjectName(isKey) _
7486
7587 Serializer .instance[F , A ] { (topic, _, data) =>
76- val jsonPayload = data.asJson
77- val subject = mkSubject(topic)
88+ val jsonPayload : JsonNode = circeToJackson(data.asJson)
89+ val subject = mkSubject(topic)
90+
7891 val fSchema : F [JsonSchema ] =
7992 if (! settings.automaticRegistration && settings.useLatestVersion)
80- lookupLatestVersion(subject, schema , cache, settings.latestCompatStrict)
93+ lookupLatestVersion(subject, clientSchema , cache, settings.latestCompatStrict)
8194 .map(_.asInstanceOf [JsonSchema ])
82- else Sync [F ].pure(schema )
95+ else Sync [F ].pure(clientSchema )
8396
8497 val fId : F [Int ] =
85- if (settings.automaticRegistration) registerSchema(subject, schema )
98+ if (settings.automaticRegistration) registerSchema(subject, clientSchema )
8699 else if (settings.useLatestVersion)
87- lookupLatestVersion(subject, schema , cache, settings.latestCompatStrict)
100+ lookupLatestVersion(subject, clientSchema , cache, settings.latestCompatStrict)
88101 .flatMap(s => getId(subject, s.asInstanceOf [JsonSchema ]))
89- else getId(subject, schema )
102+ else getId(subject, clientSchema )
90103
91104 for {
92105 schema <- fSchema
93106 _ <- validatePayload(schema, jsonPayload)
94107 id <- fId
95108 bytes <- Sync [F ].delay {
96- val baos = new ByteArrayOutputStream ()
109+ val payloadBytes = objectWriter.writeValueAsBytes(jsonPayload)
110+ val baos = new ByteArrayOutputStream ()
97111 baos.write(MagicByte .toInt)
98112 baos.write(ByteBuffer .allocate(IdSize ).putInt(id).array())
99- baos.write(jsonPayload.noSpaces.getBytes( " UTF-8 " ) )
100- val bytes = baos.toByteArray
113+ baos.write(payloadBytes )
114+ val bytes = baos.toByteArray
101115 baos.close()
102116 bytes
103117 }
104118 } yield bytes
105119 }
106120 }
107121
108- private def validatePayload (schema : JsonSchema , jsonPayload : Json ): F [Unit ] =
109- if (settings.validatePayload) Sync [F ].delay(schema.validate(circeToJackson( jsonPayload) ))
122+ private def validatePayload (schema : JsonSchema , jsonPayload : JsonNode ): F [Unit ] =
123+ if (settings.validatePayload) Sync [F ].delay(schema.validate(jsonPayload))
110124 else Sync [F ].unit
111125
112126 private def subjectName (isKey : Boolean )(topic : String ): String =
0 commit comments