2121public class YdbDockerContainer extends GenericContainer <YdbDockerContainer > {
2222 public static final int DEFAULT_SECURE_PORT = 2135 ;
2323 public static final int DEFAULT_INSECURE_PORT = 2136 ;
24+ public static final int DEFAULT_KAFKA_PORT = 9092 ;
2425
2526 private final YdbEnvironment env ;
2627 private final int grpcsPort ; // Secure connection
2728 private final int grpcPort ; // Non secure connection
29+ private final int kafkaPort ; // Non secure kafka port
2830
2931 public YdbDockerContainer (YdbEnvironment env , PortsGenerator portGenerator ) {
3032 super (env .dockerImage ());
@@ -33,20 +35,25 @@ public YdbDockerContainer(YdbEnvironment env, PortsGenerator portGenerator) {
3335 if (env .useDockerIsolation ()) {
3436 this .grpcsPort = DEFAULT_SECURE_PORT ;
3537 this .grpcPort = DEFAULT_INSECURE_PORT ;
38+ this .kafkaPort = DEFAULT_KAFKA_PORT ;
3639 } else {
3740 this .grpcsPort = portGenerator .findAvailablePort ();
3841 this .grpcPort = portGenerator .findAvailablePort ();
42+ this .kafkaPort = portGenerator .findAvailablePort ();
3943 }
4044 }
4145
4246 public void init () {
4347 addExposedPort (grpcPort );
4448 addExposedPort (grpcsPort );
49+ addExposedPort (kafkaPort );
4550
51+ withEnv ("YDB_KAFKA_PROXY_PORT" , String .valueOf (kafkaPort ));
4652 if (!env .useDockerIsolation ()) {
4753 // Host ports and container ports MUST BE equal - ydb implementation limitation
4854 addFixedExposedPort (grpcsPort , grpcsPort );
4955 addFixedExposedPort (grpcPort , grpcPort );
56+ addFixedExposedPort (kafkaPort , kafkaPort );
5057
5158 withEnv ("GRPC_PORT" , String .valueOf (grpcPort ));
5259 withEnv ("GRPC_TLS_PORT" , String .valueOf (grpcsPort ));
@@ -85,6 +92,10 @@ public EndpointRecord secureEndpoint() {
8592 return new EndpointRecord (getHost (), getMappedPort (grpcsPort ));
8693 }
8794
95+ public String nonSecureKafkaEndpoint () {
96+ return getHost () + ":" + getMappedPort (kafkaPort );
97+ }
98+
8899 public byte [] pemCert () {
89100 return copyFileFromContainer (env .dockerPemPath (), is -> {
90101 ByteArrayOutputStream baos = new ByteArrayOutputStream ();
0 commit comments