@@ -44,6 +44,7 @@ import java.nio.file.Files
4444import java.nio.file.Paths
4545import java.time.Duration
4646import java.util.*
47+ import java.util.concurrent.ExecutionException
4748import kotlin.collections.HashMap
4849
4950/* *
@@ -54,13 +55,26 @@ class TestingEmbeddedKafka(config: Properties = Properties()) {
5455 companion object {
5556 val Log : Logger = LoggerFactory .getLogger(TestingEmbeddedKafka ::class .java)
5657
57- private fun getTopicNames (adminClient : Admin ): MutableSet <String > {
58+ private fun listTopicNames (adminClient : Admin ): MutableSet <String > {
5859 return try {
5960 adminClient.listTopics().names().get()
6061 } catch (e: Exception ) {
6162 throw RuntimeException (" Failed to get topic names" , e)
6263 }
6364 }
65+
66+ private fun waitForTrue (timeout : Duration ,
67+ time : Long = System .currentTimeMillis(),
68+ action : () -> Boolean ): Boolean {
69+
70+ val timeoutMs = timeout.toMillis()
71+ var result = false
72+ while (System .currentTimeMillis() - time < timeoutMs && ! result) {
73+ result = action()
74+ }
75+ return result
76+ }
77+
6478 }
6579
6680 private val config: MutableMap <Any , Any > = HashMap (config)
@@ -71,15 +85,15 @@ class TestingEmbeddedKafka(config: Properties = Properties()) {
7185 * @param securityProtocol the security protocol the returned broker list should use.
7286 *
7387 */
74- fun bootstrapServers (securityProtocol : SecurityProtocol ? = null): String {
88+ fun bootstrapServers (securityProtocol : SecurityProtocol ? = null): Array < String > {
7589 val port = if (securityProtocol == null ) {
7690 val listenerName = kafka.config().advertisedListeners().apply (0 ).listenerName()
7791 kafka.boundPort(listenerName)
7892 }
7993 else {
8094 kafka.boundPort(ListenerName (securityProtocol.toString()))
8195 }
82- return " ${kafka.config().hostName()} :$port "
96+ return arrayOf( " ${kafka.config().hostName()} :$port " )
8397 }
8498
8599 /* *
@@ -153,21 +167,62 @@ class TestingEmbeddedKafka(config: Properties = Properties()) {
153167 topic, partitions, replication, config
154168 )
155169
156- adminClient().use {adminClient ->
157- val newTopic = NewTopic (topic, partitions, replication.toShort())
158- newTopic.configs(config)
170+ adminClient().use {client ->
159171 try {
160- adminClient.createTopics(listOf (newTopic)).all().get()
161- } catch (e: Exception ) {
162- throw RuntimeException (" Failed to create topic:$topic " , e)
172+ val newTopic = NewTopic (topic, partitions, replication.toShort())
173+ newTopic.configs(config)
174+ client.createTopics(listOf (newTopic)).all().get()
175+ } catch (e : ExecutionException ) {
176+ throw e.cause as Throwable
163177 }
164178 }
165179 }
166180
167181 /* *
168182 * @return the list of topics that exists on the embedded cluster.
169183 */
170- fun topics (): Set <String > = adminClient().use { adminClient -> return getTopicNames(adminClient) }
184+ fun topics (): Set <String > = adminClient().use { adminClient -> return listTopicNames(adminClient) }
185+
186+ /* *
187+ * Waits for all given [topicNames] to be present on the embedded cluster until [timeout].
188+ *
189+ * @return {@code true} if all topics are present before reaching the timeout, {@code false} otherwise.
190+ */
191+ fun waitForTopicsToBePresent (vararg topicNames : String ,
192+ timeout : Duration = Duration .ofSeconds(30)): Boolean {
193+ val now = System .currentTimeMillis()
194+ val required = mutableListOf (* topicNames)
195+ return adminClient().use { client ->
196+ waitForTrue(timeout, now) {
197+ listTopicNames(client).containsAll(required)
198+ }
199+ }
200+ }
201+
202+ /* *
203+ * Waits for all given [topicNames] to be absent on the embedded cluster until [timeout].
204+ *
205+ * @return {@code true} if all topics are absent before reaching the timeout, {@code false} otherwise.
206+ */
207+ fun waitForTopicsToBeAbsent (vararg topicNames : String ,
208+ timeout : Duration = Duration .ofSeconds(30)): Boolean {
209+ return adminClient().use {
210+ doWaitForTopicsToBeAbsent(topics = arrayOf(* topicNames), until = timeout, adminClient = it)
211+ }
212+ }
213+
214+ private fun doWaitForTopicsToBeAbsent (
215+ topics : Array <String >,
216+ until : Duration = Duration .ofMillis(Long .MAX_VALUE ),
217+ now : Long = System .currentTimeMillis(),
218+ adminClient : AdminClient ): Boolean {
219+ val remaining: MutableList <String > = mutableListOf (* topics)
220+ return waitForTrue(until, now) {
221+ val exists = listTopicNames(adminClient)
222+ remaining.retainAll(exists)
223+ remaining.isEmpty()
224+ }
225+ }
171226
172227 /* *
173228 * Creates a new admin client.
@@ -176,7 +231,7 @@ class TestingEmbeddedKafka(config: Properties = Properties()) {
176231 */
177232 fun adminClient () =
178233 AdminClient .create(mutableMapOf (
179- Pair (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers()),
234+ Pair (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers().joinToString() ),
180235 Pair (AdminClientConfig .REQUEST_TIMEOUT_MS_CONFIG , 60000 )
181236 ))
182237
@@ -187,7 +242,7 @@ class TestingEmbeddedKafka(config: Properties = Properties()) {
187242 */
188243 fun producerClient (config : Map <String , Any ?> = emptyMap()): Producer <Any , Any > {
189244 val configs = HashMap (config)
190- configs[ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ] = bootstrapServers()
245+ configs[ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ] = bootstrapServers().joinToString()
191246 return KafkaProducer (configs)
192247 }
193248
@@ -200,7 +255,7 @@ class TestingEmbeddedKafka(config: Properties = Properties()) {
200255 keyDeserializer : Deserializer <K >? = null,
201256 valueDeserializer : Deserializer <V >? = null): Consumer <K , V > {
202257 val configs = HashMap (config)
203- configs[ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG ] = bootstrapServers()
258+ configs[ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG ] = bootstrapServers().joinToString()
204259 configs.putIfAbsent(ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , ByteArrayDeserializer ::class .java.name)
205260 configs.putIfAbsent(ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , ByteArrayDeserializer ::class .java.name)
206261 configs.putIfAbsent(ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , " earliest" )
@@ -232,18 +287,15 @@ class TestingEmbeddedKafka(config: Properties = Properties()) {
232287 /* *
233288 * Deletes the given [topics] from the cluster.
234289 */
235- fun deleteTopics (topics : Collection <String ?>) {
290+ fun deleteTopics (vararg topicNames : String ) {
291+ val remaining: MutableList <String > = mutableListOf (* topicNames)
236292 try {
237- adminClient().use { adminClient ->
238- adminClient.deleteTopics(topics).all().get()
239- val remaining: MutableSet <String ?> = topics.toMutableSet()
240- while (remaining.isNotEmpty()) {
241- val topicNames: Set <String > = adminClient.listTopics().names().get()
242- remaining.retainAll(topicNames)
243- }
293+ adminClient().use { client ->
294+ client.deleteTopics(remaining).all().get()
295+ doWaitForTopicsToBeAbsent(topics = arrayOf(* topicNames), adminClient = client)
244296 }
245297 } catch (e: Exception ) {
246- throw RuntimeException (" Failed to delete topics: $topics " , e)
298+ throw RuntimeException (" Failed to delete topics: $remaining " , e)
247299 }
248300 }
249301
0 commit comments