@@ -9,6 +9,7 @@ import org.apache.kafka.clients.producer.{
99 ProducerConfig ,
1010 ProducerRecord
1111}
12+ import org .apache .kafka .common .header .internals .RecordHeaders
1213import org .apache .kafka .common .serialization .{
1314 ByteArraySerializer ,
1415 StringDeserializer ,
@@ -58,6 +59,33 @@ class EmbeddedKafkaMethodsSpec
5859 consumer.close()
5960 }
6061
62+ " publish synchronously a String message with a header to Kafka" in {
63+ implicit val serializer = new StringSerializer ()
64+ implicit val deserializer = new StringDeserializer ()
65+ val message = " hello world!"
66+ val topic = " publish_test_topic_with_header"
67+ val headerValue = " my_header_value"
68+ val headers = new RecordHeaders ().add(" my_header" , headerValue.toCharArray.map(_.toByte))
69+ val producerRecord = new ProducerRecord [String , String ](topic, null , " key" , message, headers)
70+
71+ publishToKafka(topic, producerRecord)
72+
73+ val consumer = kafkaConsumer
74+ consumer.subscribe(List (topic).asJava)
75+
76+ val records = consumer.poll(consumerPollTimeout)
77+
78+ records.iterator().hasNext shouldBe true
79+ val record = records.iterator().next()
80+
81+ record.value() shouldBe message
82+ val myHeader = record.headers().lastHeader(" my_header" )
83+ val actualHeaderValue = new String (myHeader.value().map(_.toChar))
84+ actualHeaderValue shouldBe headerValue
85+
86+ consumer.close()
87+ }
88+
6189 " publish synchronously a String message with String key to Kafka" in {
6290 implicit val serializer = new StringSerializer ()
6391 implicit val deserializer = new StringDeserializer ()
0 commit comments