2121public class YdbDockerContainer extends GenericContainer <YdbDockerContainer > {
2222 public static final int DEFAULT_SECURE_PORT = 2135 ;
2323 public static final int DEFAULT_INSECURE_PORT = 2136 ;
24+ public static final int DEFAULT_KAFKA_PORT = 9092 ;
2425
2526 private final YdbEnvironment env ;
2627 private final int grpcsPort ; // Secure connection
2728 private final int grpcPort ; // Non secure connection
29+ private final int kafkaPort ; // Non secure kafka port
2830
2931 public YdbDockerContainer (YdbEnvironment env , PortsGenerator portGenerator ) {
3032 super (env .dockerImage ());
@@ -33,20 +35,24 @@ public YdbDockerContainer(YdbEnvironment env, PortsGenerator portGenerator) {
3335 if (env .useDockerIsolation ()) {
3436 this .grpcsPort = DEFAULT_SECURE_PORT ;
3537 this .grpcPort = DEFAULT_INSECURE_PORT ;
38+ this .kafkaPort = DEFAULT_KAFKA_PORT ;
3639 } else {
3740 this .grpcsPort = portGenerator .findAvailablePort ();
3841 this .grpcPort = portGenerator .findAvailablePort ();
42+ this .kafkaPort = portGenerator .findAvailablePort ();
3943 }
4044 }
4145
4246 public void init () {
4347 addExposedPort (grpcPort );
4448 addExposedPort (grpcsPort );
4549
50+ withEnv ("YDB_KAFKA_PROXY_PORT" , String .valueOf (kafkaPort ));
4651 if (!env .useDockerIsolation ()) {
4752 // Host ports and container ports MUST BE equal - ydb implementation limitation
4853 addFixedExposedPort (grpcsPort , grpcsPort );
4954 addFixedExposedPort (grpcPort , grpcPort );
55+ addFixedExposedPort (kafkaPort , kafkaPort );
5056
5157 withEnv ("GRPC_PORT" , String .valueOf (grpcPort ));
5258 withEnv ("GRPC_TLS_PORT" , String .valueOf (grpcsPort ));
@@ -85,6 +91,10 @@ public EndpointRecord secureEndpoint() {
8591 return new EndpointRecord (getHost (), getMappedPort (grpcsPort ));
8692 }
8793
94+ public String nonSecureKafkaEndpoint (){
95+ return getHost () + ":" + getMappedPort (kafkaPort );
96+ }
97+
8898 public byte [] pemCert () {
8999 return copyFileFromContainer (env .dockerPemPath (), is -> {
90100 ByteArrayOutputStream baos = new ByteArrayOutputStream ();
0 commit comments