This document demonstrates how to publish messages to a Kafka topic, using a JSON schema to validate each message before sending. When a JSON schema is provided, the kafka-java producer validates the value of the message against the schema and then serializes it with the byte array serializer org.apache.kafka.common.serialization.ByteArraySerializer. The key is serialized with the string serializer org.apache.kafka.common.serialization.StringSerializer.
In this example, we create a pipeline that reads from the built-in generator source and writes the messages to a target topic numagen-json, with the JSON schema numagen-json-value registered for the value of the message.
Create a topic called numagen-json in your Kafka cluster, with the following JSON schema registered under the subject numagen-json-value in the Confluent Schema Registry.
```json
{
  "$id": "test-id",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "schema for the topic numagen-json",
  "properties": {
    "Createdts": {
      "format": "int64",
      "type": "integer"
    },
    "Data": {
      "additionalProperties": false,
      "properties": {
        "value": {
          "format": "int64",
          "type": "integer"
        }
      },
      "type": "object"
    }
  },
  "required": [
    "Data",
    "Createdts"
  ],
  "title": "numagen-json",
  "type": "object"
}
```

Use the example ConfigMap to configure the Kafka sinker.
In the ConfigMap:
- `producer.properties` holds the producer properties as well as the schema registry properties used to configure the producer. Ensure that the schema registry configurations are set, because the JSON schema is used to validate the data.
- `user.configuration` is the user configuration for the sink vertex:
    - `topicName` is the Kafka topic name to write data to.
    - `schemaType` is set to `json` to indicate that a JSON schema is used to validate the data before publishing.
    - `schemaSubject` is the subject name in the schema registry for the JSON schema.
    - `schemaVersion` is the version of the schema in the schema registry.
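A minimal sketch of such a ConfigMap, assuming the name kafka-sink-config and placeholder broker and registry endpoints; the exact producer and schema registry properties depend on your cluster and authentication setup:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-sink-config   # illustrative name
data:
  producer.properties: |
    # Kafka producer properties (placeholders)
    bootstrap.servers=<broker-host>:9092
    # Schema registry properties - required because JSON schema validation is enabled
    schema.registry.url=http://<registry-host>:8081
  user.configuration: |
    topicName: numagen-json
    schemaType: json
    schemaSubject: numagen-json-value
    schemaVersion: "1"
```

The `user.configuration` keys mirror the fields described above; `schemaVersion: "1"` is an assumed version, so use the version actually registered in your schema registry.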
Deploy the ConfigMap to the Kubernetes cluster.
Use the example pipeline spec to create the pipeline, using the ConfigMap created in the previous step. Make sure that the args list under the sink vertex matches the file paths in the ConfigMap.
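A hypothetical sketch of such a pipeline, assuming the ConfigMap is named kafka-sink-config and is mounted at /conf; the sink image, args flags, and mount paths here are illustrative only, so align them with the example pipeline and the file paths in your ConfigMap:

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: numagen-json
spec:
  vertices:
    - name: in
      source:
        generator:          # built-in generator source
          rpu: 1
          duration: 1s
    - name: sink
      volumes:
        - name: kafka-config
          configMap:
            name: kafka-sink-config   # ConfigMap from the previous step
      sink:
        udsink:
          container:
            image: <kafka-sink-image>   # placeholder image
            args:                        # illustrative flags; must match the mounted file paths
              - --producer.properties=/conf/producer.properties
              - --user.configuration=/conf/user.configuration
            volumeMounts:
              - name: kafka-config
                mountPath: /conf
  edges:
    - from: in
      to: sink
```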
Wait for the pipeline to be up and running. You can then observe the messages in the numagen-json topic. A sample message:
```json
{
  "key": "a406ad8d-62b0-4a1d-bd1b-6792d656fbf0",
  "value": {
    "Data": {
      "value": "1736448031039625125"
    },
    "Createdts": "1736448031039625125"
  }
}
```

Although we use a Pipeline to demonstrate, it is highly recommended to use a MonoVertex to build your streaming data processing application on Numaflow. The way you specify the sink specification stays the same.
In the example, the producer.properties contains the credentials. See credentials management to learn how to protect your credentials.
By default, the sink generates a UUID for each message key. To specify a custom key, include a datum key prefixed with KAFKA_KEY:. The remaining string after the prefix will be used as the Kafka message key. If no KAFKA_KEY: prefix is found, a UUID is generated automatically.
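The key-derivation rule above can be sketched as follows; `kafka_message_key` is a hypothetical helper written for illustration, not the sink's actual implementation:

```python
import uuid

KAFKA_KEY_PREFIX = "KAFKA_KEY:"

def kafka_message_key(datum_keys: list[str]) -> str:
    """Return the Kafka message key for a datum.

    Uses the remainder of the first key carrying the KAFKA_KEY:
    prefix; falls back to a freshly generated UUID otherwise.
    """
    for key in datum_keys:
        if key.startswith(KAFKA_KEY_PREFIX):
            return key[len(KAFKA_KEY_PREFIX):]
    return str(uuid.uuid4())

print(kafka_message_key(["KAFKA_KEY:order-42"]))  # → order-42
print(kafka_message_key(["no-prefix"]))           # a random UUID
```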