Skip to content

Commit a48e799

Browse files
authored
BE: SR: Support SR vendor-specific content-types (#1697)
1 parent 0184b40 commit a48e799

File tree

3 files changed

+115
-1
lines changed

3 files changed

+115
-1
lines changed

api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Properties;
3333
import java.util.stream.Stream;
3434
import lombok.extern.slf4j.Slf4j;
35+
import org.springframework.http.MediaType;
3536
import org.springframework.stereotype.Service;
3637
import org.springframework.util.StringUtils;
3738
import org.springframework.util.unit.DataSize;
@@ -48,6 +49,10 @@ public class KafkaClusterFactory {
4849
private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");
4950
private static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofSeconds(20);
5051

52+
// Confluent Schema Registry API content types (used by WarpStream and other compatible implementations)
53+
private static final MediaType SR_V1_JSON = MediaType.parseMediaType("application/vnd.schemaregistry.v1+json");
54+
private static final MediaType SR_JSON = MediaType.parseMediaType("application/vnd.schemaregistry+json");
55+
5156
private final DataSize webClientMaxBuffSize;
5257
private final Duration responseTimeout;
5358
private final JmxMetricsRetriever jmxMetricsRetriever;
@@ -233,6 +238,7 @@ private ReactiveFailover<KafkaSrClientApi> schemaRegistryClient(ClustersProperti
233238
.configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
234239
.configureBasicAuth(auth.getUsername(), auth.getPassword())
235240
.configureBufferSize(webClientMaxBuffSize)
241+
.configureAdditionalDecoderMediaTypes(SR_V1_JSON, SR_JSON)
236242
.build();
237243
return ReactiveFailover.create(
238244
parseUrlList(clusterProperties.getSchemaRegistry()),

api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.security.KeyStore;
1313
import java.time.Duration;
1414
import java.util.function.Consumer;
15+
import java.util.stream.Stream;
1516
import javax.annotation.Nullable;
1617
import javax.net.ssl.KeyManagerFactory;
1718
import javax.net.ssl.TrustManagerFactory;
@@ -33,9 +34,10 @@ public class WebClientConfigurator {
3334
private HttpClient httpClient = HttpClient
3435
.create()
3536
.proxyWithSystemProperties();
37+
private ObjectMapper objectMapper = defaultOM();
3638

3739
public WebClientConfigurator() {
38-
configureObjectMapper(defaultOM());
40+
configureObjectMapper(objectMapper);
3941
}
4042

4143
private static ObjectMapper defaultOM() {
@@ -131,6 +133,7 @@ public WebClientConfigurator configureBufferSize(DataSize maxBuffSize) {
131133
}
132134

133135
public void configureObjectMapper(ObjectMapper mapper) {
136+
this.objectMapper = mapper;
134137
builder.codecs(codecs -> {
135138
codecs.defaultCodecs()
136139
.jackson2JsonEncoder(new Jackson2JsonEncoder(mapper, MediaType.APPLICATION_JSON));
@@ -139,6 +142,20 @@ public void configureObjectMapper(ObjectMapper mapper) {
139142
});
140143
}
141144

145+
public WebClientConfigurator configureAdditionalDecoderMediaTypes(MediaType... additionalMediaTypes) {
146+
builder.codecs(codecs -> {
147+
codecs.defaultCodecs()
148+
.jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON));
149+
MediaType[] allMediaTypes = Stream.concat(
150+
Stream.of(MediaType.APPLICATION_JSON),
151+
Stream.of(additionalMediaTypes)
152+
).toArray(MediaType[]::new);
153+
codecs.defaultCodecs()
154+
.jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper, allMediaTypes));
155+
});
156+
return this;
157+
}
158+
142159
public WebClientConfigurator configureCodecs(Consumer<ClientCodecConfigurer> configurer) {
143160
builder.codecs(configurer);
144161
return this;
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.kafbat.ui.util;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import java.io.IOException;
6+
import okhttp3.mockwebserver.MockResponse;
7+
import okhttp3.mockwebserver.MockWebServer;
8+
import org.junit.jupiter.api.AfterEach;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
import org.springframework.http.MediaType;
12+
import org.springframework.web.reactive.function.client.WebClient;
13+
14+
class WebClientConfiguratorTest {
15+
16+
private final MockWebServer mockWebServer = new MockWebServer();
17+
18+
@BeforeEach
19+
void startMockServer() throws IOException {
20+
mockWebServer.start();
21+
}
22+
23+
@AfterEach
24+
void stopMockServer() throws IOException {
25+
mockWebServer.close();
26+
}
27+
28+
@Test
29+
void decodesStandardApplicationJson() {
30+
mockWebServer.enqueue(new MockResponse()
31+
.addHeader("Content-Type", "application/json")
32+
.setBody("{\"name\":\"test\"}"));
33+
34+
WebClient client = new WebClientConfigurator().build();
35+
String body = client.get()
36+
.uri(mockWebServer.url("/").toString())
37+
.retrieve()
38+
.bodyToMono(String.class)
39+
.block();
40+
41+
assertThat(body).isEqualTo("{\"name\":\"test\"}");
42+
}
43+
44+
@Test
45+
void decodesVendorMediaTypeWhenConfigured() {
46+
MediaType srV1Json = MediaType.parseMediaType("application/vnd.schemaregistry.v1+json");
47+
48+
mockWebServer.enqueue(new MockResponse()
49+
.addHeader("Content-Type", "application/vnd.schemaregistry.v1+json")
50+
.setBody("{\"compatibilityLevel\":\"BACKWARD\"}"));
51+
52+
WebClient client = new WebClientConfigurator()
53+
.configureAdditionalDecoderMediaTypes(srV1Json)
54+
.build();
55+
56+
CompatibilityConfig config = client.get()
57+
.uri(mockWebServer.url("/config").toString())
58+
.retrieve()
59+
.bodyToMono(CompatibilityConfig.class)
60+
.block();
61+
62+
assertThat(config).isNotNull();
63+
assertThat(config.compatibilityLevel()).isEqualTo("BACKWARD");
64+
}
65+
66+
@Test
67+
void decodesMultipleVendorMediaTypes() {
68+
MediaType srV1Json = MediaType.parseMediaType("application/vnd.schemaregistry.v1+json");
69+
MediaType srJson = MediaType.parseMediaType("application/vnd.schemaregistry+json");
70+
71+
mockWebServer.enqueue(new MockResponse()
72+
.addHeader("Content-Type", "application/vnd.schemaregistry+json")
73+
.setBody("{\"compatibilityLevel\":\"FULL\"}"));
74+
75+
WebClient client = new WebClientConfigurator()
76+
.configureAdditionalDecoderMediaTypes(srV1Json, srJson)
77+
.build();
78+
79+
CompatibilityConfig config = client.get()
80+
.uri(mockWebServer.url("/config").toString())
81+
.retrieve()
82+
.bodyToMono(CompatibilityConfig.class)
83+
.block();
84+
85+
assertThat(config).isNotNull();
86+
assertThat(config.compatibilityLevel()).isEqualTo("FULL");
87+
}
88+
89+
record CompatibilityConfig(String compatibilityLevel) {
90+
}
91+
}

0 commit comments

Comments
 (0)