- Demonstrate various ways, with and without Kafka Connect, to get data into Kafka topics and then loaded for use by the Kafka Streams API `KStream`
- Show some basic usage of the stream processing API
- Whitepaper "Kafka Serialization and Deserialization (SerDes) Examples"
- Blogpost "Building a Real-Time Streaming ETL Pipeline in 20 Minutes"
- Common demo prerequisites
- Confluent Platform 5.3
- Maven command `mvn` to compile Java code
- By default the `timeout` command is available on most Linux distributions but not on macOS. This `timeout` command is used by the bash scripts to terminate consumer processes after a period of time. To install it on a Mac:
# Install coreutils
brew install coreutils
# Add a "gnubin" directory to your PATH
PATH="/usr/local/opt/coreutils/libexec/gnubin:$PATH"

Detailed walk-thru of this example is available in the whitepaper Kafka Serialization and Deserialization (SerDes) Examples
- Command line `confluent local produce` produces `String` keys and `String` values to a Kafka topic.
- Client application reads from the Kafka topic using `Serdes.String()` for both key and value.
KAFKA-2526: one cannot use the `--key-serializer` argument of `confluent local produce` to serialize the key as a `Long`. As a result, in this example the key is serialized as a `String`. As a workaround, you could write your own `kafka.common.MessageReader` (e.g. check out the default implementation of `LineMessageReader`) and then specify it with the `--line-reader` argument of `confluent local produce`.
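To illustrate why the key type matters here, the following is a minimal plain-Java sketch (no Kafka dependency) comparing the bytes of a key written as UTF-8 text with the same key written as an 8-byte big-endian long, which is how Kafka's String and Long serializers encode keys. The class and method names are hypothetical and exist only for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class KeyEncodingSketch {
    // UTF-8 bytes of the key's text form, as a String serializer would write it.
    static byte[] stringKeyBytes(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // 8-byte big-endian encoding of the key, as a Long serializer would write it.
    static byte[] longKeyBytes(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // Recover a long from a String-serialized key by parsing its text form.
    static long parseStringKey(byte[] bytes) {
        return Long.parseLong(new String(bytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(stringKeyBytes("1").length); // prints 1
        System.out.println(longKeyBytes(1L).length);    // prints 8
        System.out.println(parseStringKey(stringKeyBytes("1"))); // prints 1
    }
}
```

Because the two encodings differ, a consumer configured with a Long deserializer cannot read a key that was produced as a `String`. This is why the client application in this example reads the key with `Serdes.String()`.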
Detailed walk-thru of this example is available in the whitepaper Kafka Serialization and Deserialization (SerDes) Examples
- Kafka Connect JDBC source connector produces JSON values, and inserts the key using single message transformations, also known as `SMTs`. This is helpful because by default JDBC source connector does not insert a key.
- Client application reads from the Kafka topic using `Serdes.String()` for key and a custom JSON Serde for the value.
This example uses a few SMTs including one to cast the key to an int64. The key uses the org.apache.kafka.connect.converters.LongConverter provided by KAFKA-6913.
Detailed walk-thru of this example is available in the whitepaper Kafka Serialization and Deserialization (SerDes) Examples
- Kafka Connect JDBC source connector produces Avro values, and null `String` keys, to a Kafka topic.
- Client application reads from the Kafka topic using `SpecificAvroSerde` for the value and then the `map` function to convert the stream of messages to have `Long` keys and custom class values.
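The re-keying step described above can be sketched in plain Java, without the Kafka Streams dependency. Here `java.util.stream` stands in for the `map` function of the Streams API, `SourceRow` stands in for the Avro value read from the topic, and `Location` is a hypothetical custom value class. All field names are assumptions made for this illustration, not taken from the example's code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class RekeySketch {
    // Stand-in for the Avro value read from the topic (field names are assumptions).
    static final class SourceRow {
        final long id;
        final String name;
        final long sale;

        SourceRow(long id, String name, long sale) {
            this.id = id;
            this.name = name;
            this.sale = sale;
        }
    }

    // Hypothetical custom value class emitted after re-keying.
    static final class Location {
        final String name;
        final long sale;

        Location(String name, long sale) {
            this.name = name;
            this.sale = sale;
        }
    }

    // Mapper: ignore the incoming null key and use the id from the value as the new Long key.
    static Map.Entry<Long, Location> rekey(String ignoredNullKey, SourceRow value) {
        return new SimpleEntry<>(Long.valueOf(value.id), new Location(value.name, value.sale));
    }

    // Apply the mapper to every record, as a map step over the stream would.
    static List<Map.Entry<Long, Location>> rekeyAll(List<SourceRow> rows) {
        return rows.stream()
                   .map(row -> rekey(null, row))
                   .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SourceRow> rows = Arrays.asList(
            new SourceRow(1L, "Raleigh", 300L),
            new SourceRow(2L, "Dusseldorf", 100L));
        for (Map.Entry<Long, Location> e : rekeyAll(rows)) {
            System.out.println(e.getKey() + " -> " + e.getValue().name + ", " + e.getValue().sale);
        }
    }
}
```

In a Kafka Streams application the same mapper logic would be passed to the stream's `map` function, and the downstream operations would then need Serdes that match the new key and value types.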
This example uses the single message transformation (SMT) `SetSchemaMetadata` with code that has a fix for KAFKA-5164, allowing the connector to set the namespace in the schema. If you do not have the fix for KAFKA-5164, see Example 4, which uses `GenericAvro` instead of `SpecificAvro`.
Detailed walk-thru of this example is available in the whitepaper Kafka Serialization and Deserialization (SerDes) Examples
- Kafka Connect JDBC source connector produces Avro values, and null `String` keys, to a Kafka topic.
- Client application reads from the Kafka topic using `GenericAvroSerde` for the value and then the `map` function to convert the stream of messages to have `Long` keys and custom class values.
This example currently uses GenericAvroSerde and not SpecificAvroSerde for a specific reason. JDBC source connector currently doesn't set a namespace when it generates a schema name for the data it is producing to Kafka. For SpecificAvroSerde, the lack of namespace is a problem when trying to match reader and writer schema because Avro uses the writer schema name and namespace to create a classname and tries to load this class, but without a namespace, the class will not be found.
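The class-loading problem described above can be illustrated with a simplified plain-Java sketch. It is not Avro's actual implementation. It only joins a namespace and a name into a full name, as the Avro specification defines full names, and then checks whether a class of that name can be loaded. `java.util.ArrayList` is used purely as a stand-in for a generated class, and the helper names are hypothetical.

```java
final class AvroClassNameSketch {
    // Join namespace and name into a full name; without a namespace only the bare name remains.
    static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    // Report whether a class with the given name can be loaded.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // With a namespace, the full name resolves to a real class.
        System.out.println(isLoadable(fullName("java.util", "ArrayList")));
        // Without a namespace, the bare name does not resolve.
        System.out.println(isLoadable(fullName(null, "ArrayList")));
    }
}
```

The same mismatch arises when the writer schema carries no namespace while the generated class lives in a package. The lookup uses the bare name and fails, which is why this example falls back to `GenericAvroSerde`.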
Detailed walk-thru of this example is available in the whitepaper Kafka Serialization and Deserialization (SerDes) Examples
- Java client produces `Long` keys and `SpecificAvro` values to a Kafka topic.
- Client application reads from the Kafka topic using `Serdes.Long()` for key and `SpecificAvroSerde` for the value.
- Kafka Connect JDBC source connector produces Avro values, and null keys, to a Kafka topic.
- KSQL reads from the Kafka topic and then uses `PARTITION BY` to create a new stream of messages with `BIGINT` keys.
All examples in this repo demonstrate the Kafka Streams API methods count and reduce.
KAFKA-5245: one needs to provide the Serdes twice, (1) when calling StreamsBuilder#stream() and (2) when calling KStream#groupByKey()
PR-531: Confluent distribution provides packages for GenericAvroSerde and SpecificAvroSerde
KAFKA-2378: adds APIs to be able to embed Kafka Connect into client applications
After you run ./start.sh:
- You should see each of the examples run end-to-end
- If you are running Confluent Platform, open your browser and navigate to the Control Center web interface Management -> Kafka Connect tab at http://localhost:9021/management/connect to see the two deployed connectors
- Beyond that, the real value of this demo is to see the provided configurations and client code
Original data:

1|Raleigh|300
2|Dusseldorf|100
1|Raleigh|600
3|Moscow|800
4|Sydney|200
2|Dusseldorf|400
5|Chennai|400
3|Moscow|100
3|Moscow|200
1|Raleigh|700

Count:

1|Raleigh|3
2|Dusseldorf|2
3|Moscow|3
4|Sydney|1
5|Chennai|1

Sum:

1|Raleigh|1600
2|Dusseldorf|500
3|Moscow|1100
4|Sydney|200
5|Chennai|400
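The figures above are consistent with counting the records per key and summing the third field per key over the first ten records. The following minimal plain-Java sketch reproduces that arithmetic without Kafka. It stands in for the `count` and `reduce` steps of the Streams API and is not the code used by the examples. Treating the first field as the key and the third as a sale amount is an assumption, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SalesAggregationSketch {
    // The ten input records listed above, in "id|name|amount" form.
    static final List<String> INPUT = Arrays.asList(
        "1|Raleigh|300", "2|Dusseldorf|100", "1|Raleigh|600", "3|Moscow|800",
        "4|Sydney|200", "2|Dusseldorf|400", "5|Chennai|400", "3|Moscow|100",
        "3|Moscow|200", "1|Raleigh|700");

    // Count how many records share each id (analogous to a count per key).
    static Map<Long, Long> countById(List<String> lines) {
        Map<Long, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            long id = Long.parseLong(line.split("\\|")[0]);
            counts.merge(id, 1L, Long::sum);
        }
        return counts;
    }

    // Add up the third field per id (analogous to a reduce per key with addition).
    static Map<Long, Long> sumById(List<String> lines) {
        Map<Long, Long> sums = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split("\\|");
            sums.merge(Long.parseLong(fields[0]), Long.parseLong(fields[2]), Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(countById(INPUT)); // prints {1=3, 2=2, 3=3, 4=1, 5=1}
        System.out.println(sumById(INPUT));   // prints {1=1600, 2=500, 3=1100, 4=200, 5=400}
    }
}
```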









