Commit 5b53cb4

Code examples for upcoming data contracts blog (#285)
* Example code for data-contracts blog.
* Ignoring gradle wrapper stuff.
* Ignoring gradle wrapper stuff.
* update setup script.
* Updates to README, adding console examples.
* Image updates on documentation.
* Shoutout Gilles.
* Formatting of readme elements, like notes.
* Disclaimer on setup script, how long it takes to execute.
1 parent ba92e31 commit 5b53cb4


49 files changed: 1205 additions & 0 deletions

data-contracts/.gitignore

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
```
### Gradle template
**/.gradle/
**/gradle/
**/.gradlew.bat
**/.gradlew

**/build/

# Ignore Gradle GUI config
gradle-app.setting

# Avoid ignoring Gradle wrapper jar file (.jar files are usually ignored)
!**/gradle-wrapper.jar
# Avoid ignoring Gradle wrapper properties
!**/gradle-wrapper.properties

# Cache of project
.gradletasknamecache

# Eclipse Gradle plugin generated files
# Eclipse Core
.project
# JDT-specific (Eclipse Java Development Tools)
.classpath

.idea/
```

data-contracts/README.md

Lines changed: 130 additions & 0 deletions
@@ -0,0 +1,130 @@
# Managing Data Contracts in Confluent Cloud

Data contracts consist not only of the schemas that define the structure of events, but also of rulesets that allow for more fine-grained validation, controls, and discovery. In this demo, we'll evolve a schema by adding migration rules.

I'd like to credit my colleague Gilles Philippart's work as the basis for the schemas and rules in this demo. My goal here is to add examples with commonly used developer tools and practices to his work in this area, which can be found [here on GitHub](https://github.com/gphilipp/migration-rules-demo).

## Running the Example

<img src="./images/overview.png" width="1000" height="600">

In the workflow above, we see these tools in action:
* The [Confluent Terraform Provider](https://registry.terraform.io/providers/confluentinc/confluent/latest/docs) is used to define Confluent Cloud assets (Kafka cluster(s), Data Governance, Kafka Topics, and Schema Configurations).
* Using the newly created Schema Registry, data engineers and architects define the schemas of the events that comprise the organization's canonical data model (i.e. the entities, events, and commands shared across applications), along with the other parts of the data contract: data quality rules, metadata, and migration rules. A Gradle plugin is used to register the schemas and the related elements of the data contract with the Schema Registry (see the sketch after this list).
* Applications which produce and/or consume these event types can download the schemas from the Schema Registry. In our example, this is a JVM application built using Gradle. One Gradle plugin downloads the schemas, after which another generates Java classes from those schemas, providing the application with compile-time type safety.
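To make the registration step concrete, here is a minimal sketch of how the schema-registry Gradle plugin's `register` block might be configured. This is an assumption for illustration only: the endpoint, credentials, subject name, and schema path are placeholders, and registering the ruleset and metadata portions of the contract is omitted here.

```kotlin
// build.gradle.kts (sketch) - assumes the com.github.imflog.kafka-schema-registry-gradle-plugin is applied
schemaRegistry {
    url = "https://<SCHEMA-REGISTRY-ENDPOINT>"   // placeholder Schema Registry endpoint

    credentials {
        username = "<SR_API_KEY>"                // placeholder API key
        password = "<SR_API_SECRET>"             // placeholder API secret
    }

    register {
        // Register the Avro schema backing the membership-avro topic's value.
        subject("membership-avro-value", "src/main/avro/membership.avsc", "AVRO")
    }
}
```

The plugin should then expose a corresponding register task (named `registerSchemasTask` in the plugin versions I've used) that pushes the schema to the configured registry.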
### Prerequisites

Clone the `confluentinc/demo-scene` GitHub repository (if you haven't already) and navigate to the `demo-scene` directory:

```shell
git clone git@github.com:confluentinc/demo-scene.git
cd demo-scene
```

Here are the tools needed to run this tutorial:
* [Confluent Cloud](http://confluent.cloud)
* [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html)
* [Terraform](https://developer.hashicorp.com/terraform/install?product_intent=terraform)
* [jq](https://jqlang.github.io/jq/)
* [Gradle](https://gradle.org/install/) (version 8.5 or higher)
* JDK 17
* IDE of choice

> When installing and configuring the Confluent CLI, include the Confluent Cloud credentials as environment variables for future use. For instance, with bash or zsh, include these export statements:
>
> ```shell
> export CONFLUENT_CLOUD_API_KEY=<API KEY>
> export CONFLUENT_CLOUD_API_SECRET=<API SECRET>
> ```

Terraform can use the value of any environment variable whose name begins with `TF_VAR_` as the value of a Terraform variable of the same name. For more on this functionality, see the [Terraform documentation](https://developer.hashicorp.com/terraform/cli/config/environment-variables#tf_var_name).

Our example requires that we set the value of the `org_id` variable from Confluent Cloud. This denotes which organization will house the Confluent Cloud assets we are creating. So let's export the Confluent Cloud organization ID to a Terraform environment variable.

This command may open a browser window asking you to authenticate to Confluent Cloud. Once that's complete, the result of `confluent organization list` is queried by `jq` to extract the `id` of the current organization to which you are authenticated:

```shell
export TF_VAR_org_id=$(confluent organization list -o json | jq -c -r '.[] | select(.is_current)' | jq '.id')
```

### Create Assets in Confluent Cloud

From the root of `data-contracts`, run the provided setup script, `setup.sh`. You may need to edit permissions to make this file executable.

Because this script provisions multiple assets in Confluent Cloud and downloads the appropriate gradle-wrapper JAR files for each module, it may take a few minutes to complete.

```shell
chmod +x setup.sh
./setup.sh
Setup Confluent Cloud and all Gradle Builds for Demo
-------------------------------------
Initializing Confluent Cloud Environment...
......
......
......
......
BUILD SUCCESSFUL in 2s
6 actionable tasks: 6 executed
Watched directory hierarchies: [/Users/sjacobs/code/confluentinc/demo-scene/data-contracts]
Code Generation Complete
-------------------------------------

Setup Complete!
```

Looking at what we've created in Confluent Cloud, we find a new Environment:

![CC Environment](./images/environment.png)

With a Kafka cluster:

![Kafka Cluster](./images/cluster.png)

And Data Contracts:

![Data Contracts](./images/schemas.png)

Locally, we also create a `properties` file containing the parameters needed for our Kafka clients to connect to Confluent Cloud. For an example of this `properties` file, see [confluent.properties.orig](shared/src/main/resources/confluent.properties.orig). A rough sketch of loading it follows the note below.

> **NOTE**: The file-based approach we're using here is NOT recommended for a production-quality application. A secrets manager, which the major cloud providers all offer, or a tool like HashiCorp Vault would be better suited. Such a tool would also have client libraries in a Maven repository for the JVM applications to use to access the secrets.
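Below is a rough sketch of how a client application might load this `properties` file from the classpath and layer client-specific settings on top of it. It is illustrative only; the resource name and helper function are assumptions, not the demo's actual shared code.

```kotlin
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import java.util.Properties

// Illustrative helper: load connection settings from a properties file on the classpath.
fun loadConfluentProperties(resource: String = "confluent.properties"): Properties {
    val props = Properties()
    val stream = Thread.currentThread().contextClassLoader.getResourceAsStream(resource)
        ?: error("Could not find $resource on the classpath")
    stream.use { props.load(it) }
    return props
}

fun main() {
    val props = loadConfluentProperties()

    // Add serializers (and any other client-specific settings) on top of the shared connection config.
    props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] =
        "org.apache.kafka.common.serialization.StringSerializer"
    props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] =
        "io.confluent.kafka.serializers.KafkaAvroSerializer"

    KafkaProducer<String, Any>(props).use { producer ->
        // producer.send(...) as needed
    }
}
```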
### Run the Examples

In `app-schema-v1`, the `ApplicationMain` object's `main` function starts a consumer in a new thread to subscribe to the `membership-avro` topic. It then begins producing randomly-generated events to `membership-avro` at a provided interval for a provided duration. By default, an event is produced every second for 100 seconds. These events are created and consumed using version 1 of the membership schema. The console output of the consumer should look something like this:

```shell
[Thread-0] INFO io.confluent.devrel.datacontracts.shared.BaseConsumer - Received Membership d0e65c83-b1c5-451d-b08b-8d1ed6fca8d6, {"user_id": "d0e65c83-b1c5-451d-b08b-8d1ed6fca8d6", "start_date": "2023-01-14", "end_date": "2025-05-28"}
[Thread-0] INFO io.confluent.devrel.datacontracts.shared.BaseConsumer - Received Membership 940cf6fa-eb12-46af-87e8-5a9bc33df119, {"user_id": "940cf6fa-eb12-46af-87e8-5a9bc33df119", "start_date": "2023-05-23", "end_date": "2025-07-02"}
```

The `app-schema-v2` module's `main` function starts a consumer subscribed to the `membership-avro` topic. This time, however, events are consumed using version 2 of the membership schema. Notice the `Map` of consumer overrides in the constructor of the `MembershipConsumer` in that module (a sketch of these overrides appears after the console output below). As a result, the same events which `app-schema-v1` produced using `major_version=1` of the schema are consumed using `major_version=2`:

```shell
[Thread-0] INFO io.confluent.devrel.datacontracts.shared.BaseConsumer - v2 - Received Membership b0e34c68-208c-4771-be19-79689fc9ad28, {"user_id": "b0e34c68-208c-4771-be19-79689fc9ad28", "validity_period": {"from": "2022-11-06", "to": "2025-09-03"}}
[Thread-0] INFO io.confluent.devrel.datacontracts.shared.BaseConsumer - v2 - Received Membership 517f8c7e-4ae5-47ea-93a2-c1f00669d330, {"user_id": "517f8c7e-4ae5-47ea-93a2-c1f00669d330", "validity_period": {"from": "2023-01-29", "to": "2025-02-19"}}
```
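The v1 override map appears in `MembershipConsumer.kt` later in this commit. The v2 module isn't included in this section, so the following is a sketch, based on that v1 code, of what its serde overrides would presumably look like; only the group id and the pinned metadata differ:

```kotlin
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import org.apache.kafka.clients.consumer.ConsumerConfig

// Sketch of the consumer overrides for the v2 application (assumed, based on the v1 MembershipConsumer).
val v2ConsumerOverrides = mapOf(
    ConsumerConfig.GROUP_ID_CONFIG to "app-schema-v2",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    // Don't blindly deserialize with the latest registered schema version...
    AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to false,
    // ...instead resolve the reader schema by metadata, pinning this consumer to major_version=2.
    AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=2",
    AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true
)
```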
This illustrates how the use of migration rules allows producers and consumers to independently make any code and configuration changes needed to accommodate schema changes.

## Teardown

When you're done with the demo, issue this command from the `cc-terraform` directory to destroy the Confluent Cloud environment we created:

```shell
terraform destroy -auto-approve
```

Check the Confluent Cloud console to ensure this environment no longer exists.
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
```
/gradle/
/.gradle/
/gradlew
/gradlew.bat
```
Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
```kotlin
import java.io.FileInputStream
import java.util.Properties

buildscript {
    repositories {
        mavenCentral()
        maven("https://packages.confluent.io/maven/")
        maven("https://jitpack.io")
        gradlePluginPortal()
    }
}

plugins {
    kotlin("jvm") version "2.0.21"
    id("com.google.protobuf") version "0.9.4"
    id("com.github.imflog.kafka-schema-registry-gradle-plugin") version "2.1.0"
    id("com.bakdata.avro") version "1.2.1"
}

group = "io.confluent.devrel"

repositories {
    mavenLocal()
    mavenCentral()
    maven("https://packages.confluent.io/maven/")
    maven("https://jitpack.io")
}

sourceSets {
    main {
        kotlin.srcDirs("src/main/kotlin", "build/generated-main-avro-java")
    }
}

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
    implementation("io.confluent.devrel:data-contracts-shared:+")

    implementation("org.apache.kafka:kafka-clients:${project.property("kafkaVersion")}")
    implementation("io.confluent:kafka-avro-serializer:${project.property("confluentVersion")}")

    implementation("io.github.serpro69:kotlin-faker:${project.property("fakerVersion")}")
    implementation("io.github.serpro69:kotlin-faker-books:${project.property("fakerVersion")}")
    implementation("io.github.serpro69:kotlin-faker-tech:${project.property("fakerVersion")}")

    implementation("org.jetbrains.kotlinx:kotlinx-cli:0.3.6")
    implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.6.1")
}

kotlin {
    jvmToolchain(17)
}

val schemaRegOutputDir = "${project.projectDir.absolutePath}/build/schema-registry-plugin"

tasks.downloadSchemasTask {
    doFirst {
        mkdir(schemaRegOutputDir)
    }
}

schemaRegistry {
    val srProperties = Properties()
    // At the moment, this is a file that we are only aware of LOCALLY.
    // In an ACTUAL CI/CD workflow, this would be externalized, perhaps provided from a base build image or other parameter.
    srProperties.load(FileInputStream(File("${project.projectDir.absolutePath}/../shared/src/main/resources/confluent.properties")))

    url = srProperties.getProperty("schema.registry.url")

    val srCredTokens = srProperties.get("basic.auth.user.info").toString().split(":")
    credentials {
        username = srCredTokens[0]
        password = srCredTokens[1]
    }
    outputDirectory = "${System.getProperty("user.home")}/tmp/schema-registry-plugin"
    pretty = true

    download {
        // download the membership avro schema, version 1
        subject("membership-avro-value", "${projectDir}/src/main/avro", 1)
    }
}

tasks.clean {
    doFirst {
        delete(fileTree("${projectDir}/src/main/avro/").include("**/*.avsc"))
    }
}

tasks.register("generateCode") {
    group = "source generation"
    description = "wrapper task for all source generation"
    dependsOn("downloadSchemasTask", "generateAvroJava", "generateProto")
}
```
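Taken together, running the `generateCode` aggregate task defined above should download version 1 of the `membership-avro-value` schema into `src/main/avro`, generate the Avro Java classes (such as the `Membership` class the application code imports) into `build/generated-main-avro-java`, and run protobuf generation, all before the Kotlin code is compiled.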
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
```properties
confluentVersion=7.8.0
fakerVersion=2.0.0-rc.3
grpcVersion=1.15.1
kafkaVersion=3.8.0
protobufVersion=3.6.1
slf4jVersion=2.0.11
```
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
```kotlin
rootProject.name = "app-schema-v1"
```
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
```
*.avsc
```
Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
```kotlin
package io.confluent.devrel.dc.v1

import io.confluent.devrel.Membership
import io.confluent.devrel.dc.v1.kafka.MembershipConsumer
import io.confluent.devrel.dc.v1.kafka.MembershipProducer
import kotlinx.cli.ArgParser
import kotlinx.cli.ArgType
import kotlinx.cli.default
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.datetime.Clock
import java.time.LocalDate
import java.util.*
import kotlin.concurrent.thread
import kotlin.random.Random
import kotlin.time.DurationUnit
import kotlin.time.toDuration

class ApplicationMain {

    companion object {

        @JvmStatic
        fun main(args: Array<String>) {
            runBlocking {
                println("Starting application main...")
                println(args.joinToString(" "))
                val parser = ArgParser("schema-v1")

                val interval by parser.option(ArgType.Int,
                    shortName = "i", fullName = "interval",
                    description = "message send interval, seconds")
                    .default(1)
                val duration by parser.option(ArgType.Int,
                    shortName = "d", fullName = "duration",
                    description = "how long to run, seconds")
                    .default(100)
                parser.parse(args)

                val messageInterval = interval.toDuration(DurationUnit.SECONDS)
                val sendDuration = duration.toDuration(DurationUnit.SECONDS)

                val producer = MembershipProducer()
                val consumer = MembershipConsumer()

                thread {
                    consumer.start(listOf("membership-avro"))
                }

                coroutineScope {
                    launch {
                        val until = Clock.System.now().plus(sendDuration)
                        while (Clock.System.now() < until) {
                            val userId = UUID.randomUUID().toString()
                            val membership = Membership.newBuilder()
                                .setUserId(userId)
                                .setStartDate(LocalDate.now().minusDays(Random.nextLong(100, 1000)))
                                .setEndDate(LocalDate.now().plusWeeks(Random.nextLong(1, 52)))
                                .build()
                            producer.send("membership-avro", userId, membership)
                            // wait the configured interval between sends (delay accepts a Duration;
                            // passing a raw Long here would be interpreted as milliseconds)
                            delay(messageInterval)
                        }
                    }
                }
                producer.close()
            }
        }
    }
}
```
Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
```kotlin
package io.confluent.devrel.dc.v1.kafka

import io.confluent.devrel.Membership
import io.confluent.devrel.datacontracts.shared.BaseConsumer
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import org.apache.kafka.clients.consumer.ConsumerConfig

class MembershipConsumer : BaseConsumer<String, Membership>(mapOf(
    ConsumerConfig.GROUP_ID_CONFIG to "app-schema-v1",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true,
    AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to false,
    AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=1"
)) {

    override fun consumeRecord(
        key: String,
        value: Membership
    ) {
        logger.info("Received Membership ${key}, ${value}")
    }
}
```
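`BaseConsumer` lives in the demo's shared module and isn't part of this excerpt. Purely as an assumption about its shape, based only on how `MembershipConsumer` uses it (config overrides in the constructor, a `start(topics)` method, a `logger`, and a `consumeRecord` callback), a minimal sketch might look like this:

```kotlin
package io.confluent.devrel.datacontracts.shared

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.Properties

// Hypothetical sketch of the shared BaseConsumer this module depends on; not the demo's actual code.
// It only shows the contract MembershipConsumer relies on: overrides in the constructor,
// start(topics), and an abstract consumeRecord(key, value) callback.
abstract class BaseConsumer<K, V>(private val configOverrides: Map<String, Any>) {

    protected val logger = LoggerFactory.getLogger(javaClass)

    @Volatile
    private var running = true

    abstract fun consumeRecord(key: K, value: V)

    fun start(topics: List<String>) {
        // In the real shared module, base connection properties presumably come from confluent.properties.
        val props = Properties()
        configOverrides.forEach { (k, v) -> props[k] = v }

        KafkaConsumer<K, V>(props).use { consumer ->
            consumer.subscribe(topics)
            while (running) {
                consumer.poll(Duration.ofMillis(500)).forEach { record ->
                    consumeRecord(record.key(), record.value())
                }
            }
        }
    }

    fun stop() {
        running = false
    }
}
```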
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
```kotlin
package io.confluent.devrel.dc.v1.kafka

import io.confluent.devrel.Membership
import io.confluent.devrel.datacontracts.shared.BaseProducer
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
import org.apache.kafka.clients.producer.ProducerConfig

class MembershipProducer : BaseProducer<String, Membership>(mapOf(
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroSerializer",
    ProducerConfig.CLIENT_ID_CONFIG to "membership-producer-app-v1",
    AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true,
    AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to false,
    AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=1"
))
```
