-
-
Notifications
You must be signed in to change notification settings - Fork 588
feat(kafka): support of Apache Kafka images #3488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
✅ Deploy Preview for testcontainers-go ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
Summary by CodeRabbitRelease Notes
WalkthroughReworks Kafka startup to compute listener ports at runtime, extract image entrypoint/cmd via the Docker provider, generate and copy a dynamic starter script into containers, and updates tests/examples to run per-image Kafka instances with graceful-shutdown and localhost-listener checks. (33 words) Changes
Sequence DiagramsequenceDiagram
participant Test
participant Run as kafka.Run
participant Provider as DockerProvider
participant Container
participant Script as StarterScript
Test->>Run: Run(ctx, image, opts...)
Run->>Provider: getDockerProvider(opts...)
Run->>Container: Start container (expose ports)
Container-->>Run: Started (container ID, mapped host ports)
Run->>Provider: Inspect image -> entrypoint & cmd
Provider-->>Run: entrypoint & cmd
Run->>Script: Build starter script (listeners, mapped ports, original entrypoint)
Run->>Container: Copy starter script into container
Run->>Container: Exec starter script to launch Kafka runtime
Container-->>Run: Kafka process running / readiness met
Run-->>Test: Return container info (clusterID, running state)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (3)
🧰 Additional context used🧠 Learnings (2)📚 Learning: 2025-09-29T15:08:18.694ZApplied to files:
📚 Learning: 2025-09-29T13:57:14.636ZApplied to files:
🧬 Code graph analysis (3)modules/kafka/kafka.go (4)
modules/kafka/examples_test.go (2)
modules/kafka/kafka_test.go (6)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
🔇 Additional comments (13)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
6da1b48 to
c74f62a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
modules/kafka/kafka_test.go (1)
51-79: Close Sarama clients per iterationWe now spin up a consumer group and sync producer for every image in the loop, but never close them. Releasing them with
t.Cleanupkeeps goroutines and network resources from bleeding into later iterations.client, err := sarama.NewConsumerGroup(brokers, "groupName", config) require.NoError(t, err) + t.Cleanup(func() { + if err := client.Close(); err != nil { + t.Errorf("close consumer group: %v", err) + } + }) @@ producer, err := sarama.NewSyncProducer(brokers, config) require.NoError(t, err) + t.Cleanup(func() { + if err := producer.Close(); err != nil { + t.Errorf("close producer: %v", err) + } + })
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
modules/kafka/examples_test.go(1 hunks)modules/kafka/kafka.go(7 hunks)modules/kafka/kafka_test.go(1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-29T15:08:18.694Z
Learnt from: mdelapenya
Repo: testcontainers/testcontainers-go PR: 3320
File: modules/artemis/artemis.go:98-103
Timestamp: 2025-09-29T15:08:18.694Z
Learning: In testcontainers-go, nat.Port is a type alias for string, so untyped string constants can be passed directly to functions expecting nat.Port (like wait.ForListeningPort) without explicit type conversion - the Go compiler handles the implicit conversion automatically.
Applied to files:
modules/kafka/kafka.go
📚 Learning: 2025-09-29T13:57:14.636Z
Learnt from: mdelapenya
Repo: testcontainers/testcontainers-go PR: 3319
File: modules/arangodb/arangodb.go:46-57
Timestamp: 2025-09-29T13:57:14.636Z
Learning: In testcontainers-go ArangoDB module, the wait strategy combines port listening check with HTTP readiness check using wait.ForAll - both strategies are required and complementary, not redundant.
Applied to files:
modules/kafka/kafka.go
🧬 Code graph analysis (3)
modules/kafka/examples_test.go (3)
modules/kafka/kafka.go (2)
Run(52-139)WithClusterID(191-195)cleanup.go (1)
TerminateContainer(97-108)log/logger.go (1)
Printf(47-49)
modules/kafka/kafka_test.go (6)
modules/kafka/kafka.go (2)
Run(52-139)WithClusterID(191-195)testing.go (1)
CleanupContainer(91-97)cleanup.go (1)
StopTimeout(76-80)modules/kafka/consumer_test.go (1)
NewTestKafkaConsumer(18-29)options.go (1)
WithWaitStrategy(366-368)wait/exec.go (1)
NewExecStrategy(30-37)
modules/kafka/kafka.go (6)
options.go (1)
ContainerCustomizer(22-24)container.go (1)
Container(41-73)docker.go (1)
DockerProvider(977-984)generic.go (1)
GenericContainerRequest(21-27)log/logger.go (2)
Logger(18-20)Default(38-40)provider.go (1)
ProviderType(24-24)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: test (1.25.x, modules/kafka) / test: modules/kafka/1.25.x
- GitHub Check: test (1.24.x, modules/kafka) / test: modules/kafka/1.24.x
- GitHub Check: Analyze (go)
c74f62a to
ee5c1aa
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
modules/kafka/examples_test.go(2 hunks)modules/kafka/kafka.go(7 hunks)modules/kafka/kafka_test.go(1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-29T15:08:18.694Z
Learnt from: mdelapenya
Repo: testcontainers/testcontainers-go PR: 3320
File: modules/artemis/artemis.go:98-103
Timestamp: 2025-09-29T15:08:18.694Z
Learning: In testcontainers-go, nat.Port is a type alias for string, so untyped string constants can be passed directly to functions expecting nat.Port (like wait.ForListeningPort) without explicit type conversion - the Go compiler handles the implicit conversion automatically.
Applied to files:
modules/kafka/kafka.go
📚 Learning: 2025-09-29T13:57:14.636Z
Learnt from: mdelapenya
Repo: testcontainers/testcontainers-go PR: 3319
File: modules/arangodb/arangodb.go:46-57
Timestamp: 2025-09-29T13:57:14.636Z
Learning: In testcontainers-go ArangoDB module, the wait strategy combines port listening check with HTTP readiness check using wait.ForAll - both strategies are required and complementary, not redundant.
Applied to files:
modules/kafka/kafka.go
🧬 Code graph analysis (3)
modules/kafka/examples_test.go (2)
modules/kafka/kafka.go (2)
Run(52-139)WithClusterID(191-195)cleanup.go (1)
TerminateContainer(97-108)
modules/kafka/kafka_test.go (6)
modules/kafka/kafka.go (2)
Run(52-139)WithClusterID(191-195)testing.go (1)
CleanupContainer(91-97)cleanup.go (1)
StopTimeout(76-80)modules/kafka/consumer_test.go (1)
NewTestKafkaConsumer(18-29)options.go (1)
WithWaitStrategy(366-368)wait/exec.go (1)
NewExecStrategy(30-37)
modules/kafka/kafka.go (6)
container.go (1)
Container(41-73)docker.go (1)
DockerProvider(977-984)wait/host_port.go (1)
ForMappedPort(79-81)generic.go (1)
GenericContainerRequest(21-27)log/logger.go (2)
Logger(18-20)Default(38-40)provider.go (1)
ProviderType(24-24)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: test (1.25.x, modules/kafka) / test: modules/kafka/1.25.x
- GitHub Check: test (1.24.x, modules/kafka) / test: modules/kafka/1.24.x
- GitHub Check: Analyze (go)
ee5c1aa to
73ec41e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
modules/kafka/kafka_test.go (1)
17-29: Consider reordering variable declarations for clarity.While Go correctly resolves the dependency and initializes
gracefulShutdownSupportingKafkaImagesbeforesupportedKafkaImages, explicitly defininggracefulShutdownSupportingKafkaImagesfirst would improve readability.var ( - supportedKafkaImages = append(gracefulShutdownSupportingKafkaImages, - "confluentinc/confluent-local:7.4.0", - "confluentinc/confluent-local:7.5.0", - ) - gracefulShutdownSupportingKafkaImages = []string{ "apache/kafka:3.9.1", "apache/kafka:4.0.1", "apache/kafka-native:3.9.1", "apache/kafka-native:4.0.1", } + + supportedKafkaImages = append(gracefulShutdownSupportingKafkaImages, + "confluentinc/confluent-local:7.4.0", + "confluentinc/confluent-local:7.5.0", + ) )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
modules/kafka/examples_test.go(2 hunks)modules/kafka/kafka.go(6 hunks)modules/kafka/kafka_test.go(1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-29T15:08:18.694Z
Learnt from: mdelapenya
Repo: testcontainers/testcontainers-go PR: 3320
File: modules/artemis/artemis.go:98-103
Timestamp: 2025-09-29T15:08:18.694Z
Learning: In testcontainers-go, nat.Port is a type alias for string, so untyped string constants can be passed directly to functions expecting nat.Port (like wait.ForListeningPort) without explicit type conversion - the Go compiler handles the implicit conversion automatically.
Applied to files:
modules/kafka/kafka.go
📚 Learning: 2025-09-29T13:57:14.636Z
Learnt from: mdelapenya
Repo: testcontainers/testcontainers-go PR: 3319
File: modules/arangodb/arangodb.go:46-57
Timestamp: 2025-09-29T13:57:14.636Z
Learning: In testcontainers-go ArangoDB module, the wait strategy combines port listening check with HTTP readiness check using wait.ForAll - both strategies are required and complementary, not redundant.
Applied to files:
modules/kafka/kafka.go
🧬 Code graph analysis (3)
modules/kafka/examples_test.go (3)
log/logger.go (1)
Printf(47-49)modules/kafka/kafka.go (2)
Run(53-140)WithClusterID(186-190)cleanup.go (1)
TerminateContainer(97-108)
modules/kafka/kafka_test.go (6)
modules/kafka/kafka.go (2)
Run(53-140)WithClusterID(186-190)testing.go (1)
CleanupContainer(91-97)cleanup.go (1)
StopTimeout(76-80)modules/kafka/consumer_test.go (1)
NewTestKafkaConsumer(18-29)options.go (1)
WithWaitStrategy(366-368)wait/exec.go (1)
NewExecStrategy(30-37)
modules/kafka/kafka.go (5)
options.go (1)
ContainerCustomizer(22-24)container.go (1)
Container(41-73)docker.go (1)
DockerProvider(977-984)generic.go (1)
GenericContainerRequest(21-27)log/logger.go (2)
Logger(18-20)Default(38-40)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: test (1.24.x, modules/kafka) / test: modules/kafka/1.24.x
- GitHub Check: test (1.25.x, modules/kafka) / test: modules/kafka/1.25.x
- GitHub Check: Analyze (go)
🔇 Additional comments (12)
modules/kafka/examples_test.go (1)
12-103: LGTM! Excellent refactoring that resolves the closure issue.Splitting the original loop into three separate example functions (
ExampleRun_confluentLocal,ExampleRun_apacheKafka,ExampleRun_apacheKafkaNative) cleanly eliminates the defer closure problem from the previous version. Each example is now independently runnable with clear, testable output expectations.modules/kafka/kafka_test.go (3)
31-95: LGTM! Comprehensive per-image testing.The test properly exercises each supported image with a full producer/consumer flow. The use of
StopTimeout(0)for cleanup is appropriate for test scenarios where immediate termination is acceptable.
97-121: LGTM! Graceful shutdown test validates the key PR objective.The test correctly verifies that Apache Kafka images support graceful shutdown within a reasonable timeout (60s), which addresses one of the main motivations for this PR.
123-138: LGTM! Localhost listener verification is appropriate.The test validates that the LOCALHOST listener on port 9095 is accessible within the container, which is essential for certain Kafka client scenarios.
modules/kafka/kafka.go (8)
21-37: LGTM! Clear listener port organization.The port constants and starter script template are well-structured. The script properly exports advertised listeners and preserves the original container entrypoint execution.
54-62: LGTM! Clean initialization of dynamic port and provider.The explicit port creation and provider extraction enable the dynamic listener configuration needed to support multiple Kafka image variants.
68-77: LGTM! Dynamic listener configuration supports multiple images.The parameterized listener setup with the LOCALHOST mapping properly accommodates different Kafka image requirements.
91-93: LGTM! Elegant wait-and-exec pattern.The entrypoint override combined with the polling loop ensures the starter script is copied before execution, preventing race conditions.
194-206: LGTM! Consistent dynamic port handling.The Brokers function now uses the same port constant as Run, maintaining consistency across the module.
211-232: LGTM! Controller quorum voters use consistent port constant.The function now references
controllerListenerLocalPortinstead of a hardcoded value, maintaining alignment with the dynamic listener configuration.
234-257: LGTM! Practical approach to extract provider from options.The function applies customizers to a temporary request to extract the provider configuration, which is a reasonable pattern for accessing the Docker client needed for image inspection. Options are effectively applied twice (here and during actual Run), but standard customizers like
WithEnvandWithExposedPortsare idempotent.
164-177: IPv6 handling is already correct—no action needed.Both
PortEndpoint(docker.go:158 and modules/ollama/local.go:662) and thebrokerHostPortconstruction (kafka.go:170) usenet.JoinHostPort, which automatically wraps IPv6 addresses in brackets per Go's standard library. For an IPv6 host likefd00::1, this produces[fd00::1]:port, resulting in correctly formatted endpoints likePLAINTEXT://[fd00::1]:9093.
…kafka-native). Signed-off-by: Marat Abrarov <abrarov@gmail.com>
73ec41e to
731a095
Compare
Changes
What does this PR do?
Adding support of Apache Kafka Docker Hub images - apache/kafka and apache/kafka-native - into kafka module.
Why is it important?
Images supported by kafka module are limited to confluentinc/confluent-local and these images have issues with graceful stop - refer to #2206 (comment). apache/kafka and apache/kafka-native images support graceful shutdown. apache/kafka-native image is also faster and can be used to speedup tests.
Related issues
Follow-ups
Need to decide if #3249 is a better choice to go.