Skip to content

Commit 221f2a0

Browse files
authored
Merge pull request kroxylicious#2203 from SamBarker/checksum-the-proxy
Checksum the proxy
2 parents 822481a + 6d49282 commit 221f2a0

File tree

13 files changed

+580
-45
lines changed

13 files changed

+580
-45
lines changed

kroxylicious-filters/kroxylicious-simple-transform/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,16 @@
6969
<artifactId>mockito-junit-jupiter</artifactId>
7070
<scope>test</scope>
7171
</dependency>
72+
<dependency>
73+
<groupId>com.fasterxml.jackson.core</groupId>
74+
<artifactId>jackson-databind</artifactId>
75+
<scope>test</scope>
76+
</dependency>
77+
<dependency>
78+
<groupId>io.kroxylicious</groupId>
79+
<artifactId>kroxylicious-runtime</artifactId>
80+
<scope>test</scope>
81+
</dependency>
82+
7283
</dependencies>
7384
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.filter.simpletransform;
8+
9+
import java.io.IOException;
10+
import java.nio.ByteBuffer;
11+
import java.nio.charset.Charset;
12+
import java.nio.charset.IllegalCharsetNameException;
13+
import java.nio.charset.StandardCharsets;
14+
import java.nio.charset.UnsupportedCharsetException;
15+
import java.nio.file.Files;
16+
import java.nio.file.Path;
17+
import java.util.Objects;
18+
import java.util.Optional;
19+
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
22+
import io.kroxylicious.proxy.plugin.Plugin;
23+
import io.kroxylicious.proxy.plugin.PluginConfigurationException;
24+
25+
@Plugin(configType = Replacing.Config.class)
26+
public class Replacing implements ByteBufferTransformationFactory<Replacing.Config> {
27+
public record Config(
28+
@JsonProperty String charset,
29+
@JsonProperty(required = true) String targetPattern,
30+
@JsonProperty String replacementValue,
31+
@JsonProperty Path pathToReplacementValue) {
32+
public Config(@JsonProperty String charset, @JsonProperty(required = true) String targetPattern, @JsonProperty String replacementValue,
33+
@JsonProperty Path pathToReplacementValue) {
34+
this.charset = Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8.name());
35+
this.targetPattern = targetPattern;
36+
this.replacementValue = replacementValue;
37+
this.pathToReplacementValue = pathToReplacementValue;
38+
}
39+
}
40+
41+
@Override
42+
public void validateConfiguration(Config config) throws PluginConfigurationException {
43+
config = requireConfig(config);
44+
try {
45+
Charset.forName(config.charset);
46+
}
47+
catch (IllegalCharsetNameException e) {
48+
throw new PluginConfigurationException("Illegal charset name: '" + config.charset + "'");
49+
}
50+
catch (UnsupportedCharsetException e) {
51+
throw new PluginConfigurationException("Unsupported charset: " + config.charset + "'");
52+
}
53+
if (config.replacementValue != null && config.pathToReplacementValue != null) {
54+
throw new PluginConfigurationException("Both replacementValue and pathToReplacementValue are specified. MAKE UP YOUR MIND");
55+
}
56+
if (config.pathToReplacementValue != null && !Files.isReadable(config.pathToReplacementValue)) {
57+
throw new PluginConfigurationException("Path: '" + config.pathToReplacementValue + "' is not readable. ");
58+
}
59+
}
60+
61+
@Override
62+
public Transformation createTransformation(Config configuration) {
63+
return new Transformation(configuration);
64+
}
65+
66+
public static class Transformation implements ByteBufferTransformation {
67+
68+
private final Charset charset;
69+
private final String targetPattern;
70+
private final String replaceWith;
71+
72+
Transformation(Config config) {
73+
this.charset = Charset.forName(Optional.ofNullable(config.charset()).orElse(StandardCharsets.UTF_8.name()));
74+
this.targetPattern = config.targetPattern;
75+
try {
76+
if (config.pathToReplacementValue != null) {
77+
this.replaceWith = Files.readString(config.pathToReplacementValue);
78+
}
79+
else {
80+
this.replaceWith = Objects.requireNonNullElse(config.replacementValue, "");
81+
}
82+
}
83+
catch (IOException e) {
84+
throw new RuntimeException(e);
85+
}
86+
}
87+
88+
@Override
89+
public ByteBuffer transform(String topicName, ByteBuffer in) {
90+
return ByteBuffer.wrap(new String(charset.decode(in).array()).replaceAll(targetPattern, replaceWith).getBytes(charset));
91+
}
92+
}
93+
}

kroxylicious-filters/kroxylicious-simple-transform/src/main/resources/META-INF/services/io.kroxylicious.proxy.filter.simpletransform.ByteBufferTransformationFactory

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@
44
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
55
#
66

7-
io.kroxylicious.proxy.filter.simpletransform.UpperCasing
7+
io.kroxylicious.proxy.filter.simpletransform.UpperCasing
8+
io.kroxylicious.proxy.filter.simpletransform.Replacing

kroxylicious-filters/kroxylicious-simple-transform/src/test/java/io/kroxylicious/proxy/filter/simpletransform/FetchResponseTransformationFilterTest.java

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@
66

77
package io.kroxylicious.proxy.filter.simpletransform;
88

9+
import java.io.IOException;
910
import java.nio.ByteBuffer;
1011
import java.nio.charset.StandardCharsets;
12+
import java.nio.file.Files;
13+
import java.nio.file.Path;
14+
import java.nio.file.Paths;
1115
import java.util.Collection;
1216
import java.util.Locale;
1317
import java.util.Objects;
@@ -35,12 +39,16 @@
3539
import org.junit.jupiter.api.BeforeEach;
3640
import org.junit.jupiter.api.Test;
3741
import org.junit.jupiter.api.extension.ExtendWith;
42+
import org.junit.jupiter.api.io.TempDir;
3843
import org.mockito.ArgumentCaptor;
3944
import org.mockito.Captor;
4045
import org.mockito.Mock;
4146
import org.mockito.junit.jupiter.MockitoExtension;
4247
import org.mockito.stubbing.Answer;
4348

49+
import com.fasterxml.jackson.databind.ObjectMapper;
50+
51+
import io.kroxylicious.proxy.config.ConfigParser;
4452
import io.kroxylicious.proxy.filter.FilterContext;
4553
import io.kroxylicious.proxy.filter.FilterFactoryContext;
4654
import io.kroxylicious.proxy.filter.ResponseFilterResult;
@@ -85,6 +93,8 @@ class FetchResponseTransformationFilterTest {
8593
@Captor
8694
private ArgumentCaptor<ApiMessage> apiMessageCaptor;
8795

96+
private static final ObjectMapper MAPPER = ConfigParser.createObjectMapper();
97+
8898
@BeforeEach
8999
@SuppressWarnings("unchecked")
90100
void setUp() {
@@ -122,11 +132,66 @@ void testFactory() {
122132
assertThat(factory.createFilter(constructContext, config)).isInstanceOf(FetchResponseTransformationFilter.class);
123133
}
124134

135+
@Test
136+
void shouldRequireConfigForReplacingFilter() {
137+
// Given
138+
FetchResponseTransformation factory = new FetchResponseTransformation();
139+
140+
// When
141+
// Then
142+
assertThatThrownBy(() -> factory.initialize(null, null)).isInstanceOf(PluginConfigurationException.class)
143+
.hasMessage(FetchResponseTransformation.class.getSimpleName() + " requires configuration, but config object is null");
144+
}
145+
146+
@Test
147+
void shouldConstructReplacingFilter() {
148+
FetchResponseTransformation factory = new FetchResponseTransformation();
149+
FilterFactoryContext constructContext = mock(FilterFactoryContext.class);
150+
doReturn(new Replacing()).when(constructContext).pluginInstance(any(), any());
151+
FetchResponseTransformation.Config config = new FetchResponseTransformation.Config(Replacing.class.getName(),
152+
new Replacing.Config(null, "foo", "bar", null));
153+
assertThat(factory.createFilter(constructContext, config)).isInstanceOf(FetchResponseTransformationFilter.class);
154+
}
155+
156+
@Test
157+
void shouldConstructReplacingFilterWithPath(@TempDir Path tempDir) throws IOException {
158+
Path replamventValuePath = Files.createFile(Path.of(tempDir.toAbsolutePath().toString(), "replacement-value.txt"));
159+
Files.writeString(replamventValuePath, "bar", StandardCharsets.UTF_8);
160+
FetchResponseTransformation factory = new FetchResponseTransformation();
161+
FilterFactoryContext constructContext = mock(FilterFactoryContext.class);
162+
doReturn(new Replacing()).when(constructContext).pluginInstance(any(), any());
163+
FetchResponseTransformation.Config config = new FetchResponseTransformation.Config(Replacing.class.getName(),
164+
new Replacing.Config(null, "foo", null, replamventValuePath));
165+
assertThat(factory.createFilter(constructContext, config)).isInstanceOf(FetchResponseTransformationFilter.class);
166+
}
167+
168+
@Test
169+
void shouldConstructReplacingFilterWithPathConfig(@TempDir Path tempDir) throws IOException {
170+
// Given
171+
Path replamventValuePath = Files.createFile(Path.of(tempDir.toAbsolutePath().toString(), "replacement-value.txt"));
172+
Path valuePath = Files.writeString(replamventValuePath, "bar", StandardCharsets.UTF_8);
173+
174+
FetchResponseTransformation.Config config = new FetchResponseTransformation.Config(Replacing.class.getName(),
175+
new Replacing.Config(null, "foo", null, Paths.get(valuePath.toUri())));
176+
// toUri called so the expected and actual match see
177+
// https://github.com/FasterXML/jackson-databind/blob/099481bf725afd11dfd4f3c23eed9465fa3391da/src/main/java/com/fasterxml/jackson/databind/ext/NioPathDeserializer.java#L65
178+
179+
String snippet = MAPPER.writeValueAsString(config).stripTrailing();
180+
181+
// When
182+
FetchResponseTransformation.Config actual = MAPPER.readValue(snippet, FetchResponseTransformation.Config.class);
183+
184+
// Then
185+
assertThat(actual)
186+
.isInstanceOf(FetchResponseTransformation.Config.class)
187+
.isEqualTo(config);
188+
}
189+
125190
@Test
126191
void filterHandlesPreV13ResponseBasedOnTopicNames() throws Exception {
127192

128193
var fetchResponse = new FetchResponseData();
129-
fetchResponse.responses().add(createFetchableTopicResponseWithOneRecord(RECORD_KEY, ORIGINAL_RECORD_VALUE).setTopic(TOPIC_NAME)); // Version 12
194+
fetchResponse.responses().add(createFetchableTopicResponseWithOneRecord().setTopic(TOPIC_NAME)); // Version 12
130195

131196
var stage = filter.onFetchResponse(fetchResponse.apiKey(), new ResponseHeaderData(), fetchResponse, context);
132197
assertThat(stage).isCompleted();
@@ -147,7 +212,7 @@ void filterHandlesPreV13ResponseBasedOnTopicNames() throws Exception {
147212
void filterHandlesV13OrHigherResponseBasedOnTopicIds() throws Exception {
148213

149214
var fetchResponse = new FetchResponseData();
150-
fetchResponse.responses().add(createFetchableTopicResponseWithOneRecord(RECORD_KEY, ORIGINAL_RECORD_VALUE).setTopicId(TOPIC_ID));
215+
fetchResponse.responses().add(createFetchableTopicResponseWithOneRecord().setTopicId(TOPIC_ID));
151216

152217
var metadataResponse = new MetadataResponseData();
153218
metadataResponse.topics().add(new MetadataResponseData.MetadataResponseTopic().setTopicId(TOPIC_ID).setName(TOPIC_NAME));
@@ -185,7 +250,7 @@ void filterHandlesMetadataRequestError() {
185250

186251
var fetchResponse = new FetchResponseData();
187252
// Version 13 switched to topic id rather than topic names.
188-
fetchResponse.responses().add(createFetchableTopicResponseWithOneRecord(RECORD_KEY, ORIGINAL_RECORD_VALUE).setTopicId(TOPIC_ID));
253+
fetchResponse.responses().add(createFetchableTopicResponseWithOneRecord().setTopicId(TOPIC_ID));
189254

190255
var metadataResponse = new MetadataResponseData();
191256
metadataResponse.topics().add(new MetadataResponseData.MetadataResponseTopic().setTopicId(TOPIC_ID).setName(TOPIC_NAME));
@@ -212,19 +277,19 @@ private Stream<Record> responseToRecordStream(FetchResponseData filteredResponse
212277
}
213278

214279
@NonNull
215-
private static FetchableTopicResponse createFetchableTopicResponseWithOneRecord(String key, String value) {
280+
private static FetchableTopicResponse createFetchableTopicResponseWithOneRecord() {
216281
var fetchableTopicResponse = new FetchableTopicResponse();
217282
var partitionData1 = new PartitionData();
218-
partitionData1.setRecords(buildOneRecord(key, value));
283+
partitionData1.setRecords(buildOneRecord());
219284
fetchableTopicResponse.partitions().add(partitionData1);
220285
return fetchableTopicResponse;
221286
}
222287

223-
private static MemoryRecords buildOneRecord(String key, String value) {
288+
private static MemoryRecords buildOneRecord() {
224289
ByteBuffer buffer = ByteBuffer.allocate(1024);
225290
try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
226291
Compression.NONE, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis())) {
227-
builder.append(0L, key.getBytes(), value.getBytes());
292+
builder.append(0L, FetchResponseTransformationFilterTest.RECORD_KEY.getBytes(), FetchResponseTransformationFilterTest.ORIGINAL_RECORD_VALUE.getBytes());
228293
return builder.build();
229294
}
230295
}

0 commit comments

Comments
 (0)