Skip to content

Commit bb8b8b6

Browse files
committed
- Improve error reporting when you fail to add JSON support
- Add tests - Add more compiler options for implicit resolution
1 parent 3068eaa commit bb8b8b6

File tree

6 files changed

+361
-173
lines changed

6 files changed

+361
-173
lines changed

build.sbt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ scalacOptions ++= Seq(
1919
"-Ywarn-numeric-widen",
2020
"-Ywarn-value-discard",
2121
"-Ywarn-unused",
22-
"-Xfatal-warnings"
22+
"-Xfatal-warnings",
23+
"-Vimplicits",
24+
"-Vtype-diffs",
25+
"-Xsource:3"
2326
)
2427

2528
resolvers ++= Seq(

src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import scala.reflect.ClassTag
1818

1919
// See AbstractKafkaJsonSchemaDeserializer
2020
object JsonSchemaDeserializer {
21-
def make[F[_]: Sync, A: Decoder](
21+
def apply[F[_]: Sync, A: Decoder](
2222
settings: JsonSchemaDeserializerSettings,
2323
client: SchemaRegistryClient
2424
)(implicit jsonSchema: json.Schema[A], tag: ClassTag[A]): F[Deserializer[F, A]] = {

src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializer.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,14 @@ private[jsonschema] final case class JsonSchemaSerializer[F[_]: Sync, A: Encoder
145145
)
146146
}.flatMap { optSchema =>
147147
if (optSchema.isPresent) Sync[F].pure(optSchema.get)
148-
else Sync[F].raiseError(new IOException(s"Invalid schema ${metadata.getSchema}"))
148+
else
149+
Sync[F].delay(new JsonSchema(metadata.getSchema).validate()) >>
150+
// successfully parsed the schema locally means that the client was not properly configured
151+
Sync[F].raiseError[ParsedSchema](
152+
new RuntimeException(
153+
"Please enable JSON support in SchemaRegistryClientSettings by using withJsonSchemaSupport"
154+
)
155+
)
149156
}
150157
}
151158

src/test/resources/logback-test.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
</encoder>
66
</appender>
77

8+
<logger name="org.apache.kafka" level="error" additivity="false">
9+
<appender-ref ref="STDOUT" />
10+
</logger>
11+
812
<root level="INFO">
913
<appender-ref ref="STDOUT" />
1014
</root>
Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
package io.kaizensolutions.jsonschema
2+
3+
import cats.effect._
4+
import cats.syntax.all._
5+
import com.dimafeng.testcontainers.DockerComposeContainer.ComposeFile
6+
import com.dimafeng.testcontainers.munit.TestContainersForAll
7+
import com.dimafeng.testcontainers.{DockerComposeContainer, ExposedService}
8+
import fs2.Stream
9+
import fs2.kafka._
10+
import fs2.kafka.vulcan.SchemaRegistryClientSettings
11+
import io.circe.generic.semiauto._
12+
import io.circe.{Codec, Decoder, Encoder}
13+
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
14+
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
15+
import json.schema.description
16+
import json.{Json, Schema}
17+
import munit.CatsEffectSuite
18+
19+
import java.io.{File, IOException}
20+
import scala.annotation.nowarn
21+
import scala.concurrent.duration.DurationInt
22+
import scala.reflect.ClassTag
23+
24+
class JsonSchemaSerDesSpec extends CatsEffectSuite with TestContainersForAll {
25+
test(
26+
"JsonSchemaSerialization will automatically register the JSON Schema and allow you to send JSON data to Kafka"
27+
) {
28+
val examplePersons = List.fill(100)(PersonV1("Bob", 40, List(Book("Bob the builder", 1337))))
29+
val serSettings = JsonSchemaSerializerSettings.default.withAutomaticRegistration(true)
30+
producerTest[IO, PersonV1](
31+
schemaRegistry[IO],
32+
serSettings,
33+
"example-topic-persons",
34+
examplePersons,
35+
result => assertIO(result, examplePersons)
36+
)
37+
}
38+
39+
test("Enabling use latest (and disabling auto-registration) without configuring the client will fail") {
40+
val examplePersons = List.fill(100)(PersonV1("Bob", 40, List(Book("Bob the builder", 1337))))
41+
val serSettings = JsonSchemaSerializerSettings.default.withAutomaticRegistration(false).withUseLatestVersion(true)
42+
producerTest[IO, PersonV1](
43+
noJsonSupportSchemaRegistry[IO],
44+
serSettings,
45+
"example-topic-persons",
46+
examplePersons,
47+
result =>
48+
interceptMessageIO[RuntimeException](
49+
"Please enable JSON support in SchemaRegistryClientSettings by using withJsonSchemaSupport"
50+
)(result)
51+
)
52+
}
53+
54+
test("Attempting to publish an incompatible change with auto-registration will fail") {
55+
val settings =
56+
JsonSchemaSerializerSettings.default
57+
.withAutomaticRegistration(true)
58+
.withSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json")
59+
60+
val topic = "example-topic-persons"
61+
62+
val examplePersons =
63+
List.fill(100)(PersonV2Bad("Bob", 40, List(Book("Bob the builder - incompatible rename edition", 1337))))
64+
65+
producerTest[IO, PersonV2Bad](
66+
schemaRegistry[IO],
67+
settings,
68+
topic,
69+
examplePersons,
70+
result =>
71+
interceptMessageIO[RestClientException](
72+
s"""Schema being registered is incompatible with an earlier schema for subject "$topic-value"; error code: 409"""
73+
)(result)
74+
)
75+
}
76+
77+
test(
78+
"Attempting to publish an incompatible change without auto-registration (using latest server schema) will fail"
79+
) {
80+
val settings =
81+
JsonSchemaSerializerSettings.default
82+
.withAutomaticRegistration(false)
83+
.withUseLatestVersion(true)
84+
.withSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json")
85+
86+
val topic = "example-topic-persons"
87+
88+
val examplePersons =
89+
List.fill(100)(PersonV2Bad("Bob", 40, List(Book("Bob the builder - incompatible rename edition", 1337))))
90+
91+
producerTest[IO, PersonV2Bad](
92+
schemaRegistry[IO],
93+
settings,
94+
topic,
95+
examplePersons,
96+
result =>
97+
interceptMessageIO[IOException](
98+
"""Incompatible schema: Found incompatible change: Difference{jsonPath='#/properties/books', type=REQUIRED_PROPERTY_ADDED_TO_UNOPEN_CONTENT_MODEL}, Found incompatible change: Difference{jsonPath='#/properties/booksRead', type=PROPERTY_REMOVED_FROM_CLOSED_CONTENT_MODEL}"""
99+
)(result)
100+
)
101+
}
102+
103+
test(
104+
"Attempting to publish an incompatible change without auto-registration and not using the latest schema will fail"
105+
) {
106+
val settings =
107+
JsonSchemaSerializerSettings.default
108+
.withAutomaticRegistration(false)
109+
.withUseLatestVersion(false)
110+
.withSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json")
111+
112+
val topic = "example-topic-persons"
113+
114+
val examplePersons =
115+
List.fill(100)(PersonV2Bad("Bob", 40, List(Book("Bob the builder - incompatible rename edition", 1337))))
116+
117+
producerTest[IO, PersonV2Bad](
118+
schemaRegistry[IO],
119+
settings,
120+
topic,
121+
examplePersons,
122+
result =>
123+
interceptMessageIO[RestClientException](
124+
"""Schema not found; error code: 40403"""
125+
)(result)
126+
)
127+
}
128+
129+
test("Publishing a compatible change with auto-registration is allowed") {
130+
val settings =
131+
JsonSchemaSerializerSettings.default
132+
.withAutomaticRegistration(true)
133+
.withSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json")
134+
135+
val topic = "example-topic-persons"
136+
137+
val examplePersons =
138+
List.fill(100)(
139+
PersonV2Good(
140+
"Bob",
141+
40,
142+
List(Book("Bob the builder - incompatible rename edition", 1337)),
143+
List("coding"),
144+
Some("more information")
145+
)
146+
)
147+
148+
producerTest[IO, PersonV2Good](
149+
schemaRegistry[IO],
150+
settings,
151+
topic,
152+
examplePersons,
153+
result => assertIO(result, examplePersons)
154+
)
155+
}
156+
157+
test(
158+
"Reading data back from the topic with the latest schema is allowed provided you compensate for missing fields in your Decoder"
159+
) {
160+
val settings = JsonSchemaDeserializerSettings.default
161+
.withJsonSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json")
162+
.withAggressiveValidation(true)
163+
164+
val result: IO[(Boolean, Boolean)] =
165+
consumeFromKafka[IO, PersonV2Good](
166+
schemaRegistry[IO],
167+
settings,
168+
"example-consumer",
169+
"example-topic-persons",
170+
200
171+
).compile.toList
172+
.map(list =>
173+
(
174+
list.take(100).forall(each => each.hobbies.isEmpty && each.optionalField.isEmpty),
175+
list.drop(100).forall(each => each.hobbies.nonEmpty && each.optionalField.nonEmpty)
176+
)
177+
)
178+
179+
assertIO(result, (true, true))
180+
}
181+
182+
test("Reading data back from the topic with an older schema is allowed") {
183+
val settings = JsonSchemaDeserializerSettings.default
184+
.withJsonSchemaId(PersonV1.getClass.getSimpleName.toLowerCase + ".schema.json")
185+
.withPayloadValidationAgainstServerSchema(true)
186+
187+
val result: IO[Long] =
188+
consumeFromKafka[IO, PersonV1](
189+
schemaRegistry[IO],
190+
settings,
191+
"example-consumer-older",
192+
"example-topic-persons",
193+
200
194+
).compile.foldChunks(0L)((acc, next) => acc + next.size)
195+
196+
assertIO(result, 200L)
197+
}
198+
199+
def producerTest[F[_]: Async, A: Encoder: json.Schema: ClassTag](
200+
fClient: F[SchemaRegistryClient],
201+
settings: JsonSchemaSerializerSettings,
202+
topic: String,
203+
input: List[A],
204+
assertion: F[List[A]] => F[Any]
205+
): F[Any] = {
206+
val produceElements: F[List[A]] =
207+
Stream
208+
.eval[F, SchemaRegistryClient](fClient)
209+
.evalMap(JsonSchemaSerializer[F, A](settings, _))
210+
.evalMap(_.forValue)
211+
.flatMap(implicit serializer => kafkaProducer[F, Option[String], A])
212+
.flatMap { kafkaProducer =>
213+
Stream
214+
.emits[F, A](input)
215+
.chunks
216+
.evalMap { chunkA =>
217+
kafkaProducer.produce(
218+
ProducerRecords(
219+
chunkA.map(ProducerRecord[Option[String], A](topic, None, _)),
220+
chunkA
221+
)
222+
)
223+
}
224+
.groupWithin(1000, 1.second)
225+
.evalMap(_.sequence)
226+
.map(_.flatMap(_.passthrough))
227+
.flatMap(Stream.chunk)
228+
}
229+
.compile
230+
.toList
231+
232+
assertion(produceElements)
233+
}
234+
235+
def consumeFromKafka[F[_]: Async, A: Decoder: json.Schema: ClassTag](
236+
fClient: F[SchemaRegistryClient],
237+
settings: JsonSchemaDeserializerSettings,
238+
groupId: String,
239+
topic: String,
240+
numberOfElements: Long
241+
): Stream[F, A] =
242+
Stream
243+
.eval(fClient)
244+
.evalMap(client => JsonSchemaDeserializer[F, A](settings, client))
245+
.flatMap(implicit des => kafkaConsumer[F, Option[String], A](groupId))
246+
.evalTap(_.subscribeTo(topic))
247+
.flatMap(_.stream)
248+
.map(_.record.value)
249+
.take(numberOfElements)
250+
251+
override type Containers = DockerComposeContainer
252+
253+
override def startContainers(): Containers =
254+
DockerComposeContainer
255+
.Def(
256+
composeFiles = ComposeFile(Left(new File("./docker-compose.yaml"))),
257+
exposedServices = List(
258+
ExposedService(name = "kafka-schema-registry", 8081),
259+
ExposedService(name = "kafka1", 9092)
260+
)
261+
)
262+
.start()
263+
264+
def kafkaProducer[F[_]: Async, K, V](implicit
265+
keySerializer: Serializer[F, K],
266+
valueSerializer: Serializer[F, V]
267+
): Stream[F, KafkaProducer[F, K, V]] = {
268+
val settings: ProducerSettings[F, K, V] =
269+
ProducerSettings[F, K, V].withBootstrapServers("localhost:9092")
270+
KafkaProducer.stream(settings)
271+
}
272+
273+
def kafkaConsumer[F[_]: Async, K, V](groupId: String)(implicit
274+
keyDeserializer: Deserializer[F, K],
275+
valueDeserializer: Deserializer[F, V]
276+
): Stream[F, KafkaConsumer[F, K, V]] = {
277+
val settings = ConsumerSettings[F, K, V]
278+
.withBootstrapServers("localhost:9092")
279+
.withGroupId(groupId)
280+
.withAutoOffsetReset(AutoOffsetReset.Earliest)
281+
KafkaConsumer.stream(settings)
282+
}
283+
284+
def schemaRegistry[F[_]: Sync]: F[SchemaRegistryClient] =
285+
SchemaRegistryClientSettings("http://localhost:8081").withJsonSchemaSupport.createSchemaRegistryClient
286+
287+
def noJsonSupportSchemaRegistry[F[_]: Sync]: F[SchemaRegistryClient] =
288+
SchemaRegistryClientSettings("http://localhost:8081").createSchemaRegistryClient
289+
}
290+
291+
object Book {
292+
@nowarn implicit val bookJsonSchema: Schema[Book] = Json.schema[Book]
293+
@nowarn implicit val bookCodec: Codec[Book] = deriveCodec[Book]
294+
}
295+
final case class Book(
296+
@description("name of the book") name: String,
297+
@description("international standard book number") isbn: Int
298+
)
299+
300+
object PersonV1 {
301+
@nowarn implicit val personJsonSchema: Schema[PersonV1] = Json.schema[PersonV1]
302+
@nowarn implicit val personCodec: Codec[PersonV1] = deriveCodec[PersonV1]
303+
}
304+
final case class PersonV1(
305+
@description("name of the person") name: String,
306+
@description("age of the person") age: Int,
307+
@description("A list of books that the person has read") books: List[Book]
308+
)
309+
310+
// V2 is backwards incompatible with V1 because the key has changed
311+
object PersonV2Bad {
312+
@nowarn implicit val personV2BadJsonSchema: Schema[PersonV2Bad] = Json.schema[PersonV2Bad]
313+
@nowarn implicit val personV2BadCodec: Codec[PersonV2Bad] = deriveCodec[PersonV2Bad]
314+
}
315+
final case class PersonV2Bad(
316+
@description("name of the person") name: String,
317+
@description("age of the person") age: Int,
318+
@description("A list of books that the person has read") booksRead: List[Book]
319+
)
320+
321+
object PersonV2Good {
322+
@nowarn implicit val personV2GoodJsonSchema: Schema[PersonV2Good] = Json.schema[PersonV2Good]
323+
@nowarn implicit val personV2GoodCodec: Codec[PersonV2Good] = {
324+
val encoder: Encoder[PersonV2Good] = deriveEncoder[PersonV2Good]
325+
326+
val decoder: Decoder[PersonV2Good] = cursor =>
327+
for {
328+
name <- cursor.downField("name").as[String]
329+
age <- cursor.downField("age").as[Int]
330+
books <- cursor.downField("books").as[List[Book]]
331+
hobbies <- cursor.downField("hobbies").as[Option[List[String]]] // account for missing hobbies
332+
optField <- cursor.downField("optionalField").as[Option[String]]
333+
} yield PersonV2Good(name, age, books, hobbies.getOrElse(Nil), optField)
334+
335+
Codec.from(decoder, encoder)
336+
}
337+
}
338+
final case class PersonV2Good(
339+
@description("name of the person") name: String,
340+
@description("age of the person") age: Int,
341+
@description("A list of books that the person has read") books: List[Book],
342+
@description("A list of hobbies") hobbies: List[String] = Nil,
343+
@description("An optional field to add extra information") optionalField: Option[String]
344+
)

0 commit comments

Comments
 (0)