Skip to content

Commit f4c8fed

Browse files
committed
feat: added protobuf serde based on remote files. Resolve 841
1 parent a05709f commit f4c8fed

File tree

2 files changed

+271
-0
lines changed

2 files changed

+271
-0
lines changed
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
package io.kafbat.ui.serdes.builtin;
2+
3+
import static org.springframework.util.MultiValueMap.fromSingleValue;
4+
import static org.springframework.util.ObjectUtils.isEmpty;
5+
6+
import com.fasterxml.jackson.core.JsonProcessingException;
7+
import com.fasterxml.jackson.databind.DeserializationFeature;
8+
import com.fasterxml.jackson.databind.ObjectMapper;
9+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
10+
import com.google.common.annotations.VisibleForTesting;
11+
import com.google.protobuf.Descriptors.Descriptor;
12+
import com.google.protobuf.DynamicMessage;
13+
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
14+
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
15+
import io.kafbat.ui.exception.ValidationException;
16+
import io.kafbat.ui.serde.api.DeserializeResult;
17+
import io.kafbat.ui.serde.api.PropertyResolver;
18+
import io.kafbat.ui.serde.api.RecordHeaders;
19+
import io.kafbat.ui.serde.api.SchemaDescription;
20+
import io.kafbat.ui.serdes.BuiltInSerde;
21+
import io.netty.handler.codec.http.HttpResponseStatus;
22+
import java.io.ByteArrayInputStream;
23+
import java.time.Duration;
24+
import java.util.Collections;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Optional;
28+
import lombok.SneakyThrows;
29+
import lombok.extern.slf4j.Slf4j;
30+
import org.openapitools.jackson.nullable.JsonNullableModule;
31+
import org.springframework.web.util.UriComponents;
32+
import org.springframework.web.util.UriComponentsBuilder;
33+
import reactor.netty.http.client.HttpClient;
34+
35+
@Slf4j
36+
public class ProtobufRemoteFileSerde implements BuiltInSerde {
37+
38+
private HttpClient httpClient;
39+
private String path;
40+
private Map<String, String> queryParams;
41+
private ObjectMapper mapper;
42+
43+
public static String name() {
44+
return "ProtobufRemoteFile";
45+
}
46+
47+
@Override
48+
public void configure(PropertyResolver serdeProperties,
49+
PropertyResolver kafkaClusterProperties,
50+
PropertyResolver globalProperties) {
51+
configure(Configuration.create(serdeProperties));
52+
}
53+
54+
@VisibleForTesting
55+
void configure(Configuration configuration) {
56+
if (configuration.httpClient() == null) {
57+
throw new ValidationException("Neither default, not per-topic descriptors defined for " + name() + " serde");
58+
}
59+
this.httpClient = configuration.httpClient();
60+
this.path = configuration.path();
61+
this.queryParams = configuration.queryParams();
62+
this.mapper = new ObjectMapper()
63+
.registerModule(new JavaTimeModule())
64+
.registerModule(new JsonNullableModule())
65+
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
66+
}
67+
68+
@Override
69+
public Optional<String> getDescription() {
70+
return Optional.empty();
71+
}
72+
73+
private Optional<Descriptor> getDescriptorFromRemote(String topic, Target type) {
74+
var params = new HashMap<>(queryParams);
75+
params.put("topic", topic);
76+
params.put("type", type.name());
77+
78+
UriComponents uriComponents = UriComponentsBuilder.newInstance().queryParams(fromSingleValue(params)).build();
79+
80+
var response = httpClient.get()
81+
.uri(path + "?" + uriComponents.getQuery())
82+
.responseSingle(((httpResponse, bytes) ->
83+
bytes.asString().map(this::read)
84+
.map(it -> new RemoteResponse(httpResponse.status(), it))
85+
))
86+
.block();
87+
88+
if (response == null || response.status() != HttpResponseStatus.OK || isEmpty(response.schema)) {
89+
throw new ValidationException(String.format("Error getting descriptor from remote for topic: %s", topic));
90+
}
91+
92+
var messageTypeName = response.schema.msgTypeName;
93+
var resolvedSchema = response.schema.schema;
94+
95+
return Optional.of(resolvedSchema)
96+
.map(ProtobufSchema::new)
97+
.map(it -> it.toDescriptor(messageTypeName));
98+
}
99+
100+
@Override
101+
public boolean canDeserialize(String topic, Target type) {
102+
return getDescriptorFromRemote(topic, type).isPresent();
103+
}
104+
105+
@Override
106+
public boolean canSerialize(String topic, Target type) {
107+
return getDescriptorFromRemote(topic, type).isPresent();
108+
}
109+
110+
@Override
111+
public Serializer serializer(String topic, Target type) {
112+
throw new UnsupportedOperationException();
113+
}
114+
115+
@Override
116+
public Deserializer deserializer(String topic, Target type) {
117+
var descriptor = getDescriptorFromRemote(topic, type).orElseThrow();
118+
return new Deserializer() {
119+
@SneakyThrows
120+
@Override
121+
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
122+
var protoMsg = DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(data));
123+
byte[] jsonFromProto = ProtobufSchemaUtils.toJson(protoMsg);
124+
var result = new String(jsonFromProto);
125+
return new DeserializeResult(
126+
result,
127+
DeserializeResult.Type.JSON,
128+
Map.of()
129+
);
130+
}
131+
};
132+
}
133+
134+
@Override
135+
public Optional<SchemaDescription> getSchema(String topic, Target type) {
136+
return Optional.empty();
137+
}
138+
139+
private ResolvedSchema read(String response) {
140+
try {
141+
var parsedBody = mapper.readTree(response);
142+
143+
var messageTypeName = parsedBody.get("msgTypeName").asText();
144+
var schema = parsedBody.get("schema").asText();
145+
return new ResolvedSchema(messageTypeName, schema);
146+
} catch (JsonProcessingException e) {
147+
throw new RuntimeException(e);
148+
}
149+
}
150+
151+
//----------------------------------------------------------------------------------------------------------------
152+
153+
@VisibleForTesting
154+
record Configuration(
155+
HttpClient httpClient,
156+
String path,
157+
Map<String, String> queryParams
158+
) {
159+
160+
static Configuration create(PropertyResolver properties) {
161+
var url = properties.getProperty("url", String.class).orElseThrow();
162+
var path = properties.getProperty("path", String.class).orElseThrow();
163+
var timeout = properties.getProperty("timeout", String.class).map(Duration::parse).orElseThrow();
164+
165+
Optional<Map<String, String>> queryParams = properties.getMapProperty("query_params", String.class, String.class);
166+
167+
HttpClient httpClient = HttpClient
168+
.create()
169+
.proxyWithSystemProperties()
170+
.baseUrl(url)
171+
.responseTimeout(timeout);
172+
173+
return new Configuration(httpClient, path, queryParams.orElse(Collections.emptyMap()));
174+
}
175+
}
176+
177+
record RemoteResponse(
178+
HttpResponseStatus status,
179+
ResolvedSchema schema) {
180+
181+
}
182+
183+
record ResolvedSchema(
184+
String msgTypeName,
185+
String schema
186+
) {
187+
188+
}
189+
190+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.kafbat.ui.serdes.builtin;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.mockito.ArgumentMatchers.any;
5+
import static org.mockito.ArgumentMatchers.anyString;
6+
import static org.mockito.Mockito.mock;
7+
import static org.mockito.Mockito.when;
8+
9+
import com.fasterxml.jackson.databind.json.JsonMapper;
10+
import com.google.protobuf.util.JsonFormat;
11+
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
12+
import io.kafbat.ui.serde.api.Serde;
13+
import io.kafbat.ui.util.ResourceUtil;
14+
import java.io.IOException;
15+
import java.util.Map;
16+
import io.netty.handler.codec.http.HttpResponseStatus;
17+
import lombok.SneakyThrows;
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.Test;
20+
import org.springframework.core.io.ClassPathResource;
21+
import reactor.core.publisher.Mono;
22+
import reactor.netty.http.client.HttpClient;
23+
24+
public class ProtobufRemoteFileSerdeTest {
25+
26+
private static final String samplePersonMsgJson =
27+
"{ \"name\": \"My Name\",\"id\": 101, \"email\": \"[email protected]\", \"phones\":[] }";
28+
29+
private static final String sampleBookMsgJson = "{\"version\": 1, \"people\": ["
30+
+ "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"[email protected]\", \"phones\":[]}]}";
31+
32+
// Sample message of type `test.Person`
33+
private byte[] personMessageBytes;
34+
35+
@BeforeEach
36+
void setUp() throws Exception {
37+
var schema = new ClassPathResource("protobuf-serde/address-book.proto");
38+
39+
var addressBookSchema = new ProtobufSchema(ResourceUtil.readAsString(schema));
40+
var builder = addressBookSchema.newMessageBuilder("test.Person");
41+
JsonFormat.parser().merge(samplePersonMsgJson, builder);
42+
personMessageBytes = builder.build().toByteArray();
43+
}
44+
45+
@Test
46+
void serializeUsesTopicsMappingToFindMsgDescriptor() throws IOException {
47+
var httpClient = mock(HttpClient.class);
48+
49+
HttpClient.ResponseReceiver responseReceiver = mock(HttpClient.ResponseReceiver.class);
50+
when(httpClient.get()).thenReturn(responseReceiver);
51+
when(responseReceiver.uri(anyString())).thenReturn(responseReceiver);
52+
when(responseReceiver.responseSingle(any())).thenReturn(Mono.just(
53+
new ProtobufRemoteFileSerde.RemoteResponse(
54+
HttpResponseStatus.OK,
55+
new ProtobufRemoteFileSerde.ResolvedSchema(
56+
"test.Person",
57+
ResourceUtil.readAsString(new ClassPathResource("protobuf-serde/address-book.proto"))
58+
)
59+
)
60+
));
61+
62+
var serde = new ProtobufRemoteFileSerde();
63+
serde.configure(
64+
new ProtobufRemoteFileSerde.Configuration(
65+
httpClient,
66+
"/test",
67+
Map.of("test", "test")
68+
)
69+
);
70+
71+
var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE)
72+
.deserialize(null, personMessageBytes);
73+
assertJsonEquals(samplePersonMsgJson, deserializedPerson.getResult());
74+
}
75+
76+
@SneakyThrows
77+
private void assertJsonEquals(String expectedJson, String actualJson) {
78+
var mapper = new JsonMapper();
79+
assertThat(mapper.readTree(actualJson)).isEqualTo(mapper.readTree(expectedJson));
80+
}
81+
}

0 commit comments

Comments
 (0)