Skip to content

Commit ae62e34

Browse files
committed
Require opt-in to allow testing configuration
Adds a concept of configuration features. We want to add some features to the proxy to assist in testing that we want to be disallowed unless the user has opted into them, telling the proxy that these features are unlocked. By default the proxy has no features enabled and will log an error and fail to start if any testing configurations are specified in the config file. To opt in to using test-only configuration the user must supply either: 1. a system prop `-DKROXYLICIOUS_UNLOCK_TEST_ONLY_CONFIGURATION=true` 2. an env var `KROXYLICIOUS_UNLOCK_TEST_ONLY_CONFIGURATION=true` with the system-prop taking precedence. Integration tests behave the same way,you have to enable the feature explicitly in your test to use any test-only features. Why: We plan to add some opt-in configuration to enable overriding the maximum version the proxy accepts for an ApiKey. This is test only functionality and we want to ensure this isn't accidentally applied in production without some kind of opt-in hoops to leap through. Signed-off-by: Robert Young <robeyoun@redhat.com>
1 parent 14b7fd2 commit ae62e34

File tree

14 files changed

+587
-52
lines changed

14 files changed

+587
-52
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Format `<github issue/pr number>: <short description>`.
77

88
## SNAPSHOT
99

10+
* [#1648](https://github.com/kroxylicious/kroxylicious/pull/1648) Add test-only feature mechanism to Proxy configuration
1011
* [#1379](https://github.com/kroxylicious/kroxylicious/issues/1379) Remove Deprecated EnvelopeEncryption
1112
* [#1561](https://github.com/kroxylicious/kroxylicious/pull/1631) Allow Trust and ClientAuth to be set for Downstream TLS
1213
* [#1550](https://github.com/kroxylicious/kroxylicious/pull/1550) Upgrade Apache Kafka from 3.8.0 to 3.9.0 #1550

kroxylicious-app/src/main/java/io/kroxylicious/app/Kroxylicious.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
import java.io.File;
99
import java.io.InputStream;
1010
import java.nio.file.Files;
11+
import java.util.Arrays;
1112
import java.util.Properties;
1213
import java.util.concurrent.Callable;
13-
import java.util.function.BiFunction;
1414

1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
@@ -19,6 +19,8 @@
1919
import io.kroxylicious.proxy.config.ConfigParser;
2020
import io.kroxylicious.proxy.config.Configuration;
2121
import io.kroxylicious.proxy.config.PluginFactoryRegistry;
22+
import io.kroxylicious.proxy.internal.config.Feature;
23+
import io.kroxylicious.proxy.internal.config.Features;
2224

2325
import picocli.CommandLine;
2426
import picocli.CommandLine.Command;
@@ -35,13 +37,17 @@ public class Kroxylicious implements Callable<Integer> {
3537

3638
private static final Logger LOGGER = LoggerFactory.getLogger("io.kroxylicious.proxy.StartupShutdownLogger");
3739
private static final String UNKNOWN = "unknown";
38-
private final BiFunction<PluginFactoryRegistry, Configuration, KafkaProxy> proxyBuilder;
40+
private final KafkaProxyBuilder proxyBuilder;
41+
42+
interface KafkaProxyBuilder {
43+
KafkaProxy build(PluginFactoryRegistry registry, Configuration config, Features features);
44+
}
3945

4046
Kroxylicious() {
4147
this(KafkaProxy::new);
4248
}
4349

44-
Kroxylicious(BiFunction<PluginFactoryRegistry, Configuration, KafkaProxy> proxyBuilder) {
50+
Kroxylicious(KafkaProxyBuilder proxyBuilder) {
4551
this.proxyBuilder = proxyBuilder;
4652
}
4753

@@ -61,8 +67,9 @@ public Integer call() throws Exception {
6167
try (InputStream stream = Files.newInputStream(configFile.toPath())) {
6268

6369
Configuration config = configParser.parseConfiguration(stream);
64-
printBannerAndVersions();
65-
try (KafkaProxy kafkaProxy = proxyBuilder.apply(configParser, config)) {
70+
Features features = getFeatures();
71+
printBannerAndVersions(features);
72+
try (KafkaProxy kafkaProxy = proxyBuilder.build(configParser, config, features)) {
6673
kafkaProxy.startup();
6774
kafkaProxy.block();
6875
}
@@ -75,12 +82,29 @@ public Integer call() throws Exception {
7582
return 0;
7683
}
7784

78-
private static void printBannerAndVersions() throws Exception {
85+
private static boolean isExplicitlyEnabled(Feature feature) {
86+
String variableName = "KROXYLICIOUS_UNLOCK_" + feature.name();
87+
String enabledString = System.getProperty(variableName, System.getenv(variableName));
88+
return Boolean.parseBoolean(enabledString);
89+
}
90+
91+
private static Features getFeatures() {
92+
Features.FeaturesBuilder builder = Features.builder();
93+
Arrays.stream(Feature.values()).forEach(f -> {
94+
if (isExplicitlyEnabled(f)) {
95+
builder.enable(f);
96+
}
97+
});
98+
return builder.build();
99+
}
100+
101+
private static void printBannerAndVersions(Features features) throws Exception {
79102
new BannerLogger().log();
80103
String[] versions = new VersionProvider().getVersion();
81104
for (String version : versions) {
82105
LOGGER.info("{}", version);
83106
}
107+
features.warnings().forEach(LOGGER::warn);
84108
LOGGER.atInfo()
85109
.setMessage("Platform: Java {}({}) running on {} {}/{}")
86110
.addArgument(Runtime::version)

kroxylicious-app/src/test/java/io/kroxylicious/app/KroxyliciousIT.java

Lines changed: 130 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,24 @@
66
package io.kroxylicious.app;
77

88
import java.io.File;
9+
import java.io.IOException;
10+
import java.nio.charset.StandardCharsets;
911
import java.nio.file.Files;
1012
import java.nio.file.Path;
1113
import java.time.Duration;
14+
import java.util.ArrayList;
1215
import java.util.List;
1316
import java.util.Map;
1417
import java.util.Set;
18+
import java.util.concurrent.ExecutionException;
1519
import java.util.concurrent.TimeUnit;
16-
import java.util.function.Function;
20+
import java.util.function.BiFunction;
1721

1822
import org.apache.kafka.clients.admin.Admin;
1923
import org.apache.kafka.clients.admin.NewTopic;
24+
import org.apache.kafka.clients.consumer.Consumer;
2025
import org.apache.kafka.clients.consumer.ConsumerRecords;
26+
import org.apache.kafka.clients.producer.Producer;
2127
import org.apache.kafka.clients.producer.ProducerConfig;
2228
import org.apache.kafka.clients.producer.ProducerRecord;
2329
import org.junit.jupiter.api.Test;
@@ -26,12 +32,15 @@
2632

2733
import io.kroxylicious.proxy.config.ConfigParser;
2834
import io.kroxylicious.proxy.config.Configuration;
29-
import io.kroxylicious.proxy.service.HostPort;
35+
import io.kroxylicious.proxy.internal.config.Feature;
36+
import io.kroxylicious.proxy.internal.config.Features;
3037
import io.kroxylicious.testing.kafka.api.KafkaCluster;
3138
import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension;
3239

3340
import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.proxy;
3441
import static io.kroxylicious.test.tester.KroxyliciousTesters.kroxyliciousTester;
42+
import static io.kroxylicious.test.tester.KroxyliciousTesters.newBuilder;
43+
import static org.assertj.core.api.Assertions.assertThat;
3544
import static org.junit.jupiter.api.Assertions.assertEquals;
3645

3746
/**
@@ -45,9 +54,73 @@ class KroxyliciousIT {
4554
private static final String PLAINTEXT = "Hello, world!";
4655

4756
@Test
48-
void shouldProxyWhenRunAsStandaloneProcess(KafkaCluster cluster, Admin admin, @TempDir Path tempDir) throws Exception {
49-
var proxyAddress = HostPort.parse("localhost:9192");
57+
void shouldFailToStartWithTestConfigurationsByDefault(@TempDir Path tempDir) throws IOException {
58+
SubprocessKroxyliciousFactory kroxyliciousFactory = new SubprocessKroxyliciousFactory(tempDir, (features, processBuilder) -> {
59+
// no-op so that io is not inherited
60+
}, List.of());
61+
var tester = kroxyliciousTester(proxy("fake:9092").withDevelopment(Map.of("a", "b")), kroxyliciousFactory);
62+
Process lastProcess = kroxyliciousFactory.lastProcess;
63+
assertThat(lastProcess).isNotNull();
64+
assertThat(lastProcess.onExit()).succeedsWithin(5, TimeUnit.SECONDS);
65+
byte[] bytes = lastProcess.getInputStream().readAllBytes();
66+
String output = new String(bytes, StandardCharsets.UTF_8);
67+
assertThat(output).contains("test-only configuration for proxy present, but loading test-only configuration not enabled");
68+
tester.close();
69+
}
70+
71+
@Test
72+
void shouldFailToStartWithTestConfigurationAndLoadTestConfigurationExplicitlyDisabled(@TempDir Path tempDir) throws IOException {
73+
SubprocessKroxyliciousFactory kroxyliciousFactory = new SubprocessKroxyliciousFactory(tempDir, (features, processBuilder) -> {
74+
processBuilder.environment().put(prefixUnlockPropertyName(Feature.TEST_ONLY_CONFIGURATION), "false");
75+
}, List.of());
76+
var tester = kroxyliciousTester(proxy("fake:9092").withDevelopment(Map.of("a", "b")), kroxyliciousFactory);
77+
Process lastProcess = kroxyliciousFactory.lastProcess;
78+
assertThat(lastProcess).isNotNull();
79+
assertThat(lastProcess.onExit()).succeedsWithin(5, TimeUnit.SECONDS);
80+
byte[] bytes = lastProcess.getInputStream().readAllBytes();
81+
String output = new String(bytes, StandardCharsets.UTF_8);
82+
assertThat(output).contains("test-only configuration for proxy present, but loading test-only configuration not enabled");
83+
tester.close();
84+
}
85+
86+
@Test
87+
void shouldStartWithTestConfigurationsFeatureEnabledByEnvironmentVariable(KafkaCluster cluster, Admin admin, @TempDir Path tempDir) throws Exception {
88+
admin.createTopics(List.of(
89+
new NewTopic(TOPIC_1, 1, (short) 1),
90+
new NewTopic(TOPIC_2, 1, (short) 1))).all().get();
91+
92+
try (var tester = newBuilder(proxy(cluster).withDevelopment(Map.of("a", "b")))
93+
.setKroxyliciousFactory(new SubprocessKroxyliciousFactory(tempDir))
94+
.setFeatures(Features.builder().enable(Feature.TEST_ONLY_CONFIGURATION).build())
95+
.createDefaultKroxyliciousTester();
96+
var producer = tester.producer(Map.of(
97+
ProducerConfig.CLIENT_ID_CONFIG, "shouldModifyProduceMessage",
98+
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000));
99+
var consumer = tester.consumer()) {
100+
assertProxies(producer, consumer);
101+
}
102+
}
103+
104+
@Test
105+
void shouldStartWithTestConfigurationsFeatureEnabledBySystemProperty(KafkaCluster cluster, Admin admin, @TempDir Path tempDir) throws Exception {
106+
admin.createTopics(List.of(
107+
new NewTopic(TOPIC_1, 1, (short) 1),
108+
new NewTopic(TOPIC_2, 1, (short) 1))).all().get();
50109

110+
try (var tester = newBuilder(proxy(cluster).withDevelopment(Map.of("a", "b")))
111+
.setKroxyliciousFactory(new SubprocessKroxyliciousFactory(tempDir, (features, processBuilder) -> processBuilder.inheritIO(),
112+
List.of("-D" + prefixUnlockPropertyName(Feature.TEST_ONLY_CONFIGURATION) + "=true")))
113+
.createDefaultKroxyliciousTester();
114+
var producer = tester.producer(Map.of(
115+
ProducerConfig.CLIENT_ID_CONFIG, "shouldModifyProduceMessage",
116+
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000));
117+
var consumer = tester.consumer()) {
118+
assertProxies(producer, consumer);
119+
}
120+
}
121+
122+
@Test
123+
void shouldProxyWhenRunAsStandaloneProcess(KafkaCluster cluster, Admin admin, @TempDir Path tempDir) throws Exception {
51124
admin.createTopics(List.of(
52125
new NewTopic(TOPIC_1, 1, (short) 1),
53126
new NewTopic(TOPIC_2, 1, (short) 1))).all().get();
@@ -57,36 +130,70 @@ void shouldProxyWhenRunAsStandaloneProcess(KafkaCluster cluster, Admin admin, @T
57130
ProducerConfig.CLIENT_ID_CONFIG, "shouldModifyProduceMessage",
58131
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000));
59132
var consumer = tester.consumer()) {
60-
producer.send(new ProducerRecord<>(TOPIC_1, "my-key", PLAINTEXT)).get();
61-
producer.send(new ProducerRecord<>(TOPIC_2, "my-key", PLAINTEXT)).get();
62-
producer.flush();
63-
64-
consumer.subscribe(Set.of(TOPIC_1));
65-
ConsumerRecords<String, String> records1 = consumer.poll(Duration.ofSeconds(10));
66-
consumer.subscribe(Set.of(TOPIC_2));
67-
ConsumerRecords<String, String> records2 = consumer.poll(Duration.ofSeconds(10));
68-
69-
assertEquals(1, records1.count());
70-
assertEquals(PLAINTEXT, records1.iterator().next().value());
71-
assertEquals(1, records2.count());
72-
assertEquals(PLAINTEXT, records2.iterator().next().value());
133+
assertProxies(producer, consumer);
73134
}
74135
}
75136

76-
private record SubprocessKroxyliciousFactory(Path tempDir) implements Function<Configuration, AutoCloseable> {
137+
private static void assertProxies(Producer<String, String> producer, Consumer<String, String> consumer)
138+
throws InterruptedException, ExecutionException {
139+
producer.send(new ProducerRecord<>(KroxyliciousIT.TOPIC_1, "my-key", KroxyliciousIT.PLAINTEXT)).get();
140+
producer.send(new ProducerRecord<>(KroxyliciousIT.TOPIC_2, "my-key", KroxyliciousIT.PLAINTEXT)).get();
141+
producer.flush();
142+
143+
consumer.subscribe(Set.of(KroxyliciousIT.TOPIC_1));
144+
ConsumerRecords<String, String> records1 = consumer.poll(Duration.ofSeconds(10));
145+
consumer.subscribe(Set.of(KroxyliciousIT.TOPIC_2));
146+
ConsumerRecords<String, String> records2 = consumer.poll(Duration.ofSeconds(10));
147+
148+
assertEquals(1, records1.count());
149+
assertEquals(KroxyliciousIT.PLAINTEXT, records1.iterator().next().value());
150+
assertEquals(1, records2.count());
151+
assertEquals(KroxyliciousIT.PLAINTEXT, records2.iterator().next().value());
152+
}
153+
154+
private static String prefixUnlockPropertyName(Feature feature) {
155+
return "KROXYLICIOUS_UNLOCK_" + feature.name();
156+
}
157+
158+
private static class SubprocessKroxyliciousFactory implements BiFunction<Configuration, Features, AutoCloseable> {
159+
160+
private final Path tempDir;
161+
private final java.util.function.BiConsumer<Features, ProcessBuilder> processBuilderModifier;
162+
private final List<String> jvmArgs;
163+
private Process lastProcess;
164+
165+
SubprocessKroxyliciousFactory(Path tempDir) {
166+
this(tempDir, (features, processBuilder) -> {
167+
processBuilder.inheritIO();
168+
if (features.isEnabled(Feature.TEST_ONLY_CONFIGURATION)) {
169+
processBuilder.environment().put(prefixUnlockPropertyName(Feature.TEST_ONLY_CONFIGURATION), "true");
170+
}
171+
}, List.of());
172+
}
173+
174+
SubprocessKroxyliciousFactory(Path tempDir, java.util.function.BiConsumer<Features, ProcessBuilder> processBuilderModifier, List<String> jvmArgs) {
175+
this.tempDir = tempDir;
176+
this.processBuilderModifier = processBuilderModifier;
177+
this.jvmArgs = jvmArgs;
178+
}
77179

78180
@Override
79-
public AutoCloseable apply(Configuration config) {
181+
public AutoCloseable apply(Configuration config, Features features) {
80182
try {
81183
Path configPath = tempDir.resolve("config.yaml");
82184
Files.writeString(configPath, new ConfigParser().toYaml(config));
83185
String java = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
84186
String classpath = System.getProperty("java.class.path");
85-
var processBuilder = new ProcessBuilder(java, "-cp", classpath, "io.kroxylicious.app.Kroxylicious", "-c", configPath.toString()).inheritIO();
86-
Process start = processBuilder.start();
187+
List<String> command = new ArrayList<>();
188+
command.add(java);
189+
command.addAll(jvmArgs);
190+
command.addAll(List.of("-cp", classpath, "io.kroxylicious.app.Kroxylicious", "-c", configPath.toString()));
191+
var processBuilder = new ProcessBuilder(command);
192+
processBuilderModifier.accept(features, processBuilder);
193+
lastProcess = processBuilder.start();
87194
return () -> {
88-
start.destroy();
89-
start.onExit().get(10, TimeUnit.SECONDS);
195+
lastProcess.destroy();
196+
lastProcess.onExit().get(10, TimeUnit.SECONDS);
90197
};
91198
}
92199
catch (Exception e) {

kroxylicious-app/src/test/java/io/kroxylicious/app/KroxyliciousTest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.StringWriter;
1313
import java.nio.file.Files;
1414
import java.nio.file.Path;
15+
import java.util.concurrent.atomic.AtomicReference;
1516
import java.util.regex.Pattern;
1617

1718
import org.junit.jupiter.api.BeforeEach;
@@ -23,6 +24,7 @@
2324
import org.mockito.junit.jupiter.MockitoExtension;
2425

2526
import io.kroxylicious.proxy.KafkaProxy;
27+
import io.kroxylicious.proxy.internal.config.Features;
2628

2729
import picocli.CommandLine;
2830

@@ -41,10 +43,16 @@ class KroxyliciousTest {
4143
private CommandLine cmd;
4244
private StringWriter soutWriter;
4345
private StringWriter serrWriter;
46+
private AtomicReference<Features> features = new AtomicReference<>(null);
4447

4548
@BeforeEach
4649
public void setup() {
47-
Kroxylicious app = new Kroxylicious((ffm, configuration) -> mockProxy);
50+
Kroxylicious app = new Kroxylicious((ffm, configuration, features) -> {
51+
if (!this.features.compareAndSet(null, features)) {
52+
throw new IllegalStateException("env already set");
53+
}
54+
return mockProxy;
55+
});
4856
soutWriter = new StringWriter();
4957
serrWriter = new StringWriter();
5058
cmd = new CommandLine(app);
@@ -87,6 +95,15 @@ void testKroxyliciousStartsAndThenTerminates(@TempDir Path dir) throws Exception
8795
assertEquals(0, cmd.execute("-c", file.toString()));
8896
}
8997

98+
@Test
99+
void testDefaultFeatures(@TempDir Path dir) throws Exception {
100+
Path file = copyClasspathResourceToTempFileInDir("proxy-config.yaml", dir);
101+
when(mockProxy.startup()).thenReturn(mockProxy);
102+
doNothing().when(mockProxy).block();
103+
assertEquals(0, cmd.execute("-c", file.toString()));
104+
assertThat(features).hasValue(Features.defaultFeatures());
105+
}
106+
90107
@Test
91108
void testKroxyliciousExceptionOnBlock(@TempDir Path dir) throws Exception {
92109
Path file = copyClasspathResourceToTempFileInDir("proxy-config.yaml", dir);

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/tester/DefaultKroxyliciousTester.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import io.kroxylicious.proxy.config.ServiceBasedPluginFactoryRegistry;
4141
import io.kroxylicious.proxy.config.VirtualCluster;
4242
import io.kroxylicious.proxy.config.tls.Tls;
43+
import io.kroxylicious.proxy.internal.config.Features;
4344
import io.kroxylicious.test.client.KafkaClient;
4445

4546
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -220,7 +221,7 @@ public <U, V> Consumer<U, V> consumer(String virtualCluster, Serde<U> keySerde,
220221
public void restartProxy() {
221222
try {
222223
proxy.close();
223-
proxy = spawnProxy(kroxyliciousConfig);
224+
proxy = spawnProxy(kroxyliciousConfig, Features.defaultFeatures());
224225
}
225226
catch (Exception e) {
226227
throw new IllegalStateException(e);
@@ -257,8 +258,8 @@ private static Optional<Exception> closeCloseable(Closeable c) {
257258
}
258259
}
259260

260-
static KafkaProxy spawnProxy(Configuration config) {
261-
KafkaProxy kafkaProxy = new KafkaProxy(new ServiceBasedPluginFactoryRegistry(), config);
261+
static KafkaProxy spawnProxy(Configuration config, Features features) {
262+
KafkaProxy kafkaProxy = new KafkaProxy(new ServiceBasedPluginFactoryRegistry(), config, features);
262263
try {
263264
kafkaProxy.startup();
264265
}

0 commit comments

Comments
 (0)