Skip to content

Commit 4066bc3

Browse files
committed
Add documentation
1 parent bb8b8b6 commit 4066bc3

File tree

3 files changed

+165
-3
lines changed

3 files changed

+165
-3
lines changed

README.md

Lines changed: 143 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,148 @@ This functionality is backed by the following libraries:
1212
- [fs2-kafka & fs2-kafka-vulcan](https://github.com/fd4s/fs2-kafka) which provides the serializers and deserializers interfaces that we implement along with the Schema Registry client that we enrich
1313
- [confluent-schema-registry](https://github.com/confluentinc/schema-registry) is used as a basis for implementation and small portions are used for JSON Schema validation
1414

15+
### Usage ###
16+
17+
1. Define your data-types
18+
```scala
19+
object Book {}
20+
final case class Book(
21+
name: String,
22+
isbn: Int
23+
)
24+
25+
object Person {}
26+
final case class Person(
27+
name: String,
28+
age: Int,
29+
books: List[Book]
30+
)
31+
```
32+
33+
2. Derive JSON Schemas for your case classes and add extra JSON Schema information using `scala-jsonschema`
34+
```scala
35+
import json.schema.description
36+
import json.{Json, Schema}
37+
38+
object Book {
39+
implicit val bookJsonSchema: Schema[Book] = Json.schema[Book]
40+
}
41+
final case class Book(
42+
@description("name of the book") name: String,
43+
@description("international standard book number") isbn: Int
44+
)
45+
46+
object Person {
47+
implicit val personJsonSchema: Schema[Person] = Json.schema[Person]
48+
}
49+
final case class Person(
50+
@description("name of the person") name: String,
51+
@description("age of the person") age: Int,
52+
@description("A list of books that the person has read") books: List[Book]
53+
)
54+
```
55+
56+
3. Use `circe` to derive Encoders & Decoders (or Codecs) for your data-types:
57+
```scala
58+
import io.circe.generic.semiauto._
59+
import io.circe.Codec
60+
import json.schema.description
61+
import json.{Json, Schema}
62+
63+
object Book {
64+
implicit val bookJsonSchema: Schema[Book] = Json.schema[Book]
65+
implicit val bookCodec: Codec[Book] = deriveCodec[Book]
66+
}
67+
final case class Book(
68+
@description("name of the book") name: String,
69+
@description("international standard book number") isbn: Int
70+
)
71+
72+
object Person {
73+
implicit val personJsonSchema: Schema[Person] = Json.schema[Person]
74+
implicit val personCodec: Codec[Person] = deriveCodec[Person]
75+
}
76+
final case class Person(
77+
@description("name of the person") name: String,
78+
@description("age of the person") age: Int,
79+
@description("A list of books that the person has read") books: List[Book]
80+
)
81+
```
82+
83+
4. Instantiate and configure the Schema Registry
84+
```scala
85+
import cats.effect._
86+
import io.kaizensolutions.jsonschema._
87+
88+
def schemaRegistry[F[_]: Sync]: F[SchemaRegistryClient] =
89+
SchemaRegistryClientSettings("http://localhost:8081")
90+
.withJsonSchemaSupport
91+
.createSchemaRegistryClient
92+
```
93+
94+
5. Configure your FS2 Kafka Producers and Consumers to pull Serializers (or do this process manually)
95+
```scala
96+
import cats.effect._
97+
import fs2.Stream
98+
import fs2.kafka._
99+
100+
def kafkaProducer[F[_]: Async, K, V](implicit
101+
keySerializer: Serializer[F, K],
102+
valueSerializer: Serializer[F, V]
103+
): Stream[F, KafkaProducer[F, K, V]] = {
104+
val settings: ProducerSettings[F, K, V] =
105+
ProducerSettings[F, K, V].withBootstrapServers("localhost:9092")
106+
KafkaProducer.stream(settings)
107+
}
108+
109+
def kafkaConsumer[F[_]: Async, K, V](groupId: String)(implicit
110+
keyDeserializer: Deserializer[F, K],
111+
valueDeserializer: Deserializer[F, V]
112+
): Stream[F, KafkaConsumer[F, K, V]] = {
113+
val settings = ConsumerSettings[F, K, V]
114+
.withBootstrapServers("localhost:9092")
115+
.withGroupId(groupId)
116+
.withAutoOffsetReset(AutoOffsetReset.Earliest)
117+
KafkaConsumer.stream(settings)
118+
}
119+
```
120+
**Note:** In some cases you will need to adjust the Decoder to account for missing data
121+
122+
6. Produce data to Kafka with automatic Confluent Schema Registry support:
123+
```scala
124+
import cats.effect._
125+
import fs2._
126+
import fs2.kafka._
127+
import json._
128+
import io.circe._
129+
import io.kaizensolutions.jsonschema._
130+
import scala.reflect.ClassTag
131+
132+
def jsonSchemaProducer[F[_]: Async, A: Encoder: json.Schema: ClassTag](
133+
settings: JsonSchemaSerializerSettings
134+
): Stream[F, KafkaProducer[F, String, A]] =
135+
Stream
136+
.eval[F, SchemaRegistryClient](schemaRegistry[F])
137+
.evalMap(schemaRegistryClient => JsonSchemaSerializer[F, A](settings, schemaRegistryClient))
138+
.evalMap(_.forValue)
139+
.flatMap(implicit serializer => kafkaProducer[F, String, A])
140+
141+
142+
def jsonSchemaConsumer[F[_]: Async, A: Decoder: json.Schema: ClassTag](
143+
settings: JsonSchemaDeserializerSettings,
144+
groupId: String
145+
): Stream[F, KafkaConsumer[F, String, A]] =
146+
Stream
147+
.eval(schemaRegistry[F])
148+
.evalMap(client => JsonSchemaDeserializer[F, A](settings, client))
149+
.flatMap(implicit des => kafkaConsumer[F, String, A](groupId))
150+
```
151+
152+
### Settings ###
153+
There are several settings that control serialization and deserialization behavior.
154+
Please check `JsonSchemaDeserializerSettings` and `JsonSchemaSerializerSettings` for more information. The `default` settings
155+
work great unless you need fine-grained control.
156+
15157
### Notes ###
16-
- Please note that this is only an initial design to prove the functionality and I'm very happy to integrate this back into FS2 Kafka (and other Kafka libraries) so please submit an issue and we can take it from there
158+
- Please note that this is only an initial design to prove the functionality, and I'm very happy to integrate this back into FS2 Kafka (and other Kafka libraries) so please submit an issue and we can take it from there
17159
- This library provides additional validation checks for the Deserialization side on top of what Confluent provides in their Java JSON Schema Deserializer

src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaDeserializerSettings.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,23 @@ package io.kaizensolutions.jsonschema
33
object JsonSchemaDeserializerSettings {
44
val default: JsonSchemaDeserializerSettings = JsonSchemaDeserializerSettings()
55
}
6+
7+
/**
8+
* Settings that describe how to interact with Confluent's Schema Registry when deserializing data
9+
*
10+
* @param validatePayloadAgainstServerSchema will validate the payload against the schema on the server
11+
* @param validatePayloadAgainstClientSchema will validate the payload against the schema derived from the datatype you specify
12+
* @param validateClientSchemaAgainstServer will validate the schema you specify against the server's schema
13+
* @param failOnUnknownKeys will cause deserialization to fail when unknown JSON keys are encountered
14+
* @param jsonSchemaId is used to override the schema ID of the data that is being consumed
15+
*/
616
final case class JsonSchemaDeserializerSettings(
717
validatePayloadAgainstServerSchema: Boolean = false,
818
validatePayloadAgainstClientSchema: Boolean = false,
919
validateClientSchemaAgainstServer: Boolean = false,
1020
failOnUnknownKeys: Boolean = false,
1121
jsonSchemaId: Option[String] = None
12-
) { self =>
22+
) { self =>
1323
def withPayloadValidationAgainstServerSchema(b: Boolean): JsonSchemaDeserializerSettings =
1424
self.copy(validatePayloadAgainstServerSchema = b)
1525

src/main/scala/io/kaizensolutions/jsonschema/JsonSchemaSerializerSettings.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,23 @@ package io.kaizensolutions.jsonschema
33
object JsonSchemaSerializerSettings {
44
val default: JsonSchemaSerializerSettings = JsonSchemaSerializerSettings()
55
}
6+
7+
/**
8+
* Settings that describe how to interact with Confluent's Schema Registry when serializing data
9+
*
10+
* @param automaticRegistration dictates whether we try to automatically register the schema we have with the server
11+
* @param useLatestVersion dictates whether to use the latest schema on the server instead of registering a new one
12+
* @param validatePayload dictates whether to validate the JSON payload against the schema
13+
* @param latestCompatStrict dictates whether to use strict compatibility
14+
* @param jsonSchemaId is used to override the schema ID of the data that is being produced
15+
*/
616
final case class JsonSchemaSerializerSettings(
717
automaticRegistration: Boolean = true,
818
useLatestVersion: Boolean = false,
919
validatePayload: Boolean = false,
1020
latestCompatStrict: Boolean = true,
1121
jsonSchemaId: Option[String] = None
12-
) { self =>
22+
) { self =>
1323
def withSchemaId(id: String): JsonSchemaSerializerSettings =
1424
self.copy(jsonSchemaId = Some(id))
1525

0 commit comments

Comments
 (0)