@@ -3,31 +3,45 @@ package integrations.kafka
33import org.testcontainers.containers.GenericContainer
44import org.testcontainers.containers.wait.strategy.Wait
55import org.testcontainers.containers.KafkaContainer
6+ import org.testcontainers.containers.KafkaContainer.KAFKA_PORT
7+ import org.testcontainers.containers.KafkaContainer.ZOOKEEPER_PORT
68import org.testcontainers.containers.Network
9+ import org.testcontainers.containers.SocatContainer
10+ import java.util.stream.Stream
711
812
913class SchemaRegistryContainer (version : String ): GenericContainer<SchemaRegistryContainer>(" confluentinc/cp-schema-registry:$version " ) {
1014
11- override fun start () {
12- withExposedPorts(PORT )
13- waitingFor(Wait .forHttp(" /subjects" ).forStatusCode(200 ))
14- super .start()
15+ private var proxy: SocatContainer ? = null
16+
17+ override fun doStart () {
18+ val networkAlias = networkAliases[0 ]
19+ proxy = SocatContainer ()
20+ .withNetwork(network)
21+ .withTarget(PORT , networkAlias)
22+
23+ proxy?.start()
24+ waitingFor(Wait .forHttp(" /subjects" )
25+ .forStatusCode(200 ))
26+ super .doStart()
1527 }
1628
1729 fun withKafka (kafka : KafkaContainer ): SchemaRegistryContainer {
18- return withKafka(kafka.network, kafka.networkAliases[ 0 ] + " : 9092" )
30+ return withKafka(kafka.network, kafka.networkAliases.map { " PLAINTEXT:// $it : 9092" }.joinToString( " , " ) )
1931 }
2032
2133 fun withKafka (network : Network , bootstrapServers : String ): SchemaRegistryContainer {
2234 withNetwork(network)
2335 withEnv(" SCHEMA_REGISTRY_HOST_NAME" , " schema-registry" )
24- withEnv(" SCHEMA_REGISTRY_LISTENERS" , " http://0.0.0.0:8081" )
25- withEnv(" SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS" , " PLAINTEXT://$bootstrapServers " )
36+ withEnv(" SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS" , bootstrapServers)
2637 return self()
2738 }
2839
40+ fun getSchemaRegistryUrl () = " http://${proxy?.containerIpAddress} :${proxy?.firstMappedPort} "
2941
30- fun getSchemaRegistryUrl () = " http://localhost:${getMappedPort(PORT )} "
42+ override fun stop () {
43+ Stream .of(Runnable { super .stop() }, Runnable { proxy?.stop() }).parallel().forEach { it.run () }
44+ }
3145
3246 companion object {
3347 @JvmStatic val PORT = 8081
0 commit comments