This document demonstrates how to publish messages to a topic that has an Avro schema registered. When a topic has an Avro schema, the kafka-java producer serializes the message value using that schema. The key is serialized with the string serializer `org.apache.kafka.common.serialization.StringSerializer`; the value is serialized with the Confluent Avro serializer `io.confluent.kafka.serializers.KafkaAvroSerializer`.
Current Limitations:

- The Avro sink assumes the input payload is in JSON format. It uses `org.apache.avro.io.JsonDecoder` to decode the payload to an Avro `GenericRecord` before sending it to the Kafka topic. Please consider contributing if you want other formats to be supported; there is an open issue tracking this feature.
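To illustrate what the sink expects, here is a hedged Python sketch, not the actual Java `JsonDecoder`, of checking that a decoded JSON payload structurally matches a simple Avro record schema like the one used in this example. The helper name `matches_record` is hypothetical, and only `record` and `long` types are modeled.

```python
import json

def matches_record(schema, payload) -> bool:
    """Recursively check that a decoded JSON payload matches a
    simple Avro record schema (records and longs only)."""
    t = schema["type"] if isinstance(schema, dict) else schema
    if t == "record":
        if not isinstance(payload, dict):
            return False
        return all(
            f["name"] in payload and matches_record(f["type"], payload[f["name"]])
            for f in schema["fields"]
        )
    if t == "long":
        return isinstance(payload, int)
    return True  # other primitive types are out of scope for this sketch

# The schema used later in this document.
schema = json.loads("""
{"name": "numagen-avro", "type": "record", "fields": [
  {"name": "Data", "type": {"name": "Data", "type": "record",
     "fields": [{"name": "value", "type": "long"}]}},
  {"name": "Createdts", "type": "long"}]}
""")

print(matches_record(schema, {"Data": {"value": 1}, "Createdts": 2}))  # True
print(matches_record(schema, {"Data": {}, "Createdts": 2}))            # False
```

A payload that fails to match the registered schema cannot be decoded into a `GenericRecord`, so the message would be rejected before it reaches the topic.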
In this example, we create a pipeline that reads from the built-in generator source and writes the messages to a target topic `numagen-avro`, with the Avro schema `numagen-avro-value` registered for the value of the message.

Create a topic called `numagen-avro` in your Kafka cluster with the following Avro schema `numagen-avro-value` registered:
```json
{
  "fields": [
    {
      "name": "Data",
      "type": {
        "fields": [
          {
            "name": "value",
            "type": "long"
          }
        ],
        "name": "Data",
        "type": "record"
      }
    },
    {
      "name": "Createdts",
      "type": "long"
    }
  ],
  "name": "numagen-avro",
  "type": "record"
}
```
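One way to register this schema is the Confluent Schema Registry REST API, which expects a `POST` to `/subjects/<subject>/versions` with a JSON body of the form `{"schema": "<escaped schema JSON>"}`. The sketch below builds that request body; the registry URL in the usage note is a placeholder.

```python
import json

# The Avro schema from above, as a Python dict.
schema = {
    "name": "numagen-avro",
    "type": "record",
    "fields": [
        {"name": "Data", "type": {"name": "Data", "type": "record",
                                  "fields": [{"name": "value", "type": "long"}]}},
        {"name": "Createdts", "type": "long"},
    ],
}

# The registry expects the schema itself as an escaped JSON string
# nested inside the request body.
body = json.dumps({"schema": json.dumps(schema)})
print(body[:12])
```

The resulting body can then be posted, for example, with `curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @body.json http://<registry>/subjects/numagen-avro-value/versions`, where `<registry>` is your schema registry address.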
Use the example ConfigMap to configure the Kafka sinker.
In the ConfigMap:

- `producer.properties` holds the producer properties as well as the schema registry properties to configure the producer. Ensure that the schema registry configurations are set, because the Avro schema is used to serialize the data.
- `user.configuration` is the user configuration for the sink vertex:
    - `topicName` is the Kafka topic name to write data to.
    - `schemaType` is set to `avro` to indicate that an Avro schema is used to validate and serialize the data.
    - `schemaSubject` is the subject name in the schema registry for the Avro schema.
    - `schemaVersion` is the version of the schema in the schema registry.
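The exact example ConfigMap ships with the Numaflow documentation; a minimal sketch under the assumptions that the ConfigMap name, broker address, registry URL, and schema version below are all placeholders (the real example may differ) could look like:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config-avro   # hypothetical name
data:
  producer.properties: |
    # standard Kafka producer property (placeholder broker address)
    bootstrap.servers=kafka-broker:9092
    # schema registry properties, required for Avro serialization
    schema.registry.url=http://schema-registry:8081
  user.configuration: |
    topicName: numagen-avro
    schemaType: avro
    schemaSubject: numagen-avro-value
    schemaVersion: 1
```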
Deploy the ConfigMap to the Kubernetes cluster.
Use the example pipeline to create the pipeline, using the ConfigMap created in the previous step. Make sure that the `args` list under the sink vertex matches the file paths in the ConfigMap.
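As a rough sketch of how the pieces fit together, a Numaflow Pipeline mounts the ConfigMap into the sink container and points the sink at the mounted files. Everything below other than the generator source and the `udsink` structure is illustrative: the image, flag names, mount path, and ConfigMap name are assumptions, so defer to the shipped example for the real values.

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: numagen-avro
spec:
  vertices:
    - name: in
      source:
        generator:
          rpu: 1
          duration: 1s
    - name: sink
      volumes:
        - name: kafka-config-volume
          configMap:
            name: kafka-config-avro        # hypothetical ConfigMap name
      sink:
        udsink:
          container:
            image: my-registry/kafka-sink:latest   # placeholder image
            # these paths must match where the ConfigMap is mounted
            args:
              - --producer.properties=/conf/producer.properties
              - --user.configuration=/conf/user.configuration
            volumeMounts:
              - name: kafka-config-volume
                mountPath: /conf
  edges:
    - from: in
      to: sink
```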
Wait for the pipeline to be up and running. You can observe the messages in the `numagen-avro` topic. A sample message:

```json
{
  "key": "a406ad8d-62b0-4a1d-bd1b-6792d656fbf0",
  "value": {
    "Data": {
      "value": 1736439076729944818
    },
    "Createdts": 1736439076729944818
  }
}
```

Although we use a Pipeline to demonstrate, it is highly recommended to use a MonoVertex to build your streaming data processing application on Numaflow. The way you specify the sink specification stays the same.
In the example, the `producer.properties` contains the credentials. Please see credentials management to protect your credentials.
By default, the sink generates a UUID for each message key. To specify a custom key, include a datum key prefixed with `KAFKA_KEY:`. The remaining string after the prefix is used as the Kafka message key. If no `KAFKA_KEY:` prefix is found, a UUID is generated automatically.
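The key-selection rule above can be sketched in Python as a conceptual re-implementation, not the sink's actual code; taking the first matching datum key is an assumption of this sketch.

```python
import uuid

KAFKA_KEY_PREFIX = "KAFKA_KEY:"

def message_key(datum_keys):
    """Return the Kafka message key for a datum: the first key carrying
    the KAFKA_KEY: prefix (with the prefix stripped), else a random UUID."""
    for k in datum_keys:
        if k.startswith(KAFKA_KEY_PREFIX):
            return k[len(KAFKA_KEY_PREFIX):]
    return str(uuid.uuid4())

print(message_key(["KAFKA_KEY:order-42"]))  # order-42
print(len(message_key([])))                 # 36 (UUID string length)
```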