diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufRemoteFileSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufRemoteFileSerde.java new file mode 100644 index 000000000..32b746642 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufRemoteFileSerde.java @@ -0,0 +1,190 @@ +package io.kafbat.ui.serdes.builtin; + +import static org.springframework.util.MultiValueMap.fromSingleValue; +import static org.springframework.util.ObjectUtils.isEmpty; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors.Descriptor; +import com.google.protobuf.DynamicMessage; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils; +import io.kafbat.ui.exception.ValidationException; +import io.kafbat.ui.serde.api.DeserializeResult; +import io.kafbat.ui.serde.api.PropertyResolver; +import io.kafbat.ui.serde.api.RecordHeaders; +import io.kafbat.ui.serde.api.SchemaDescription; +import io.kafbat.ui.serdes.BuiltInSerde; +import io.netty.handler.codec.http.HttpResponseStatus; +import java.io.ByteArrayInputStream; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.openapitools.jackson.nullable.JsonNullableModule; +import org.springframework.web.util.UriComponents; +import org.springframework.web.util.UriComponentsBuilder; +import reactor.netty.http.client.HttpClient; + +@Slf4j +public class ProtobufRemoteFileSerde implements BuiltInSerde { + + private HttpClient httpClient; + private String path; + private Map queryParams; + private ObjectMapper mapper; + + public static String name() { + return "ProtobufRemoteFile"; + } + + @Override + public void configure(PropertyResolver serdeProperties, + PropertyResolver kafkaClusterProperties, + PropertyResolver globalProperties) { + configure(Configuration.create(serdeProperties)); + } + + @VisibleForTesting + void configure(Configuration configuration) { + if (configuration.httpClient() == null) { + throw new ValidationException("Neither default, not per-topic descriptors defined for " + name() + " serde"); + } + this.httpClient = configuration.httpClient(); + this.path = configuration.path(); + this.queryParams = configuration.queryParams(); + this.mapper = new ObjectMapper() + .registerModule(new JavaTimeModule()) + .registerModule(new JsonNullableModule()) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Override + public Optional getDescription() { + return Optional.empty(); + } + + private Optional getDescriptorFromRemote(String topic, Target type) { + var params = new HashMap<>(queryParams); + params.put("topic", topic); + params.put("type", type.name()); + + UriComponents uriComponents = UriComponentsBuilder.newInstance().queryParams(fromSingleValue(params)).build(); + + var response = httpClient.get() + .uri(path + "?" + uriComponents.getQuery()) + .responseSingle(((httpResponse, bytes) -> + bytes.asString().map(this::read) + .map(it -> new RemoteResponse(httpResponse.status(), it)) + )) + .block(); + + if (response == null || response.status() != HttpResponseStatus.OK || isEmpty(response.schema)) { + throw new ValidationException(String.format("Error getting descriptor from remote for topic: %s", topic)); + } + + var messageTypeName = response.schema.msgTypeName; + var resolvedSchema = response.schema.schema; + + return Optional.of(resolvedSchema) + .map(ProtobufSchema::new) + .map(it -> it.toDescriptor(messageTypeName)); + } + + @Override + public boolean canDeserialize(String topic, Target type) { + return getDescriptorFromRemote(topic, type).isPresent(); + } + + @Override + public boolean canSerialize(String topic, Target type) { + return getDescriptorFromRemote(topic, type).isPresent(); + } + + @Override + public Serializer serializer(String topic, Target type) { + throw new UnsupportedOperationException(); + } + + @Override + public Deserializer deserializer(String topic, Target type) { + var descriptor = getDescriptorFromRemote(topic, type).orElseThrow(); + return new Deserializer() { + @SneakyThrows + @Override + public DeserializeResult deserialize(RecordHeaders headers, byte[] data) { + var protoMsg = DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(data)); + byte[] jsonFromProto = ProtobufSchemaUtils.toJson(protoMsg); + var result = new String(jsonFromProto); + return new DeserializeResult( + result, + DeserializeResult.Type.JSON, + Map.of() + ); + } + }; + } + + @Override + public Optional getSchema(String topic, Target type) { + return Optional.empty(); + } + + private ResolvedSchema read(String response) { + try { + var parsedBody = mapper.readTree(response); + + var messageTypeName = parsedBody.get("msgTypeName").asText(); + var schema = parsedBody.get("schema").asText(); + return new ResolvedSchema(messageTypeName, schema); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + //---------------------------------------------------------------------------------------------------------------- + + @VisibleForTesting + record Configuration( + HttpClient httpClient, + String path, + Map queryParams + ) { + + static Configuration create(PropertyResolver properties) { + var url = properties.getProperty("url", String.class).orElseThrow(); + var path = properties.getProperty("path", String.class).orElseThrow(); + var timeout = properties.getProperty("timeout", String.class).map(Duration::parse).orElseThrow(); + + Optional> queryParams = properties.getMapProperty("query_params", String.class, String.class); + + HttpClient httpClient = HttpClient + .create() + .proxyWithSystemProperties() + .baseUrl(url) + .responseTimeout(timeout); + + return new Configuration(httpClient, path, queryParams.orElse(Collections.emptyMap())); + } + } + + record RemoteResponse( + HttpResponseStatus status, + ResolvedSchema schema) { + + } + + record ResolvedSchema( + String msgTypeName, + String schema + ) { + + } + +} diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufRemoteFileSerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufRemoteFileSerdeTest.java new file mode 100644 index 000000000..d064169e0 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufRemoteFileSerdeTest.java @@ -0,0 +1,81 @@ +package io.kafbat.ui.serdes.builtin; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.google.protobuf.util.JsonFormat; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.kafbat.ui.serde.api.Serde; +import io.kafbat.ui.util.ResourceUtil; +import java.io.IOException; +import java.util.Map; +import io.netty.handler.codec.http.HttpResponseStatus; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.core.io.ClassPathResource; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; + +public class ProtobufRemoteFileSerdeTest { + + private static final String samplePersonMsgJson = + "{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\", \"phones\":[] }"; + + private static final String sampleBookMsgJson = "{\"version\": 1, \"people\": [" + + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"addrBook@example.com\", \"phones\":[]}]}"; + + // Sample message of type `test.Person` + private byte[] personMessageBytes; + + @BeforeEach + void setUp() throws Exception { + var schema = new ClassPathResource("protobuf-serde/address-book.proto"); + + var addressBookSchema = new ProtobufSchema(ResourceUtil.readAsString(schema)); + var builder = addressBookSchema.newMessageBuilder("test.Person"); + JsonFormat.parser().merge(samplePersonMsgJson, builder); + personMessageBytes = builder.build().toByteArray(); + } + + @Test + void serializeUsesTopicsMappingToFindMsgDescriptor() throws IOException { + var httpClient = mock(HttpClient.class); + + HttpClient.ResponseReceiver responseReceiver = mock(HttpClient.ResponseReceiver.class); + when(httpClient.get()).thenReturn(responseReceiver); + when(responseReceiver.uri(anyString())).thenReturn(responseReceiver); + when(responseReceiver.responseSingle(any())).thenReturn(Mono.just( + new ProtobufRemoteFileSerde.RemoteResponse( + HttpResponseStatus.OK, + new ProtobufRemoteFileSerde.ResolvedSchema( + "test.Person", + ResourceUtil.readAsString(new ClassPathResource("protobuf-serde/address-book.proto")) + ) + ) + )); + + var serde = new ProtobufRemoteFileSerde(); + serde.configure( + new ProtobufRemoteFileSerde.Configuration( + httpClient, + "/test", + Map.of("test", "test") + ) + ); + + var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE) + .deserialize(null, personMessageBytes); + assertJsonEquals(samplePersonMsgJson, deserializedPerson.getResult()); + } + + @SneakyThrows + private void assertJsonEquals(String expectedJson, String actualJson) { + var mapper = new JsonMapper(); + assertThat(mapper.readTree(actualJson)).isEqualTo(mapper.readTree(expectedJson)); + } +}