diff --git a/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java b/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java index b4eb51493..854f57b9e 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java +++ b/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java @@ -3,6 +3,7 @@ import io.kafbat.ui.model.TopicMessageDTO; import io.kafbat.ui.model.TopicMessageDTO.TimestampTypeEnum; import io.kafbat.ui.serde.api.Serde; +import io.kafbat.ui.util.ContentUtils; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneId; @@ -68,7 +69,7 @@ private void fillHeaders(TopicMessageDTO message, ConsumerRecord r .forEachRemaining(header -> headers.put( header.key(), - header.value() != null ? new String(header.value()) : null + ContentUtils.convertToString(header.value()) )); message.setHeaders(headers); } diff --git a/api/src/main/java/io/kafbat/ui/util/ContentUtils.java b/api/src/main/java/io/kafbat/ui/util/ContentUtils.java new file mode 100644 index 000000000..a3eafc2db --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/util/ContentUtils.java @@ -0,0 +1,129 @@ +package io.kafbat.ui.util; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.regex.Pattern; + +/** + * Inspired by: https://github.com/tchiotludo/akhq/blob/dev/src/main/java/org/akhq/utils/ContentUtils.java + */ +public class ContentUtils { + private static final byte[] HEX_ARRAY = "0123456789ABCDEF".getBytes(StandardCharsets.US_ASCII); + + private static final CharsetDecoder UTF8_DECODER = StandardCharsets.UTF_8.newDecoder(); + + private ContentUtils() { + } + + /** + * Detects if bytes contain a UTF-8 string or something else. + * @param value the bytes to test for a UTF-8 encoded {@code java.lang.String} value + * @return true, if the byte[] contains a UTF-8 encode {@code java.lang.String} + */ + public static boolean isValidUtf8(byte[] value) { + // Any data exceeding 10KB will be treated as a string. + if (value.length > 10_000) { + return true; + } + try { + CharBuffer decode = UTF8_DECODER.decode(ByteBuffer.wrap(value)); + return decode.chars().allMatch(ContentUtils::isValidUtf8); + } catch (Exception e) { + return false; + } + } + + public static boolean isValidUtf8(int c) { + // SKIP NULL Symbols + if (c == 0) { + return false; + } + // Well known symbols + if (Character.isAlphabetic(c) + || Character.isDigit(c) + || Character.isWhitespace(c) + || Character.isEmoji(c) + ) { + return true; + } + // We could read only whitespace controls like + return !Character.isISOControl(c); + } + + /** + * Converts bytes to long. + * + * @param value the bytes to convert in to a long + * @return the long build from the given bytes + */ + public static Long asLong(byte[] value) { + return value != null ? ByteBuffer.wrap(value).getLong() : null; + } + + /** + * Converts the given bytes to {@code int}. + * + * @param value the bytes to convert into a {@code int} + * @return the {@code int} build from the given bytes + */ + public static Integer asInt(byte[] value) { + return value != null ? ByteBuffer.wrap(value).getInt() : null; + } + + /** + * Converts the given bytes to {@code short}. + * + * @param value the bytes to convert into a {@code short} + * @return the {@code short} build from the given bytes + */ + public static Short asShort(byte[] value) { + return value != null ? ByteBuffer.wrap(value).getShort() : null; + } + + /** + * Converts the given bytes either into a {@code java.lang.string}, {@code int}, + * {@code long} or {@code short} depending on the content it contains. + * @param value the bytes to convert + * @return the value as an {@code java.lang.string}, {@code int}, {@code long} or {@code short} + */ + public static String convertToString(byte[] value) { + String valueAsString = null; + + if (value != null) { + try { + if (ContentUtils.isValidUtf8(value)) { + valueAsString = new String(value); + } else { + if (value.length == Long.BYTES) { + valueAsString = String.valueOf(ContentUtils.asLong(value)); + } else if (value.length == Integer.BYTES) { + valueAsString = String.valueOf(ContentUtils.asInt(value)); + } else if (value.length == Short.BYTES) { + valueAsString = String.valueOf(ContentUtils.asShort(value)); + } else { + valueAsString = bytesToHex(value); + } + } + } catch (Exception ex) { + // Show the header as hexadecimal string + valueAsString = bytesToHex(value); + } + } + return valueAsString; + } + + // https://stackoverflow.com/questions/9655181/java-convert-a-byte-array-to-a-hex-string + public static String bytesToHex(byte[] bytes) { + byte[] hexChars = new byte[bytes.length * 2]; + for (int j = 0; j < bytes.length; j++) { + int v = bytes[j] & 0xFF; + hexChars[j * 2] = HEX_ARRAY[v >>> 4]; + hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; + } + return new String(hexChars, StandardCharsets.UTF_8); + } + +} diff --git a/api/src/test/java/io/kafbat/ui/util/ContentUtilsTest.java b/api/src/test/java/io/kafbat/ui/util/ContentUtilsTest.java new file mode 100644 index 000000000..fa2926e3d --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/util/ContentUtilsTest.java @@ -0,0 +1,64 @@ +package io.kafbat.ui.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.Test; + +public class ContentUtilsTest { + + private static byte[] toBytes(Short value) { + ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES); + buffer.putShort(value); + return buffer.array(); + } + + private static byte[] toBytes(Integer value) { + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.putInt(value); + return buffer.array(); + } + + private static byte[] toBytes(Long value) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(value); + return buffer.array(); + } + + @Test + void testHeaderValueStringUtf8() { + String testValue = "Test"; + + assertEquals(testValue, ContentUtils.convertToString(testValue.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void testHeaderValueInteger() { + int testValue = 1; + assertEquals(String.valueOf(testValue), ContentUtils.convertToString(toBytes(testValue))); + } + + @Test + void testHeaderValueLong() { + long testValue = 111L; + + assertEquals(String.valueOf(testValue), ContentUtils.convertToString(toBytes(testValue))); + } + + @Test + void testHeaderValueShort() { + short testValue = 10; + + assertEquals(String.valueOf(testValue), ContentUtils.convertToString(toBytes(testValue))); + } + + @Test + void testHeaderValueLongStringUtf8() { + String testValue = RandomStringUtils.random(10000, true, false); + + assertEquals(testValue, ContentUtils.convertToString(testValue.getBytes(StandardCharsets.UTF_8))); + } + +}