
Commit c2c40ed

First draft of serializer and deserializer along with tests for the serializer
1 parent 58e51e2 commit c2c40ed

13 files changed: +695 additions, -0 deletions

.github/workflows/scala.yml

Lines changed: 20 additions & 0 deletions

name: CI

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up JDK 11
        uses: actions/setup-java@v2
        with:
          java-version: '11'
          distribution: 'adopt'
      - name: Run tests
        run: sbt test

.gitignore

Lines changed: 20 additions & 0 deletions

### SBT ###
dist/*
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/
.history
.cache
.lib/

### Scala ###
*.class
*.log

### IntelliJ ###
.idea

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

.scalafmt.conf

Lines changed: 11 additions & 0 deletions

version = 2.7.5
maxColumn = 120
align = most
continuationIndent.defnSite = 2
assumeStandardLibraryStripMargin = true
docstrings = JavaDoc
lineEndings = preserve
includeCurlyBraceInSelectChains = false
danglingParentheses = true
optIn.annotationNewlines = true
rewrite.rules = [RedundantBraces]

build.sbt

Lines changed: 45 additions & 0 deletions

organization := "io.kaizensolutions"

name := "fs2-kafka-jsonschema-support"

version := "0.1"

scalaVersion := "2.13.6"

scalacOptions ++= Seq(
  "-deprecation",
  "-encoding",
  "UTF-8",
  "-feature",
  "-language:implicitConversions",
  "-unchecked",
  "-language:higherKinds",
  "-Xlint",
  "-Ywarn-dead-code",
  "-Ywarn-numeric-widen",
  "-Ywarn-value-discard",
  "-Ywarn-unused",
  "-Xfatal-warnings"
)

resolvers ++= Seq(
  "confluent" at "https://packages.confluent.io/maven/",
  "jitpack" at "https://jitpack.io"
)

libraryDependencies ++= {
  val circe     = "io.circe"
  val fd4s      = "com.github.fd4s"
  val fs2KafkaV = "2.1.0"
  Seq(
    fd4s                  %% "fs2-kafka"                    % fs2KafkaV,
    fd4s                  %% "fs2-kafka-vulcan"             % fs2KafkaV,
    "com.github.andyglow" %% "scala-jsonschema"             % "0.7.2",
    circe                 %% "circe-jackson212"             % "0.14.0",
    circe                 %% "circe-generic"                % "0.14.1",
    "org.typelevel"       %% "munit-cats-effect-3"          % "1.0.0"  % Test,
    "com.dimafeng"        %% "testcontainers-scala-munit"   % "0.39.5" % Test,
    "ch.qos.logback"      %  "logback-classic"              % "1.2.3"  % Test,
    "io.confluent"        %  "kafka-json-schema-serializer" % "6.2.0"
  )
}

docker-compose.yaml

Lines changed: 47 additions & 0 deletions

version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:6.2.0
    hostname: kafka1
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
    depends_on:
      - zoo1

  kafka-schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: kafka-schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    depends_on:
      - zoo1
      - kafka1
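
Assuming Docker is available locally, running docker-compose up -d brings up ZooKeeper, a single broker reachable on localhost:9092, and the schema registry on http://localhost:8081; that registry URL is the one assumed by the client in the deserializer sketch further below.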

project/build.properties

Lines changed: 1 addition & 0 deletions

sbt.version = 1.5.4

Lines changed: 119 additions & 0 deletions

package io.kaizensolutions.jsonschema

import cats.effect.{Ref, Sync}
import cats.syntax.all._
import com.github.andyglow.jsonschema._
import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import fs2.kafka.Deserializer
import io.circe.Decoder
import io.circe.jackson.jacksonToCirce
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.json.JsonSchema
import io.confluent.kafka.schemaregistry.json.jackson.Jackson
import org.apache.kafka.common.errors.SerializationException

import java.io.{ByteArrayInputStream, IOException}
import java.nio.ByteBuffer
import scala.reflect.ClassTag

// See AbstractKafkaJsonSchemaDeserializer
object JsonSchemaDeserializer {
  def make[F[_]: Sync, A: Decoder](
    settings: JsonSchemaDeserializerSettings,
    client: SchemaRegistryClient
  )(implicit jsonSchema: json.Schema[A], tag: ClassTag[A]): F[Deserializer[F, A]] = {
    val fObjectMapper: F[ObjectMapper] =
      Sync[F].delay {
        val instance = Jackson.newObjectMapper()
        if (settings.failOnUnknownKeys)
          instance.configure(
            DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
            settings.failOnUnknownKeys
          )
        instance
      }
    val fClientJsonSchema: F[JsonSchema] =
      Sync[F].delay {
        val schema =
          new JsonSchema(
            jsonSchema.draft07(
              settings.jsonSchemaId
                .getOrElse(tag.runtimeClass.getSimpleName.toLowerCase + "schema.json")
            )
          )
        schema.validate()
        schema
      }

    (fObjectMapper, fClientJsonSchema, Ref.of[F, Set[Int]](Set.empty[Int])).mapN {
      case (objectMapper, clientSchema, cache) =>
        new JsonSchemaDeserializer[F, A](settings, clientSchema, objectMapper, cache, client).jsonSchemaDeserializer
    }
  }
}

private[jsonschema] class JsonSchemaDeserializer[F[_]: Sync, A] private (
  settings: JsonSchemaDeserializerSettings,
  clientSchema: JsonSchema,
  objectMapper: ObjectMapper,
  compatSubjectIdCache: Ref[F, Set[Int]],
  client: SchemaRegistryClient
)(implicit decoder: Decoder[A]) {
  private val MagicByte: Byte = 0x0
  private val IdSize: Int     = 4

  def jsonSchemaDeserializer: Deserializer[F, A] =
    Deserializer.instance { (_, _, bytes) =>
      Sync[F]
        .delay {
          val buffer       = getByteBuffer(bytes)
          val id           = buffer.getInt()
          val serverSchema = client.getSchemaById(id).asInstanceOf[JsonSchema]
          val bufferLength = buffer.limit() - 1 - IdSize
          val start        = buffer.position() + buffer.arrayOffset()
          val jsonNode: JsonNode =
            objectMapper.readTree(new ByteArrayInputStream(buffer.array, start, bufferLength))

          if (settings.validatePayloadAgainstServerSchema) {
            serverSchema.validate(jsonNode)
          }

          if (settings.validatePayloadAgainstClientSchema) {
            clientSchema.validate(jsonNode)
          }

          (id, serverSchema, jsonNode)
        }
        .flatMap { case (serverId, serverSchema, jsonNode) =>
          val check =
            if (settings.validateClientSchemaAgainstServer)
              checkSchemaCompatibility(serverId, serverSchema)
            else Sync[F].unit

          check.as(jacksonToCirce(jsonNode))
        }
        .map(decoder.decodeJson)
        .rethrow
    }

  private def getByteBuffer(payload: Array[Byte]): ByteBuffer = {
    val buffer = ByteBuffer.wrap(payload)
    if (buffer.get() != MagicByte)
      throw new SerializationException("Unknown magic byte when deserializing from Kafka")
    buffer
  }

  private def checkSchemaCompatibility(serverSubjectId: Int, serverSchema: JsonSchema): F[Unit] = {
    val checkSchemaUpdateCache =
      Sync[F].delay {
        val incompatibilities = clientSchema.isBackwardCompatible(serverSchema)
        if (!incompatibilities.isEmpty)
          throw new IOException(
            s"Incompatible consumer schema with server schema: ${incompatibilities.toArray.mkString(", ")}"
          )
        else ()
      } *> compatSubjectIdCache.update(_ + serverSubjectId)

    for {
      existing <- compatSubjectIdCache.get
      _        <- if (existing.contains(serverSubjectId)) Sync[F].unit else checkSchemaUpdateCache
    } yield ()
  }
}
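
The deserializer above follows Confluent's wire format: a single 0x0 magic byte, a 4-byte schema id, then the JSON payload, which is read into a Jackson tree, optionally validated against the server and/or client schema, converted to circe Json, and decoded. A minimal sketch of constructing one is shown next; the Person type, the example object names, and the registry URL are assumptions for illustration and are not part of this commit:

package io.kaizensolutions.jsonschema.examples

import cats.effect.IO
import fs2.kafka.Deserializer
import io.circe.Decoder
import io.circe.generic.semiauto.deriveDecoder
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.kaizensolutions.jsonschema.{JsonSchemaDeserializer, JsonSchemaDeserializerSettings}
import json.{Json, Schema}

// Hypothetical payload type, used only to illustrate the wiring
final case class Person(name: String, age: Int)

object Person {
  implicit val personDecoder: Decoder[Person] = deriveDecoder[Person]
  implicit val personSchema: Schema[Person]   = Json.schema[Person]
}

object DeserializerExample {
  // Assumed registry location; the docker-compose file above exposes the registry on port 8081
  val client = new CachedSchemaRegistryClient("http://localhost:8081", 1024)

  val personDeserializer: IO[Deserializer[IO, Person]] =
    JsonSchemaDeserializer.make[IO, Person](JsonSchemaDeserializerSettings.default, client)
}

make returns F[Deserializer[F, A]] rather than a bare Deserializer because building the Jackson ObjectMapper, validating the client schema, and allocating the compatibility-check cache are all effectful.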

Lines changed: 35 additions & 0 deletions

package io.kaizensolutions.jsonschema

object JsonSchemaDeserializerSettings {
  val default: JsonSchemaDeserializerSettings = JsonSchemaDeserializerSettings()
}

final case class JsonSchemaDeserializerSettings(
  validatePayloadAgainstServerSchema: Boolean = false,
  validatePayloadAgainstClientSchema: Boolean = false,
  validateClientSchemaAgainstServer: Boolean = false,
  failOnUnknownKeys: Boolean = false,
  jsonSchemaId: Option[String] = None
) { self =>
  def withPayloadValidationAgainstServerSchema(b: Boolean): JsonSchemaDeserializerSettings =
    self.copy(validatePayloadAgainstServerSchema = b)

  def withPayloadValidationAgainstClientSchema(b: Boolean): JsonSchemaDeserializerSettings =
    self.copy(validatePayloadAgainstClientSchema = b)

  def withAggressiveSchemaValidation(b: Boolean): JsonSchemaDeserializerSettings =
    self.copy(validateClientSchemaAgainstServer = b)

  def withFailOnUnknownKeys(b: Boolean): JsonSchemaDeserializerSettings =
    self.copy(failOnUnknownKeys = b)

  def withAggressiveValidation(b: Boolean): JsonSchemaDeserializerSettings =
    self.copy(
      validatePayloadAgainstServerSchema = b,
      validatePayloadAgainstClientSchema = b,
      validateClientSchemaAgainstServer = b,
      failOnUnknownKeys = b
    )

  def withJsonSchemaId(id: String): JsonSchemaDeserializerSettings =
    self.copy(jsonSchemaId = Some(id))
}
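
The settings are a plain case class with copy-based builders, so every default is lenient and stricter behaviour is opt-in. A sketch continuing the hypothetical DeserializerExample above:

  // Stricter, opt-in behaviour via the copy-based builders
  val strictSettings: JsonSchemaDeserializerSettings =
    JsonSchemaDeserializerSettings.default
      .withPayloadValidationAgainstServerSchema(true)
      .withPayloadValidationAgainstClientSchema(true)
      .withFailOnUnknownKeys(true)

  // withAggressiveValidation(true) would additionally enable the
  // client-vs-server compatibility check (validateClientSchemaAgainstServer)
  val strictDeserializer: IO[Deserializer[IO, Person]] =
    JsonSchemaDeserializer.make[IO, Person](strictSettings, client)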
