
Commit c8a2a8e

fix(kafka-tests): use non deprecated kafka test container (#99)
* fix(build): removed non-existing module entry in settings.gradle.kts
* fix(kafka-tests): use non-deprecated Kafka test container:
  - Switched to the Testcontainers implementation for Confluent Kafka, which required some changes to the test setup.
  - This also exposed an issue where CommonClientPropertiesBuilder, being an object, was shared across tests; while that may not be an issue in the main code, it is still more correct for it to be a class.
* format: auto-applied ktlint formatting
* bump patch version
1 parent 3fe9483 commit c8a2a8e
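
The object-to-class change called out above is easiest to see in isolation. Below is a minimal, hypothetical sketch (SharedBuilder and PerCallBuilder are illustrative names, not the plugin's actual types) of why a singleton object builder leaks configuration from one test to the next, while a class-based builder starts clean on every use:

// Hypothetical illustration only; not code from this repository.
object SharedBuilder {
    var bootstrapServers: List<String> = emptyList()
}

class PerCallBuilder {
    var bootstrapServers: List<String> = emptyList()
}

fun main() {
    // "Test 1" configures the singleton...
    SharedBuilder.bootstrapServers = listOf("localhost:19092")
    // ..."Test 2" then sees the leftover value instead of a clean default.
    println(SharedBuilder.bootstrapServers) // [localhost:19092]

    // A class gives every caller a fresh, independent instance.
    val fresh = PerCallBuilder()
    println(fresh.bootstrapServers) // []
}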

File tree: 10 files changed (+82 additions, -69 deletions)

.gitignore

Lines changed: 6 additions & 1 deletion
@@ -45,4 +45,9 @@ out/
 /.run/publish kafka and run example.run.xml
 /.kotlin
 /buildSrc/.kotlin
-**/node_modules/
+**/node_modules/
+
+### Agents ###
+/CLAUDE.md
+.claude
+.junie/

gradle.properties

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ github.repository.name=extra-ktor-plugins
 kotlin.native.cacheKind.linuxX64=none
 kotlin.native.ignoreDisabledTargets=true
 gradle.publish.enable.module-metadata=true
-version=2.2.1
+version=2.2.2
 gpr.user=flaxoos
 org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=2g
 kotlin.mpp.applyDefaultHierarchyTemplate=false

ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPluginConfig.kt

Lines changed: 3 additions & 13 deletions
@@ -245,16 +245,6 @@ internal fun KafkaConfigPropertiesContext.withSslProperties(properties: SslPrope
         }
     }

-internal fun KafkaConfigPropertiesContext.withSslProperties(properties: SslPropertiesBuilder?) =
-    apply {
-        properties?.let { kafkaProperties.putAll(it.build()) }
-    }
-
-internal fun KafkaConfigPropertiesContext.withSaslProperties(properties: SaslPropertiesBuilder?) =
-    apply {
-        properties?.let { kafkaProperties.putAll(it.build()) }
-    }
-
 @KafkaDsl
 fun KafkaConsumerConfig.consumerRecordHandler(
     topicName: TopicName,
@@ -264,9 +254,9 @@ fun KafkaConsumerConfig.consumerRecordHandler(
 }

 @KafkaDsl
-fun KafkaConfig.common(configuration: CommonClientPropertiesBuilder.() -> Unit = { CommonClientPropertiesBuilder }) {
+fun KafkaConfig.common(configuration: CommonClientPropertiesBuilder.() -> Unit = { CommonClientPropertiesBuilder() }) {
     commonPropertiesBuilder =
-        CommonClientPropertiesBuilder.apply(configuration)
+        CommonClientPropertiesBuilder().apply(configuration)
 }

 @KafkaDsl
@@ -591,7 +581,7 @@ sealed class ClientPropertiesBuilder : KafkaPropertiesBuilder() {
 /**
  * Concrete implementation of [ClientPropertiesBuilder] to represent the common properties
  */
-data object CommonClientPropertiesBuilder : ClientPropertiesBuilder()
+class CommonClientPropertiesBuilder : ClientPropertiesBuilder()

 class AdminPropertiesBuilder : ClientPropertiesBuilder()

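For context on the hunks above: with CommonClientPropertiesBuilder now a class, the common DSL function constructs and configures a fresh builder on every call. A small self-contained sketch of that default-lambda-plus-apply pattern (CommonProps and common here are hypothetical stand-ins, not the plugin's API surface):

// Hypothetical stand-ins for the pattern KafkaConfig.common moves to above.
class CommonProps {
    var bootstrapServers: List<String> = emptyList()
}

// The default is an empty configuration block; each call applies it to a new instance.
fun common(configure: CommonProps.() -> Unit = {}): CommonProps = CommonProps().apply(configure)

fun main() {
    val a = common { bootstrapServers = listOf("localhost:19092") }
    val b = common() // unaffected by the call above, unlike the previous shared object
    println(a.bootstrapServers) // [localhost:19092]
    println(b.bootstrapServers) // []
}
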
ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/BaseKafkaIntegrationTest.kt

Lines changed: 1 addition & 1 deletion
@@ -103,7 +103,7 @@ abstract class BaseKafkaIntegrationTest : FunSpec() {
                 }
                 isConnected = true
                 break
-            } catch (e: Exception) {
+            } catch (_: Exception) {
                 logger.info(
                     "Attempt $i to connect to Kafka broker at bootstrap.servers: " +
                         "$bootstrapServers failed, retrying",

ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaContainers.kt

Lines changed: 44 additions & 26 deletions
@@ -1,53 +1,67 @@
 package io.github.flaxoos.ktor.server.plugins.kafka

-import io.github.flaxoos.ktor.server.plugins.kafka.KafkaGenericContainer.Companion.kafkaImage
 import io.ktor.util.logging.KtorSimpleLogger
 import io.ktor.util.logging.Logger
 import org.testcontainers.containers.DockerComposeContainer
 import org.testcontainers.containers.GenericContainer
-import org.testcontainers.containers.KafkaContainer
 import org.testcontainers.containers.Network
 import org.testcontainers.containers.output.Slf4jLogConsumer
 import org.testcontainers.containers.wait.strategy.Wait
+import org.testcontainers.kafka.ConfluentKafkaContainer
 import org.testcontainers.utility.DockerImageName
 import org.testcontainers.utility.MountableFile
 import java.io.File
 import kotlin.time.Duration.Companion.seconds
 import kotlin.time.toJavaDuration

 private const val CONFLUENT_PLATFORM_VERSION = "7.7.1"
+private const val KAFKA_IMAGE_NAME = "confluentinc/cp-kafka"
+private const val SCHEMA_REGISTRY_IMAGE_NAME = "confluentinc/cp-schema-registry"
+private const val KAFKA_IMAGE = "$KAFKA_IMAGE_NAME:$CONFLUENT_PLATFORM_VERSION"
+private const val SCHEMA_REGISTRY_IMAGE = "$SCHEMA_REGISTRY_IMAGE_NAME:$CONFLUENT_PLATFORM_VERSION"
 private val logger: Logger = KtorSimpleLogger(BaseKafkaIntegrationTest::class.java.simpleName)
 private val kafkaNetwork = Network.builder().driver("bridge").build()
 private const val YML_FILE_PATH = "/Kafka Cluster SSL.yml"
 private val yml = object {}::class.java.getResource(YML_FILE_PATH)?.toURI() ?: error("$YML_FILE_PATH not found")
-
 internal const val LOCALHOST = "localhost"
 internal const val KAFKA_BROKER_PORT = 19092
 internal const val BOOTSTRAP_SERVERS: String = "$LOCALHOST:$KAFKA_BROKER_PORT"
 internal const val SCHEMA_REGISTRY_PORT = 8081
 internal const val SCHEMA_REGISTRY_URL: String = "https://$LOCALHOST:$SCHEMA_REGISTRY_PORT"
 internal const val PASSWORD = "test_password"

-internal fun newKafkaContainer() = KafkaContainer(kafkaImage)
+internal fun newKafkaContainer() =
+    ConfluentKafkaContainer(
+        DockerImageName
+            .parse(KAFKA_IMAGE)
+            .asCompatibleSubstituteFor(KAFKA_IMAGE_NAME),
+    ).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT_S.seconds.toJavaDuration())

-internal fun KafkaContainer.config() {
-    if (System.getProperty("os.name").lowercase().contains("mac")) {
-        withCreateContainerCmdModifier { it.withPlatform("linux/amd64") }
-    }
-    withNetworkAliases("kafka")
+internal fun ConfluentKafkaContainer.config() {
     withNetwork(kafkaNetwork)
-    withEnv("KAFKA_AUTO_CREATE_TOPIC_ENABLE", "false")
+    withNetworkAliases("kafka")
+    withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
     withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
     withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+    withListener("kafka:9095")
 }

-internal fun SchemaRegistryContainer.config(kafka: KafkaContainer) {
-    waitingFor(Wait.forHttp("/subjects").forStatusCode(200).withStartupTimeout(120.seconds.toJavaDuration()))
+private const val CONTAINER_STARTUP_TIMEOUT_S = 30
+
+internal fun SchemaRegistryContainer.config(kafka: ConfluentKafkaContainer) {
+    waitingFor(
+        Wait
+            .forHttp("/subjects")
+            .forStatusCode(200)
+            .withStartupTimeout(CONTAINER_STARTUP_TIMEOUT_S.seconds.toJavaDuration()),
+    )
     withExposedPorts(SCHEMA_REGISTRY_PORT)
     withNetwork(kafka.network)
+    withNetworkAliases("schema-registry")
     withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
     withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:$SCHEMA_REGISTRY_PORT")
-    withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://${kafka.networkAliases[0]}:9092")
+    withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9095")
+    withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT")
 }

 internal val zookeeperContainer =
@@ -125,6 +139,7 @@ internal fun SchemaRegistryContainer.sslConfig() {
     withCopyToContainer(MountableFile.forClasspathResource("/secrets"), "/etc/kafka/secrets")
 }

+@Suppress("unused")
 internal val dockerComposeContainer: DockerComposeContainer<*> =
     logErrors { DockerComposeContainer("kafka-ssl", File(yml)) }

@@ -146,28 +161,31 @@ internal abstract class FixedPortsContainer<SELF : GenericContainer<SELF>>(
         }
     }

-internal class SchemaRegistryContainer private constructor() :
-    FixedPortsContainer<SchemaRegistryContainer>("$schemaRegistryImage:$CONFLUENT_PLATFORM_VERSION") {
-    companion object {
-        val schemaRegistryImage: DockerImageName = DockerImageName.parse("confluentinc/cp-schema-registry")
+internal class SchemaRegistryContainer private constructor() : FixedPortsContainer<SchemaRegistryContainer>(SCHEMA_REGISTRY_IMAGE) {
+    companion object {
+        @Suppress("unused")
+        val schemaRegistryImage: DockerImageName = DockerImageName.parse(SCHEMA_REGISTRY_IMAGE)

-        fun new() = SchemaRegistryContainer()
-    }
+        fun new() = SchemaRegistryContainer()
     }
+}

-internal class KafkaGenericContainer private constructor() :
-    FixedPortsContainer<KafkaGenericContainer>("$kafkaImage:$CONFLUENT_PLATFORM_VERSION") {
-    companion object {
-        val kafkaImage: DockerImageName = DockerImageName.parse("confluentinc/cp-kafka")
+internal class KafkaGenericContainer private constructor() : FixedPortsContainer<KafkaGenericContainer>(KAFKA_IMAGE) {
+    companion object {
+        @Suppress("unused")
+        val kafkaImage: DockerImageName = DockerImageName.parse(KAFKA_IMAGE)

-        fun new() = KafkaGenericContainer()
-    }
+        fun new() = KafkaGenericContainer()
     }
+}

-internal fun KafkaContainer.withConsumeLogs() = withLogConsumer(kafkaLogConsumer)
+@Suppress("unused")
+internal fun ConfluentKafkaContainer.withConsumeLogs() = withLogConsumer(kafkaLogConsumer)

+@Suppress("unused")
 internal fun KafkaGenericContainer.withConsumeLogs() = withLogConsumer(kafkaLogConsumer)

+@Suppress("unused")
 internal fun SchemaRegistryContainer.withConsumeLogs() = withLogConsumer(schemaRegistryLogConsumer)

 private val kafkaLogger = KtorSimpleLogger("kafka")
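
Outside the test suite's network and listener wiring, the non-deprecated container can be exercised on its own. A minimal, self-contained sketch, assuming the same confluentinc/cp-kafka:7.7.1 image used above and a local Docker daemon:

import org.testcontainers.kafka.ConfluentKafkaContainer
import org.testcontainers.utility.DockerImageName

fun main() {
    // ConfluentKafkaContainer replaces the deprecated org.testcontainers.containers.KafkaContainer.
    val kafka = ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.7.1"))
    kafka.start()
    try {
        // The broker port is mapped dynamically, so callers read it back rather than hardcoding it.
        println("bootstrap.servers = ${kafka.bootstrapServers}")
    } finally {
        kafka.stop()
    }
}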

ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaIntegrationTest.kt

Lines changed: 7 additions & 6 deletions
@@ -8,9 +8,10 @@ import io.kotest.extensions.testcontainers.ContainerExtension
 import io.kotest.extensions.testcontainers.ContainerLifecycleMode
 import io.ktor.server.application.install
 import kotlinx.serialization.Serializable
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG
 import org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG
-import org.testcontainers.containers.KafkaContainer
+import org.testcontainers.kafka.ConfluentKafkaContainer
 import kotlin.time.Duration.Companion.seconds

 const val CODE_CONFIGURED_CLIENT_ID = "code-configured-client-id"
@@ -19,13 +20,13 @@ const val CODE_CONFIGURED_GROUP_ID = "code-configured-group-id"
 @OptIn(ExperimentalKotest::class)
 class KafkaIntegrationTest : BaseKafkaIntegrationTest() {
     init {
-        val kafkaContainer: KafkaContainer = newKafkaContainer()
+        val kafkaContainer: ConfluentKafkaContainer = newKafkaContainer()
         val schemaRegistryContainer = SchemaRegistryContainer.new()
         val kafka =
             install(
                 ContainerExtension(
                     kafkaContainer.apply { config() },
-                    mode = ContainerLifecycleMode.Project,
+                    mode = ContainerLifecycleMode.Spec,
                     beforeTest = {
                         bootstrapServers = kafkaContainer.bootstrapServers
                     },
@@ -41,7 +42,7 @@ class KafkaIntegrationTest : BaseKafkaIntegrationTest() {
                 },
             ),
         )
-        context("should pr oduce and consume records").config(timeout = 120.seconds) {
+        context("should produce and consume records").config(timeout = 20.seconds) {
            test("With default config path") {
                editConfigurationFile()
                testKafkaApplication {
@@ -66,7 +67,7 @@ class KafkaIntegrationTest : BaseKafkaIntegrationTest() {
                install(Kafka) {
                    schemaRegistryUrl = resolvedSchemaRegistryUrl
                    setupTopics()
-                    common { bootstrapServers = listOf(kafka.bootstrapServers) }
+                    common { bootstrapServers = listOf(this@KafkaIntegrationTest.bootstrapServers) }
                    admin { clientId = CODE_CONFIGURED_CLIENT_ID }
                    producer { clientId = CODE_CONFIGURED_CLIENT_ID }
                    consumer {
@@ -84,7 +85,7 @@ class KafkaIntegrationTest : BaseKafkaIntegrationTest() {
                    setupTopics()
                    common {
                        additional {
-                            (listOf(kafka.bootstrapServers))
+                            CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG(this@KafkaIntegrationTest.bootstrapServers)
                        }
                    }
                    admin {
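
The lifecycle change from Project to Spec in the extension above ties the container to a single spec rather than the whole test run. A hedged sketch of that wiring (MyKafkaSpec is illustrative; newKafkaContainer and config come from KafkaContainers.kt, and the exact stop point is managed by the Kotest Testcontainers extension):

import io.kotest.core.extensions.install
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.testcontainers.ContainerExtension
import io.kotest.extensions.testcontainers.ContainerLifecycleMode
import io.kotest.matchers.string.shouldNotBeEmpty

class MyKafkaSpec : FunSpec() {
    init {
        val kafkaContainer = newKafkaContainer()
        // install() starts the container and returns it; with Spec mode it is released when
        // this spec finishes instead of living for the entire Gradle test task (Project mode).
        val kafka =
            install(
                ContainerExtension(
                    kafkaContainer.apply { config() },
                    mode = ContainerLifecycleMode.Spec,
                ),
            )

        test("broker advertises a mapped bootstrap address") {
            kafka.bootstrapServers.shouldNotBeEmpty()
        }
    }
}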

ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaSSLIntegrationTest.kt

Lines changed: 8 additions & 7 deletions
@@ -8,6 +8,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.SslConfigs
 import java.io.File
 import java.nio.file.Paths
+import kotlin.time.Duration.Companion.seconds

 class KafkaSSLIntegrationTest : BaseKafkaIntegrationTest() {
     override val additionalProducerProperties: Map<String, Any> by lazy {
@@ -30,7 +31,7 @@ class KafkaSSLIntegrationTest : BaseKafkaIntegrationTest() {
            install(
                ContainerExtension(
                    zookeeperContainer,
-                    ContainerLifecycleMode.Project,
+                    ContainerLifecycleMode.Spec,
                    beforeStart = { generateCertificates() },
                ),
            )
@@ -41,10 +42,10 @@ class KafkaSSLIntegrationTest : BaseKafkaIntegrationTest() {
                        sslConfig()
                        dependsOn(zooKeeper)
                    },
-                    ContainerLifecycleMode.Project,
-                    beforeStart = { zooKeeper.start() },
-                    beforeTest = {
+                    ContainerLifecycleMode.Spec,
+                    beforeStart = {
                        bootstrapServers = BOOTSTRAP_SERVERS
+                        zooKeeper.start()
                    },
                ),
            )
@@ -54,7 +55,7 @@ class KafkaSSLIntegrationTest : BaseKafkaIntegrationTest() {
                        sslConfig()
                        dependsOn(kafka)
                    },
-                    ContainerLifecycleMode.Project,
+                    ContainerLifecycleMode.Spec,
                    beforeStart = {
                        kafkaContainer.start()
                    },
@@ -64,13 +65,13 @@ class KafkaSSLIntegrationTest : BaseKafkaIntegrationTest() {
                ),
            )

-        test("With code configuration") {
+        test("With code configuration").config(timeout = 20.seconds) {
            testKafkaApplication(waitSecondsAfterApplicationStart = 10) {
                install(Kafka) {
                    schemaRegistryUrl = resolvedSchemaRegistryUrl
                    setupTopics()
                    common {
-                        bootstrapServers = listOf(BOOTSTRAP_SERVERS)
+                        bootstrapServers = listOf(this@KafkaSSLIntegrationTest.bootstrapServers)
                        securityProtocol = "SSL"
                    }
                    admin {

ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt

Lines changed: 11 additions & 11 deletions
@@ -19,12 +19,12 @@ import io.ktor.server.testing.TestApplication
 import io.ktor.server.testing.TestApplicationBuilder
 import korlibs.time.DateTime
 import korlibs.time.minutes
-import kotlin.math.pow
-import kotlin.time.Duration.Companion.milliseconds
 import kotlinx.coroutines.coroutineScope
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.joinAll
 import kotlinx.coroutines.launch
+import kotlin.math.pow
+import kotlin.time.Duration.Companion.milliseconds

 @Suppress("UNUSED")
 val logger = KotlinLogging.logger { }
@@ -77,10 +77,10 @@ abstract class TaskSchedulingPluginTest : FunSpec() {
                kronTaskSchedule = kronTaskSchedule(freqMs),
            ).map { executionsAndApp ->
                executionsAndApp to
-                        launch {
-                            val app = executionsAndApp.second
-                            app.start()
-                        }
+                    launch {
+                        val app = executionsAndApp.second
+                        app.start()
+                    }
            }.also { appsAndJobs ->
                // wait for all app engines to start
                appsAndJobs.map { appAndJob -> appAndJob.second }.joinAll()
@@ -196,12 +196,12 @@ abstract class TaskSchedulingPluginTest : FunSpec() {
                }
            }
            executionRecords to
-                    TestApplication {
-                        engine {
-                            shutdownGracePeriod = freqMs * 10
-                        }
-                        block()
+                TestApplication {
+                    engine {
+                        shutdownGracePeriod = freqMs * 10
                    }
+                    block()
+                }
        }

    private fun exponentialScheduleGenerator(

ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt

Lines changed: 1 addition & 2 deletions
@@ -13,8 +13,8 @@ import io.github.flaxoos.ktor.server.plugins.taskscheduling.tasks.TaskLock
 import io.github.oshai.kotlinlogging.KotlinLogging
 import io.ktor.server.application.Application
 import korlibs.time.DateTime
-import kotlin.jvm.JvmInline
 import kotlinx.coroutines.runBlocking
+import kotlin.jvm.JvmInline

 internal val logger = KotlinLogging.logger { }

@@ -28,7 +28,6 @@ public class RedisLockManager(
     private val lockExpirationMs: Long,
     private val connectionAcquisitionTimeoutMs: Long,
 ) : TaskLockManager<RedisTaskLock>() {
-
     override suspend fun init(tasks: List<Task>) {}

     override suspend fun acquireLockKey(

settings.gradle.kts

Lines changed: 0 additions & 1 deletion
@@ -29,4 +29,3 @@ include("ktor-server-task-scheduling:ktor-server-task-scheduling-core:test")
 include("ktor-server-task-scheduling:ktor-server-task-scheduling-jdbc")
 include("ktor-server-task-scheduling:ktor-server-task-scheduling-mongodb")
 include("ktor-server-task-scheduling:ktor-server-task-scheduling-redis")
-include("ktor-server-task-scheduling:common-test")
