Skip to content

Commit 57e9f98

Browse files
authored
KAFKA-19644 Enhance the documentation for producer headers and integration tests (#20524)
- Improve the docs for Record Headers. - Add integration tests to verify that the order of headers in a record is preserved when producing and consuming. - Add unit tests for RecordHeaders.java. Reviewers: Ken Huang <[email protected]>, Hong-Yi Chen <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 848e3d0 commit 57e9f98

File tree

6 files changed

+100
-15
lines changed

6 files changed

+100
-15
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,10 @@ private void testHeaders(Map<String, Object> consumerConfig) throws Exception {
203203
) {
204204
var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getBytes(), "value".getBytes());
205205
record.headers().add("headerKey", "headerValue".getBytes());
206+
record.headers().add("headerKey2", "headerValue2".getBytes());
207+
record.headers().add("headerKey3", "headerValue3".getBytes());
206208
producer.send(record);
209+
producer.flush();
207210

208211
assertEquals(0, consumer.assignment().size());
209212
consumer.assign(List.of(TP));
@@ -212,8 +215,15 @@ var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getByt
212215
consumer.seek(TP, 0);
213216
var records = consumeRecords(consumer, numRecords);
214217
assertEquals(numRecords, records.size());
218+
215219
var header = records.get(0).headers().lastHeader("headerKey");
216220
assertEquals("headerValue", header == null ? null : new String(header.value()));
221+
222+
// Test the order of headers in a record is preserved when producing and consuming
223+
Header[] headers = records.get(0).headers().toArray();
224+
assertEquals("headerKey", headers[0].key());
225+
assertEquals("headerKey2", headers[1].key());
226+
assertEquals("headerKey3", headers[2].key());
217227
}
218228
}
219229

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,8 @@ public void testHeaders() {
467467
int numRecords = 1;
468468
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
469469
record.headers().add("headerKey", "headerValue".getBytes());
470+
record.headers().add("headerKey2", "headerValue2".getBytes());
471+
record.headers().add("headerKey3", "headerValue3".getBytes());
470472
producer.send(record);
471473
producer.flush();
472474

@@ -475,11 +477,15 @@ public void testHeaders() {
475477
List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
476478
assertEquals(numRecords, records.size());
477479

478-
for (ConsumerRecord<byte[], byte[]> consumerRecord : records) {
479-
Header header = consumerRecord.headers().lastHeader("headerKey");
480-
if (header != null)
481-
assertEquals("headerValue", new String(header.value()));
482-
}
480+
Header header = records.get(0).headers().lastHeader("headerKey");
481+
assertEquals("headerValue", new String(header.value()));
482+
483+
// Test the order of headers in a record is preserved when producing and consuming
484+
Header[] headers = records.get(0).headers().toArray();
485+
assertEquals("headerKey", headers[0].key());
486+
assertEquals("headerKey2", headers[1].key());
487+
assertEquals("headerKey3", headers[2].key());
488+
483489
verifyShareGroupStateTopicRecordsProduced();
484490
}
485491
}

clients/src/main/java/org/apache/kafka/common/header/Header.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,23 @@
1616
*/
1717
package org.apache.kafka.common.header;
1818

19+
/**
20+
* A header is a key-value pair.
21+
*/
1922
public interface Header {
20-
23+
24+
/**
25+
* Returns the key of the header.
26+
*
27+
* @return the header's key; must not be null.
28+
*/
2129
String key();
2230

31+
/**
32+
* Returns the value of the header.
33+
*
34+
* @return the header's value; may be null.
35+
*/
2336
byte[] value();
24-
37+
2538
}

clients/src/main/java/org/apache/kafka/common/header/Headers.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,18 @@
1616
*/
1717
package org.apache.kafka.common.header;
1818

19+
20+
/**
21+
* A mutable ordered collection of {@link Header} objects. Note that multiple headers may have the same {@link Header#key() key}.
22+
* <p>
23+
* The order of headers is preserved in the order they were added.
24+
*/
1925
public interface Headers extends Iterable<Header> {
2026

2127
/**
2228
* Adds a header (key inside), to the end, returning if the operation succeeded.
2329
*
24-
* @param header the Header to be added
30+
* @param header the Header to be added.
2531
* @return this instance of the Headers, once the header is added.
2632
* @throws IllegalStateException is thrown if headers are in a read-only state.
2733
*/
@@ -30,17 +36,18 @@ public interface Headers extends Iterable<Header> {
3036
/**
3137
* Creates and adds a header, to the end, returning if the operation succeeded.
3238
*
33-
* @param key of the header to be added.
34-
* @param value of the header to be added.
39+
* @param key of the header to be added; must not be null.
40+
* @param value of the header to be added; may be null.
3541
* @return this instance of the Headers, once the header is added.
3642
* @throws IllegalStateException is thrown if headers are in a read-only state.
3743
*/
3844
Headers add(String key, byte[] value) throws IllegalStateException;
3945

4046
/**
41-
* Removes all headers for the given key returning if the operation succeeded.
47+
* Removes all headers for the given key returning if the operation succeeded,
48+
* while preserving the insertion order of the remaining headers.
4249
*
43-
* @param key to remove all headers for.
50+
* @param key to remove all headers for; must not be null.
4451
* @return this instance of the Headers, once the header is removed.
4552
* @throws IllegalStateException is thrown if headers are in a read-only state.
4653
*/
@@ -49,16 +56,17 @@ public interface Headers extends Iterable<Header> {
4956
/**
5057
* Returns just one (the very last) header for the given key, if present.
5158
*
52-
* @param key to get the last header for.
59+
* @param key to get the last header for; must not be null.
5360
* @return this last header matching the given key, returns null if not present.
5461
*/
5562
Header lastHeader(String key);
5663

5764
/**
5865
* Returns all headers for the given key, in the order they were added in, if present.
66+
* The iterator does not support {@link java.util.Iterator#remove()}.
5967
*
60-
* @param key to return the headers for.
61-
* @return all headers for the given key, in the order they were added in, if NO headers are present an empty iterable is returned.
68+
* @param key to return the headers for; must not be null.
69+
* @return all headers for the given key, in the order they were added in, if NO headers are present an empty iterable is returned.
6270
*/
6371
Iterable<Header> headers(String key);
6472

clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,21 @@ public void testAdd() {
4747
assertEquals(2, getCount(headers));
4848
}
4949

50+
@Test
51+
public void testAddHeadersPreserveOrder() {
52+
Headers headers = new RecordHeaders();
53+
headers.add(new RecordHeader("key", "value".getBytes()));
54+
headers.add(new RecordHeader("key2", "value2".getBytes()));
55+
headers.add(new RecordHeader("key3", "value3".getBytes()));
56+
57+
Header[] headersArr = headers.toArray();
58+
assertHeader("key", "value", headersArr[0]);
59+
assertHeader("key2", "value2", headersArr[1]);
60+
assertHeader("key3", "value3", headersArr[2]);
61+
62+
assertEquals(3, getCount(headers));
63+
}
64+
5065
@Test
5166
public void testRemove() {
5267
Headers headers = new RecordHeaders();
@@ -59,6 +74,27 @@ public void testRemove() {
5974
assertFalse(headers.iterator().hasNext());
6075
}
6176

77+
@Test
78+
public void testPreserveOrderAfterRemove() {
79+
Headers headers = new RecordHeaders();
80+
headers.add(new RecordHeader("key", "value".getBytes()));
81+
headers.add(new RecordHeader("key2", "value2".getBytes()));
82+
headers.add(new RecordHeader("key3", "value3".getBytes()));
83+
84+
headers.remove("key");
85+
Header[] headersArr = headers.toArray();
86+
assertHeader("key2", "value2", headersArr[0]);
87+
assertHeader("key3", "value3", headersArr[1]);
88+
assertEquals(2, getCount(headers));
89+
90+
headers.add(new RecordHeader("key4", "value4".getBytes()));
91+
headers.remove("key3");
92+
headersArr = headers.toArray();
93+
assertHeader("key2", "value2", headersArr[0]);
94+
assertHeader("key4", "value4", headersArr[1]);
95+
assertEquals(2, getCount(headers));
96+
}
97+
6298
@Test
6399
public void testAddRemoveInterleaved() {
64100
Headers headers = new RecordHeaders();
@@ -127,6 +163,17 @@ public void testLastHeader() {
127163

128164
}
129165

166+
@Test
167+
public void testHeadersIteratorRemove() {
168+
Headers headers = new RecordHeaders();
169+
headers.add(new RecordHeader("key", "value".getBytes()));
170+
171+
Iterator<Header> headersIterator = headers.headers("key").iterator();
172+
headersIterator.next();
173+
assertThrows(UnsupportedOperationException.class,
174+
headersIterator::remove);
175+
}
176+
130177
@Test
131178
public void testReadOnly() {
132179
RecordHeaders headers = new RecordHeaders();

docs/implementation.html

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ <h5 class="anchor-heading"><a id="recordheader" class="anchor-link"></a><a href=
101101
headerKey: String
102102
headerValueLength: varint
103103
Value: byte[]</code></pre>
104+
<p>The key of a record header is guaranteed to be non-null, while the value of a record header may be null. The order of headers in a record is preserved when producing and consuming.</p>
104105
<p>We use the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
105106
is also encoded as a varint.</p>
106107

0 commit comments

Comments
 (0)