66package io .kroxylicious .app ;
77
88import java .io .File ;
9+ import java .io .IOException ;
10+ import java .nio .charset .StandardCharsets ;
911import java .nio .file .Files ;
1012import java .nio .file .Path ;
1113import java .time .Duration ;
14+ import java .util .ArrayList ;
1215import java .util .List ;
1316import java .util .Map ;
1417import java .util .Set ;
18+ import java .util .concurrent .ExecutionException ;
1519import java .util .concurrent .TimeUnit ;
16- import java .util .function .Function ;
20+ import java .util .function .BiFunction ;
1721
1822import org .apache .kafka .clients .admin .Admin ;
1923import org .apache .kafka .clients .admin .NewTopic ;
24+ import org .apache .kafka .clients .consumer .Consumer ;
2025import org .apache .kafka .clients .consumer .ConsumerRecords ;
26+ import org .apache .kafka .clients .producer .Producer ;
2127import org .apache .kafka .clients .producer .ProducerConfig ;
2228import org .apache .kafka .clients .producer .ProducerRecord ;
2329import org .junit .jupiter .api .Test ;
2632
2733import io .kroxylicious .proxy .config .ConfigParser ;
2834import io .kroxylicious .proxy .config .Configuration ;
29- import io .kroxylicious .proxy .service .HostPort ;
35+ import io .kroxylicious .proxy .internal .config .Feature ;
36+ import io .kroxylicious .proxy .internal .config .Features ;
3037import io .kroxylicious .testing .kafka .api .KafkaCluster ;
3138import io .kroxylicious .testing .kafka .junit5ext .KafkaClusterExtension ;
3239
3340import static io .kroxylicious .test .tester .KroxyliciousConfigUtils .proxy ;
3441import static io .kroxylicious .test .tester .KroxyliciousTesters .kroxyliciousTester ;
42+ import static io .kroxylicious .test .tester .KroxyliciousTesters .newBuilder ;
43+ import static org .assertj .core .api .Assertions .assertThat ;
3544import static org .junit .jupiter .api .Assertions .assertEquals ;
3645
3746/**
@@ -45,9 +54,73 @@ class KroxyliciousIT {
4554 private static final String PLAINTEXT = "Hello, world!" ;
4655
4756 @ Test
48- void shouldProxyWhenRunAsStandaloneProcess (KafkaCluster cluster , Admin admin , @ TempDir Path tempDir ) throws Exception {
49- var proxyAddress = HostPort .parse ("localhost:9192" );
57+ void shouldFailToStartWithTestConfigurationsByDefault (@ TempDir Path tempDir ) throws IOException {
58+ SubprocessKroxyliciousFactory kroxyliciousFactory = new SubprocessKroxyliciousFactory (tempDir , (features , processBuilder ) -> {
59+ // no-op so that io is not inherited
60+ }, List .of ());
61+ var tester = kroxyliciousTester (proxy ("fake:9092" ).withDevelopment (Map .of ("a" , "b" )), kroxyliciousFactory );
62+ Process lastProcess = kroxyliciousFactory .lastProcess ;
63+ assertThat (lastProcess ).isNotNull ();
64+ assertThat (lastProcess .onExit ()).succeedsWithin (5 , TimeUnit .SECONDS );
65+ byte [] bytes = lastProcess .getInputStream ().readAllBytes ();
66+ String output = new String (bytes , StandardCharsets .UTF_8 );
67+ assertThat (output ).contains ("test-only configuration for proxy present, but loading test-only configuration not enabled" );
68+ tester .close ();
69+ }
70+
71+ @ Test
72+ void shouldFailToStartWithTestConfigurationAndLoadTestConfigurationExplicitlyDisabled (@ TempDir Path tempDir ) throws IOException {
73+ SubprocessKroxyliciousFactory kroxyliciousFactory = new SubprocessKroxyliciousFactory (tempDir , (features , processBuilder ) -> {
74+ processBuilder .environment ().put (prefixUnlockPropertyName (Feature .TEST_ONLY_CONFIGURATION ), "false" );
75+ }, List .of ());
76+ var tester = kroxyliciousTester (proxy ("fake:9092" ).withDevelopment (Map .of ("a" , "b" )), kroxyliciousFactory );
77+ Process lastProcess = kroxyliciousFactory .lastProcess ;
78+ assertThat (lastProcess ).isNotNull ();
79+ assertThat (lastProcess .onExit ()).succeedsWithin (5 , TimeUnit .SECONDS );
80+ byte [] bytes = lastProcess .getInputStream ().readAllBytes ();
81+ String output = new String (bytes , StandardCharsets .UTF_8 );
82+ assertThat (output ).contains ("test-only configuration for proxy present, but loading test-only configuration not enabled" );
83+ tester .close ();
84+ }
85+
86+ @ Test
87+ void shouldStartWithTestConfigurationsFeatureEnabledByEnvironmentVariable (KafkaCluster cluster , Admin admin , @ TempDir Path tempDir ) throws Exception {
88+ admin .createTopics (List .of (
89+ new NewTopic (TOPIC_1 , 1 , (short ) 1 ),
90+ new NewTopic (TOPIC_2 , 1 , (short ) 1 ))).all ().get ();
91+
92+ try (var tester = newBuilder (proxy (cluster ).withDevelopment (Map .of ("a" , "b" )))
93+ .setKroxyliciousFactory (new SubprocessKroxyliciousFactory (tempDir ))
94+ .setFeatures (Features .builder ().enable (Feature .TEST_ONLY_CONFIGURATION ).build ())
95+ .createDefaultKroxyliciousTester ();
96+ var producer = tester .producer (Map .of (
97+ ProducerConfig .CLIENT_ID_CONFIG , "shouldModifyProduceMessage" ,
98+ ProducerConfig .DELIVERY_TIMEOUT_MS_CONFIG , 3_600_000 ));
99+ var consumer = tester .consumer ()) {
100+ assertProxies (producer , consumer );
101+ }
102+ }
103+
104+ @ Test
105+ void shouldStartWithTestConfigurationsFeatureEnabledBySystemProperty (KafkaCluster cluster , Admin admin , @ TempDir Path tempDir ) throws Exception {
106+ admin .createTopics (List .of (
107+ new NewTopic (TOPIC_1 , 1 , (short ) 1 ),
108+ new NewTopic (TOPIC_2 , 1 , (short ) 1 ))).all ().get ();
50109
110+ try (var tester = newBuilder (proxy (cluster ).withDevelopment (Map .of ("a" , "b" )))
111+ .setKroxyliciousFactory (new SubprocessKroxyliciousFactory (tempDir , (features , processBuilder ) -> processBuilder .inheritIO (),
112+ List .of ("-D" + prefixUnlockPropertyName (Feature .TEST_ONLY_CONFIGURATION ) + "=true" )))
113+ .createDefaultKroxyliciousTester ();
114+ var producer = tester .producer (Map .of (
115+ ProducerConfig .CLIENT_ID_CONFIG , "shouldModifyProduceMessage" ,
116+ ProducerConfig .DELIVERY_TIMEOUT_MS_CONFIG , 3_600_000 ));
117+ var consumer = tester .consumer ()) {
118+ assertProxies (producer , consumer );
119+ }
120+ }
121+
122+ @ Test
123+ void shouldProxyWhenRunAsStandaloneProcess (KafkaCluster cluster , Admin admin , @ TempDir Path tempDir ) throws Exception {
51124 admin .createTopics (List .of (
52125 new NewTopic (TOPIC_1 , 1 , (short ) 1 ),
53126 new NewTopic (TOPIC_2 , 1 , (short ) 1 ))).all ().get ();
@@ -57,36 +130,70 @@ void shouldProxyWhenRunAsStandaloneProcess(KafkaCluster cluster, Admin admin, @T
57130 ProducerConfig .CLIENT_ID_CONFIG , "shouldModifyProduceMessage" ,
58131 ProducerConfig .DELIVERY_TIMEOUT_MS_CONFIG , 3_600_000 ));
59132 var consumer = tester .consumer ()) {
60- producer .send (new ProducerRecord <>(TOPIC_1 , "my-key" , PLAINTEXT )).get ();
61- producer .send (new ProducerRecord <>(TOPIC_2 , "my-key" , PLAINTEXT )).get ();
62- producer .flush ();
63-
64- consumer .subscribe (Set .of (TOPIC_1 ));
65- ConsumerRecords <String , String > records1 = consumer .poll (Duration .ofSeconds (10 ));
66- consumer .subscribe (Set .of (TOPIC_2 ));
67- ConsumerRecords <String , String > records2 = consumer .poll (Duration .ofSeconds (10 ));
68-
69- assertEquals (1 , records1 .count ());
70- assertEquals (PLAINTEXT , records1 .iterator ().next ().value ());
71- assertEquals (1 , records2 .count ());
72- assertEquals (PLAINTEXT , records2 .iterator ().next ().value ());
133+ assertProxies (producer , consumer );
73134 }
74135 }
75136
76- private record SubprocessKroxyliciousFactory (Path tempDir ) implements Function <Configuration , AutoCloseable > {
137+ private static void assertProxies (Producer <String , String > producer , Consumer <String , String > consumer )
138+ throws InterruptedException , ExecutionException {
139+ producer .send (new ProducerRecord <>(KroxyliciousIT .TOPIC_1 , "my-key" , KroxyliciousIT .PLAINTEXT )).get ();
140+ producer .send (new ProducerRecord <>(KroxyliciousIT .TOPIC_2 , "my-key" , KroxyliciousIT .PLAINTEXT )).get ();
141+ producer .flush ();
142+
143+ consumer .subscribe (Set .of (KroxyliciousIT .TOPIC_1 ));
144+ ConsumerRecords <String , String > records1 = consumer .poll (Duration .ofSeconds (10 ));
145+ consumer .subscribe (Set .of (KroxyliciousIT .TOPIC_2 ));
146+ ConsumerRecords <String , String > records2 = consumer .poll (Duration .ofSeconds (10 ));
147+
148+ assertEquals (1 , records1 .count ());
149+ assertEquals (KroxyliciousIT .PLAINTEXT , records1 .iterator ().next ().value ());
150+ assertEquals (1 , records2 .count ());
151+ assertEquals (KroxyliciousIT .PLAINTEXT , records2 .iterator ().next ().value ());
152+ }
153+
154+ private static String prefixUnlockPropertyName (Feature feature ) {
155+ return "KROXYLICIOUS_UNLOCK_" + feature .name ();
156+ }
157+
158+ private static class SubprocessKroxyliciousFactory implements BiFunction <Configuration , Features , AutoCloseable > {
159+
160+ private final Path tempDir ;
161+ private final java .util .function .BiConsumer <Features , ProcessBuilder > processBuilderModifier ;
162+ private final List <String > jvmArgs ;
163+ private Process lastProcess ;
164+
165+ SubprocessKroxyliciousFactory (Path tempDir ) {
166+ this (tempDir , (features , processBuilder ) -> {
167+ processBuilder .inheritIO ();
168+ if (features .isEnabled (Feature .TEST_ONLY_CONFIGURATION )) {
169+ processBuilder .environment ().put (prefixUnlockPropertyName (Feature .TEST_ONLY_CONFIGURATION ), "true" );
170+ }
171+ }, List .of ());
172+ }
173+
174+ SubprocessKroxyliciousFactory (Path tempDir , java .util .function .BiConsumer <Features , ProcessBuilder > processBuilderModifier , List <String > jvmArgs ) {
175+ this .tempDir = tempDir ;
176+ this .processBuilderModifier = processBuilderModifier ;
177+ this .jvmArgs = jvmArgs ;
178+ }
77179
78180 @ Override
79- public AutoCloseable apply (Configuration config ) {
181+ public AutoCloseable apply (Configuration config , Features features ) {
80182 try {
81183 Path configPath = tempDir .resolve ("config.yaml" );
82184 Files .writeString (configPath , new ConfigParser ().toYaml (config ));
83185 String java = System .getProperty ("java.home" ) + File .separator + "bin" + File .separator + "java" ;
84186 String classpath = System .getProperty ("java.class.path" );
85- var processBuilder = new ProcessBuilder (java , "-cp" , classpath , "io.kroxylicious.app.Kroxylicious" , "-c" , configPath .toString ()).inheritIO ();
86- Process start = processBuilder .start ();
187+ List <String > command = new ArrayList <>();
188+ command .add (java );
189+ command .addAll (jvmArgs );
190+ command .addAll (List .of ("-cp" , classpath , "io.kroxylicious.app.Kroxylicious" , "-c" , configPath .toString ()));
191+ var processBuilder = new ProcessBuilder (command );
192+ processBuilderModifier .accept (features , processBuilder );
193+ lastProcess = processBuilder .start ();
87194 return () -> {
88- start .destroy ();
89- start .onExit ().get (10 , TimeUnit .SECONDS );
195+ lastProcess .destroy ();
196+ lastProcess .onExit ().get (10 , TimeUnit .SECONDS );
90197 };
91198 }
92199 catch (Exception e ) {
0 commit comments