Skip to content

Commit a48123e

Browse files
authored
MINOR: Simplify Header Serdes (#21514)
HeadersSerializer and HeaderDeserializer are internal classes and there is no reason why they would need to implement the public Serializer and Deserializer interfaces. If we don't implement these interfaces, the actual serialize() and deserialize() methods can be static simplifying the code further. Reviewers: Alieh Saeedi <asaeedi@confluent.io>, TengYao Chi <frankvicky@apache.org>
1 parent da0c207 commit a48123e

File tree

6 files changed

+32
-58
lines changed

6 files changed

+32
-58
lines changed

streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import org.apache.kafka.common.header.Headers;
2020
import org.apache.kafka.common.header.internals.RecordHeaders;
21-
import org.apache.kafka.common.serialization.Deserializer;
2221
import org.apache.kafka.common.utils.ByteUtils;
2322

2423
import java.nio.ByteBuffer;
@@ -41,18 +40,17 @@
4140
*
4241
* This is used by KIP-1271 to deserialize headers from state stores.
4342
*/
44-
public class HeadersDeserializer implements Deserializer<Headers> {
43+
class HeadersDeserializer {
4544

4645
/**
4746
* Deserializes headers from a byte array using varint encoding per KIP-1271.
4847
* <p>
4948
* The input format is [count][header1][header2]... without a size prefix.
5049
*
51-
* @param topic topic associated with the data
5250
* @param data the serialized byte array (can be null)
5351
* @return the deserialized headers
5452
*/
55-
public Headers deserialize(final String topic, final byte[] data) {
53+
public static Headers deserialize(final byte[] data) {
5654
if (data == null || data.length == 0) {
5755
return new RecordHeaders();
5856
}
@@ -86,10 +84,4 @@ public Headers deserialize(final String topic, final byte[] data) {
8684

8785
return headers;
8886
}
89-
90-
public static Headers deserialize(final byte[] data) {
91-
try (HeadersDeserializer deserializer = new HeadersDeserializer()) {
92-
return deserializer.deserialize("", data);
93-
}
94-
}
9587
}

streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.apache.kafka.common.errors.SerializationException;
2020
import org.apache.kafka.common.header.Header;
2121
import org.apache.kafka.common.header.Headers;
22-
import org.apache.kafka.common.serialization.Serializer;
2322
import org.apache.kafka.common.utils.ByteUtils;
2423

2524
import java.io.ByteArrayOutputStream;
@@ -47,7 +46,7 @@
4746
* <p>
4847
* This is used by KIP-1271 to serialize headers for storage in state stores.
4948
*/
50-
public class HeadersSerializer implements Serializer<Headers> {
49+
class HeadersSerializer {
5150

5251
/**
5352
* Serializes headers into a byte array using varint encoding per KIP-1271.
@@ -58,12 +57,10 @@ public class HeadersSerializer implements Serializer<Headers> {
5857
* For null or empty headers, returns an empty byte array (0 bytes)
5958
* instead of encoding headerCount=0 (1 byte).
6059
*
61-
* @param topic topic associated with data
6260
* @param headers the headers to serialize (can be null)
6361
* @return the serialized byte array (empty array if headers are null or empty)
6462
*/
65-
@Override
66-
public byte[] serialize(final String topic, final Headers headers) {
63+
public static byte[] serialize(final Headers headers) {
6764
final Header[] headersArray = (headers == null) ? new Header[0] : headers.toArray();
6865

6966
if (headersArray.length == 0) {

streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,24 +49,20 @@
4949
*/
5050
class ValueTimestampHeadersDeserializer<V> implements WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
5151
private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
52-
private static final HeadersDeserializer HEADERS_DESERIALIZER = new HeadersDeserializer();
5352

5453
public final Deserializer<V> valueDeserializer;
5554
private final LongDeserializer timestampDeserializer;
56-
private final HeadersDeserializer headersDeserializer;
5755

5856
ValueTimestampHeadersDeserializer(final Deserializer<V> valueDeserializer) {
5957
Objects.requireNonNull(valueDeserializer);
6058
this.valueDeserializer = valueDeserializer;
6159
this.timestampDeserializer = new LongDeserializer();
62-
this.headersDeserializer = new HeadersDeserializer();
6360
}
6461

6562
@Override
6663
public void configure(final Map<String, ?> configs, final boolean isKey) {
6764
valueDeserializer.configure(configs, isKey);
6865
timestampDeserializer.configure(configs, isKey);
69-
headersDeserializer.configure(configs, isKey);
7066
}
7167

7268
@Override
@@ -79,7 +75,7 @@ public ValueTimestampHeaders<V> deserialize(final String topic, final byte[] val
7975
final int headersSize = ByteUtils.readVarint(buffer);
8076

8177
final byte[] rawHeaders = readBytes(buffer, headersSize);
82-
final Headers headers = headersDeserializer.deserialize(topic, rawHeaders);
78+
final Headers headers = HeadersDeserializer.deserialize(rawHeaders);
8379
final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
8480
final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp);
8581
final byte[] rawValue = readBytes(buffer, buffer.remaining());
@@ -92,7 +88,6 @@ public ValueTimestampHeaders<V> deserialize(final String topic, final byte[] val
9288
public void close() {
9389
valueDeserializer.close();
9490
timestampDeserializer.close();
95-
headersDeserializer.close();
9691
}
9792

9893
@Override
@@ -162,7 +157,7 @@ static Headers headers(final byte[] rawValueTimestampHeaders) {
162157
final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
163158
final int headersSize = ByteUtils.readVarint(buffer);
164159
final byte[] rawHeaders = readBytes(buffer, headersSize);
165-
return HEADERS_DESERIALIZER.deserialize("", rawHeaders);
160+
return HeadersDeserializer.deserialize(rawHeaders);
166161
}
167162
/**
168163
* Extract raw value from serialized ValueTimestampHeaders.

streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,23 +49,20 @@
4949
*
5050
* This is used by KIP-1271 to serialize values with timestamps and headers for state stores.
5151
*/
52-
public class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
52+
class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
5353
public final Serializer<V> valueSerializer;
5454
private final LongSerializer timestampSerializer;
55-
private final HeadersSerializer headersSerializer;
5655

5756
ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
5857
Objects.requireNonNull(valueSerializer);
5958
this.valueSerializer = valueSerializer;
6059
this.timestampSerializer = new LongSerializer();
61-
this.headersSerializer = new HeadersSerializer();
6260
}
6361

6462
@Override
6563
public void configure(final Map<String, ?> configs, final boolean isKey) {
6664
valueSerializer.configure(configs, isKey);
6765
timestampSerializer.configure(configs, isKey);
68-
headersSerializer.configure(configs, isKey);
6966
}
7067

7168
@Override
@@ -95,7 +92,7 @@ private byte[] serialize(final String topic, final V plainValue, final long time
9592
final byte[] rawTimestamp = timestampSerializer.serialize(topic, timestamp);
9693

9794
// empty (byte[0]) for null/empty headers, or [count][header1][header2]... for non-empty
98-
final byte[] rawHeaders = headersSerializer.serialize(topic, headers);
95+
final byte[] rawHeaders = HeadersSerializer.serialize(headers);
9996

10097
// Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
10198
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -116,7 +113,6 @@ private byte[] serialize(final String topic, final V plainValue, final long time
116113
public void close() {
117114
valueSerializer.close();
118115
timestampSerializer.close();
119-
headersSerializer.close();
120116
}
121117

122118
@Override

streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,17 @@
3131

3232
public class HeadersDeserializerTest {
3333

34-
private final HeadersSerializer serializer = new HeadersSerializer();
35-
private final HeadersDeserializer deserializer = new HeadersDeserializer();
36-
3734
@Test
3835
public void shouldDeserializeNullData() {
39-
final Headers headers = deserializer.deserialize("", null);
36+
final Headers headers = HeadersDeserializer.deserialize(null);
4037

4138
assertNotNull(headers);
4239
assertEquals(0, headers.toArray().length);
4340
}
4441

4542
@Test
4643
public void shouldDeserializeEmptyData() {
47-
final Headers headers = deserializer.deserialize("", new byte[0]);
44+
final Headers headers = HeadersDeserializer.deserialize(new byte[0]);
4845

4946
assertNotNull(headers);
5047
assertEquals(0, headers.toArray().length);
@@ -53,8 +50,8 @@ public void shouldDeserializeEmptyData() {
5350
@Test
5451
public void shouldRoundTripEmptyHeaders() {
5552
final Headers original = new RecordHeaders();
56-
final byte[] serialized = serializer.serialize("", original);
57-
final Headers deserialized = deserializer.deserialize("", serialized);
53+
final byte[] serialized = HeadersSerializer.serialize(original);
54+
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
5855

5956
assertNotNull(deserialized);
6057
assertEquals(0, deserialized.toArray().length);
@@ -64,8 +61,8 @@ public void shouldRoundTripEmptyHeaders() {
6461
public void shouldRoundTripSingleHeader() {
6562
final Headers original = new RecordHeaders()
6663
.add("key1", "value1".getBytes());
67-
final byte[] serialized = serializer.serialize("", original);
68-
final Headers deserialized = deserializer.deserialize("", serialized);
64+
final byte[] serialized = HeadersSerializer.serialize(original);
65+
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
6966

7067
assertNotNull(deserialized);
7168
assertEquals(1, deserialized.toArray().length);
@@ -82,8 +79,8 @@ public void shouldRoundTripMultipleHeaders() {
8279
.add("key0", "value0".getBytes())
8380
.add("key1", "value1".getBytes())
8481
.add("key2", "value2".getBytes());
85-
final byte[] serialized = serializer.serialize("", original);
86-
final Headers deserialized = deserializer.deserialize("", serialized);
82+
final byte[] serialized = HeadersSerializer.serialize(original);
83+
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
8784
assertNotNull(deserialized);
8885

8986
final Header[] headerArray = deserialized.toArray();
@@ -99,8 +96,8 @@ public void shouldRoundTripMultipleHeaders() {
9996
public void shouldRoundTripHeaderWithNullValue() {
10097
final Headers original = new RecordHeaders()
10198
.add("key1", null);
102-
final byte[] serialized = serializer.serialize("", original);
103-
final Headers deserialized = deserializer.deserialize("", serialized);
99+
final byte[] serialized = HeadersSerializer.serialize(original);
100+
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
104101

105102
assertNotNull(deserialized);
106103
assertEquals(1, deserialized.toArray().length);
@@ -115,8 +112,8 @@ public void shouldRoundTripHeaderWithNullValue() {
115112
public void shouldRoundTripHeaderWithEmptyValue() {
116113
final Headers original = new RecordHeaders()
117114
.add("key1", new byte[0]);
118-
final byte[] serialized = serializer.serialize("", original);
119-
final Headers deserialized = deserializer.deserialize("", serialized);
115+
final byte[] serialized = HeadersSerializer.serialize(original);
116+
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
120117

121118
assertNotNull(deserialized);
122119
assertEquals(1, deserialized.toArray().length);
@@ -135,8 +132,8 @@ public void shouldAllowDuplicateKeys() {
135132
.add("key1", "value1".getBytes())
136133
.add("key2", "value2".getBytes())
137134
.add("key2", "value3".getBytes());
138-
final byte[] serialized = serializer.serialize("", original);
139-
final Headers deserialized = deserializer.deserialize("", serialized);
135+
final byte[] serialized = HeadersSerializer.serialize(original);
136+
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
140137
assertNotNull(deserialized);
141138

142139
final Header[] headerArray = deserialized.toArray();

streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,9 @@
3030

3131
public class HeadersSerializerTest {
3232

33-
private final HeadersSerializer serializer = new HeadersSerializer();
34-
private final HeadersDeserializer deserializer = new HeadersDeserializer();
35-
3633
@Test
3734
public void shouldSerializeNullHeaders() {
38-
final byte[] serialized = serializer.serialize("", null);
35+
final byte[] serialized = HeadersSerializer.serialize(null);
3936

4037
assertNotNull(serialized);
4138
assertEquals(0, serialized.length, "Null headers should serialize to empty byte array (0 bytes)");
@@ -44,7 +41,7 @@ public void shouldSerializeNullHeaders() {
4441
@Test
4542
public void shouldSerializeEmptyHeaders() {
4643
final Headers headers = new RecordHeaders();
47-
final byte[] serialized = serializer.serialize("", headers);
44+
final byte[] serialized = HeadersSerializer.serialize(headers);
4845

4946
assertNotNull(serialized);
5047
assertEquals(0, serialized.length, "Empty headers should serialize to empty byte array (0 bytes)");
@@ -54,12 +51,12 @@ public void shouldSerializeEmptyHeaders() {
5451
public void shouldSerializeSingleHeader() {
5552
final Headers headers = new RecordHeaders()
5653
.add("key1", "value1".getBytes());
57-
final byte[] serialized = serializer.serialize("", headers);
54+
final byte[] serialized = HeadersSerializer.serialize(headers);
5855

5956
assertNotNull(serialized);
6057
assertTrue(serialized.length > 0);
6158

62-
final Headers deserialized = deserializer.deserialize("", serialized);
59+
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
6360
assertNotNull(deserialized);
6461
assertEquals(1, deserialized.toArray().length);
6562

@@ -75,12 +72,12 @@ public void shouldSerializeMultipleHeaders() {
7572
.add("key0", "value0".getBytes())
7673
.add("key1", "value1".getBytes())
7774
.add("key2", "value2".getBytes());
78-
final byte[] serialized = serializer.serialize("", headers);
75+
final byte[] serialized = HeadersSerializer.serialize(headers);
7976

8077
assertNotNull(serialized);
8178
assertTrue(serialized.length > 0);
8279

83-
final Headers deserialized = deserializer.deserialize("", serialized);
80+
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
8481
assertNotNull(deserialized);
8582
assertEquals(3, deserialized.toArray().length);
8683

@@ -96,12 +93,12 @@ public void shouldSerializeMultipleHeaders() {
9693
public void shouldSerializeHeaderWithNullValue() {
9794
final Headers headers = new RecordHeaders()
9895
.add("key1", null);
99-
final byte[] serialized = serializer.serialize("", headers);
96+
final byte[] serialized = HeadersSerializer.serialize(headers);
10097

10198
assertNotNull(serialized);
10299
assertTrue(serialized.length > 0);
103100

104-
final Headers deserialized = deserializer.deserialize("", serialized);
101+
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
105102
assertNotNull(deserialized);
106103
assertEquals(1, deserialized.toArray().length);
107104

@@ -115,12 +112,12 @@ public void shouldSerializeHeaderWithNullValue() {
115112
public void shouldSerializeHeadersWithEmptyValue() {
116113
final Headers headers = new RecordHeaders()
117114
.add("key1", new byte[0]);
118-
final byte[] serialized = serializer.serialize("", headers);
115+
final byte[] serialized = HeadersSerializer.serialize(headers);
119116

120117
assertNotNull(serialized);
121118
assertTrue(serialized.length > 0);
122119

123-
final Headers deserialized = deserializer.deserialize("", serialized);
120+
final Headers deserialized = HeadersDeserializer.deserialize(serialized);
124121
assertNotNull(deserialized);
125122
assertEquals(1, deserialized.toArray().length);
126123

0 commit comments

Comments
 (0)