Skip to content

Commit bd919df

Browse files
authored
Avoid duplicate values for listeners and listener_security_protocol_map in KafkaContainer (#8850)
Currently, when KafkaContainer is started more than one time then `KAFKA_LISTENERS` and `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP` registers an additional entry, which is duplicated. Fixes #8619
1 parent b4b1c20 commit bd919df

File tree

1 file changed

+21
-8
lines changed

1 file changed

+21
-8
lines changed

modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -323,14 +323,8 @@ void withClusterId(String clusterId) {
323323
void withRaft() {
324324
this.envVars.computeIfAbsent("CLUSTER_ID", key -> clusterId);
325325
this.envVars.computeIfAbsent("KAFKA_NODE_ID", key -> getEnvVars().get("KAFKA_BROKER_ID"));
326-
addEnvVar(
327-
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
328-
String.format("%s,CONTROLLER:PLAINTEXT", getEnvVars().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"))
329-
);
330-
addEnvVar(
331-
"KAFKA_LISTENERS",
332-
String.format("%s,CONTROLLER://0.0.0.0:9094", getEnvVars().get("KAFKA_LISTENERS"))
333-
);
326+
addEnvVar("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaListenerSecurityProtocolMap());
327+
addEnvVar("KAFKA_LISTENERS", kafkaListeners());
334328
addEnvVar("KAFKA_PROCESS_ROLES", "broker,controller");
335329

336330
String firstNetworkAlias = getNetworkAliases().stream().findFirst().orElse(null);
@@ -345,5 +339,24 @@ void withRaft() {
345339

346340
setWaitStrategy(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
347341
}
342+
343+
private String kafkaListenerSecurityProtocolMap() {
344+
String kafkaListenerSecurityProtocolMapEnvVar = getEnvVars().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP");
345+
String kafkaListenerSecurityProtocolMap = String.format(
346+
"%s,CONTROLLER:PLAINTEXT",
347+
kafkaListenerSecurityProtocolMapEnvVar
348+
);
349+
Set<String> listenerSecurityProtocolMap = new HashSet<>(
350+
Arrays.asList(kafkaListenerSecurityProtocolMap.split(","))
351+
);
352+
return String.join(",", listenerSecurityProtocolMap);
353+
}
354+
355+
private String kafkaListeners() {
356+
String kafkaListenersEnvVar = getEnvVars().get("KAFKA_LISTENERS");
357+
String kafkaListeners = String.format("%s,CONTROLLER://0.0.0.0:9094", kafkaListenersEnvVar);
358+
Set<String> listeners = new HashSet<>(Arrays.asList(kafkaListeners.split(",")));
359+
return String.join(",", listeners);
360+
}
348361
}
349362
}

0 commit comments

Comments
 (0)