From fa69309842b67552a7245815ca78b01470027e07 Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Tue, 30 Sep 2025 08:59:45 +0800 Subject: [PATCH 1/2] KAFKA-19634: document the encoding of nullable struct --- .../kafka/common/protocol/types/Schema.java | 20 ++++++++++++++++++- .../kafka/common/protocol/types/Type.java | 3 ++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java index 325a77fe43ffc..ad95410471610 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.protocol.types; +import org.apache.kafka.common.protocol.types.Type.DocumentedType; + import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -24,7 +26,10 @@ /** * The schema for a compound record definition */ -public final class Schema extends Type { +public final class Schema extends DocumentedType { + + private static final String STRUCT_TYPE_NAME = "STRUCT"; + private static final Object[] NO_VALUES = new Object[0]; private final BoundField[] fields; @@ -206,6 +211,19 @@ public Struct validate(Object item) { } } + @Override + public String typeName() { + return STRUCT_TYPE_NAME; + } + + @Override + public String documentation() { + return "Represents a composite object or null. " + + "For non-null values, the first byte is an INT8 with value 1, " + + "followed by the serialization of each field in the order they are defined. " + + "A null value is encoded as an INT8 with value -1 and there are no following bytes."; + } + public void walk(Visitor visitor) { Objects.requireNonNull(visitor, "visitor must be non-null"); handleNode(this, visitor); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index f4c0ee5705c14..5d8c37901fb21 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -1116,7 +1116,8 @@ private static String toHtml() { UINT16, UNSIGNED_INT32, VARINT, VARLONG, UUID, FLOAT64, STRING, COMPACT_STRING, NULLABLE_STRING, COMPACT_NULLABLE_STRING, BYTES, COMPACT_BYTES, NULLABLE_BYTES, COMPACT_NULLABLE_BYTES, - RECORDS, COMPACT_RECORDS, new ArrayOf(STRING), new CompactArrayOf(COMPACT_STRING)}; + RECORDS, COMPACT_RECORDS, new ArrayOf(STRING), new CompactArrayOf(COMPACT_STRING), + new Schema()}; final StringBuilder b = new StringBuilder(); b.append("\n"); b.append(""); From 6f1464e58bb1e8fc64186d2537bf4371e8b85f74 Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Tue, 30 Sep 2025 09:12:24 +0800 Subject: [PATCH 2/2] rename struct --- .../java/org/apache/kafka/common/protocol/types/Schema.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java index ad95410471610..41915dd5ebbe5 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java @@ -28,7 +28,7 @@ */ public final class Schema extends DocumentedType { - private static final String STRUCT_TYPE_NAME = "STRUCT"; + private static final String STRUCT_TYPE_NAME = "NULLABLE_STRUCT"; private static final Object[] NO_VALUES = new Object[0];