Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
1ef2f84
feat(kafka_native): new module
strowk Aug 3, 2025
4ebe2d1
feat(kafka_native): new module
strowk Aug 3, 2025
0a80e8c
feat: merge kafka native into kafka module
strowk Nov 8, 2025
32b803c
Merge branch 'main' into feature/kafka_native
strowk Nov 8, 2025
0e2dfab
chore: correct unit test naming
strowk Nov 8, 2025
57f0cd9
chore: remove kafka_native doc
strowk Nov 8, 2025
cb66883
chore: small doc correction
strowk Nov 8, 2025
246fdae
chore: remove all kafka native folder
strowk Nov 8, 2025
e5c5ebb
Merge branch 'main' into feature/kafka_native
strowk Nov 8, 2025
b131199
chore: remove kafka_native from mkdocs
strowk Nov 8, 2025
13e9773
docs: explain default behavior
strowk Nov 8, 2025
5da08a1
chore: refactor to make linter happy
strowk Nov 8, 2025
d641100
chore: simplify helper function
strowk Nov 8, 2025
5a57c72
chore: test graceful shutdown for apache images
strowk Nov 8, 2025
ea05212
chore: test for more versions
strowk Nov 8, 2025
b55f3fd
chore: rename tests
strowk Nov 8, 2025
5cbb28d
chore: graceful shutdown should not give error
strowk Nov 8, 2025
5946d39
chore: test already calls stop, drop extra timeout
strowk Nov 8, 2025
9c948d2
docs: give more guidance to pick kafka image
strowk Nov 8, 2025
9ae246b
docs: correct snippet ref
strowk Nov 9, 2025
12a1ae9
docs: wording adjustment
strowk Nov 9, 2025
99b4833
feat: allow to override starter script
strowk Nov 9, 2025
b51a341
chore: clean, doc and linter
strowk Nov 9, 2025
a2eb5e3
chore: fix linter issues
strowk Nov 9, 2025
b21194e
feat: add localhost listener to both flavors
strowk Nov 9, 2025
4b7ebda
provide With..Flavor options
strowk Nov 15, 2025
4d5a299
go doc for options
strowk Nov 15, 2025
8c1a7ab
update option go doc
strowk Nov 15, 2025
9adbde3
update doc about flavor option
strowk Nov 15, 2025
fcfee34
fix to not have side effect when returning error
strowk Nov 15, 2025
f227e38
update go doc for options
strowk Nov 15, 2025
164ff1b
document images difference
strowk Nov 15, 2025
31f4579
document options better
strowk Nov 15, 2025
f6f75b2
document localhost listener
strowk Nov 15, 2025
1e916ee
move localhost doc into separate section
strowk Nov 15, 2025
9a00515
add benchmarks and correct time docs
strowk Nov 15, 2025
ad56cd2
fix doc typo
strowk Nov 15, 2025
a3a5cb7
update images pick table
strowk Nov 15, 2025
fb876de
ideomatic benchmark error handling
strowk Nov 15, 2025
57b8a7c
fix lint issue in benchmark
strowk Nov 15, 2025
5fe7204
docs: add since version markers
strowk Nov 15, 2025
0e923e0
docs: clarify when apache images are available
strowk Nov 15, 2025
f83d07d
docs: separate image pick section from usage
strowk Nov 15, 2025
b65ba06
docs: simplify doc with reference to detailed option
strowk Nov 15, 2025
4f02648
docs: link starter script section
strowk Nov 15, 2025
8706053
docs: add since version marker to localhost listener
strowk Nov 15, 2025
5f5e8fb
docs: align section header
strowk Nov 19, 2025
b8de9b1
chore: simplify test assertion
strowk Nov 20, 2025
c6a2b1d
chore: simplify benchmark assertion
strowk Nov 20, 2025
81b1065
fix: return error if conflicting options are given
strowk Nov 20, 2025
34242df
chore: formatting from linter
strowk Nov 20, 2025
907ce01
docs: explain conflicting options
strowk Nov 20, 2025
4627d1b
chore: simplify test assertions
strowk Nov 20, 2025
1fc9b85
chore: correct typo in test
strowk Nov 20, 2025
68f2f3f
chore: add apache/kafka test cases
strowk Nov 20, 2025
3034eea
docs: better describe image detection
strowk Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Kafka (KRaft)
# Kafka

Since <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.24.0"><span class="tc-version">:material-tag: v0.24.0</span></a>

## Introduction

The Testcontainers module for KRaft: [Apache Kafka Without ZooKeeper](https://developer.confluent.io/learn/kraft).
The Testcontainers module for Kafka.

This module would run Kafka in Kraft mode: [Apache Kafka Without ZooKeeper](https://developer.confluent.io/learn/kraft/) and it supports both [Apache Kafka](https://kafka.apache.org/) and [Confluent](https://docs.confluent.io/kafka/overview.html) images.

## Adding this module to your project dependencies

Expand All @@ -17,9 +19,19 @@ go get github.com/testcontainers/testcontainers-go/modules/kafka
## Usage example

<!--codeinclude-->
[Creating a Kafka container](../../modules/kafka/examples_test.go) inside_block:runKafkaContainer
[Apache Native Kafka](../../modules/kafka/examples_test.go) inside_block:runKafkaContainerApacheNative
<!--/codeinclude-->

<!--codeinclude-->
[Apache Kafka](../../modules/kafka/examples_test.go) inside_block:runKafkaContainerApacheNotNative
<!--/codeinclude-->

<!--codeinclude-->
[Confluent Kafka](../../modules/kafka/examples_test.go) inside_block:runKafkaContainerConfluent
<!--/codeinclude-->

The native container ([apache/kafka-native](https://hub.docker.com/r/apache/kafka-native/)) is based on GraalVM and typically starts several seconds faster than alternatives.

## Module Reference

### Run function
Expand All @@ -42,12 +54,12 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
#### Image

Use the second argument in the `Run` function to set a valid Docker image.
In example: `Run(context.Background(), "confluentinc/confluent-local:7.5.0")`.
In example: `Run(context.Background(), "apache/kafka-native:4.0.1")`.

!!! warning
The minimal required version of Kafka for KRaft mode is `confluentinc/confluent-local:7.4.0`. If you are using an image that
is different from the official one, please make sure that it's compatible with KRaft mode, as the module won't check
the version for you.
Module expects that the image in use supports Kraft mode (Kafka without ZooKeeper).
The minimal required version of Confluent images for KRaft mode is `confluentinc/confluent-local:7.4.0`.
All Apache images support Kraft mode.

#### Environment variables

Expand All @@ -59,10 +71,19 @@ The environment variables that are already set by default are:

#### Init script

The Kafka container will be started using a custom shell script:
The Kafka container will be started using a custom shell script.

Module would vary the starter script depending on the image in use, using following logic:

- image starts with `apache/kafka`: use Apache Kafka starter script.
- image starts with `confluentinc/`: use Confluent starter script.

<!--codeinclude-->
[Apache Kafka starter script](../../modules/kafka/kafka.go) inside_block:starterScriptApache
<!--/codeinclude-->

<!--codeinclude-->
[Init script](../../modules/kafka/kafka.go) inside_block:starterScript
[Confluent starter script](../../modules/kafka/kafka.go) inside_block:starterScriptConfluentinc
<!--/codeinclude-->

### Container Options
Expand Down
70 changes: 68 additions & 2 deletions modules/kafka/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/testcontainers/testcontainers-go/modules/kafka"
)

func ExampleRun() {
// runKafkaContainer {
func ExampleRun_confluentinc() {
// runKafkaContainerConfluentinc {
ctx := context.Background()

kafkaContainer, err := kafka.Run(ctx,
Expand Down Expand Up @@ -41,3 +41,69 @@ func ExampleRun() {
// test-cluster
// true
}

func ExampleRun_apacheNative() {
// runKafkaContainerApacheNative {
ctx := context.Background()

kafkaContainer, err := kafka.Run(ctx,
"apache/kafka-native:4.0.1",
kafka.WithClusterID("test-cluster"),
)
defer func() {
if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
log.Printf("failed to terminate container: %s", err)
}
}()
if err != nil {
log.Printf("failed to start container: %s", err)
return
}
// }

state, err := kafkaContainer.State(ctx)
if err != nil {
log.Printf("failed to get container state: %s", err)
return
}

fmt.Println(kafkaContainer.ClusterID)
fmt.Println(state.Running)

// Output:
// test-cluster
// true
}

func ExampleRun_apacheNotNative() {
// runKafkaContainerApacheNotNative {
ctx := context.Background()

kafkaContainer, err := kafka.Run(ctx,
"apache/kafka:4.0.1",
kafka.WithClusterID("test-cluster"),
)
defer func() {
if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
log.Printf("failed to terminate container: %s", err)
}
}()
if err != nil {
log.Printf("failed to start container: %s", err)
return
}
// }

state, err := kafkaContainer.State(ctx)
if err != nil {
log.Printf("failed to get container state: %s", err)
return
}

fmt.Println(kafkaContainer.ClusterID)
fmt.Println(state.Running)

// Output:
// test-cluster
// true
}
19 changes: 13 additions & 6 deletions modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ const publicPort = nat.Port("9093/tcp")
const (
starterScript = "/usr/sbin/testcontainers_start.sh"

// starterScript {
starterScriptContent = `#!/bin/bash
// starterScriptConfluentinc {
confluentincStarterScriptContent = `#!/bin/bash
source /etc/confluent/docker/bash-config
export KAFKA_ADVERTISED_LISTENERS=%s,BROKER://%s:9092
echo Starting Kafka KRaft mode
Expand All @@ -30,6 +30,13 @@ echo '' > /etc/confluent/docker/ensure
/etc/confluent/docker/configure
/etc/confluent/docker/launch`
// }

// starterScriptApache {
apacheStarterScriptContent = `#!/bin/bash
export KAFKA_ADVERTISED_LISTENERS=%s,BROKER://%s:9092
echo Starting Apache Kafka
exec /etc/kafka/docker/run`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid hard-coded value like it is done in #3488?

Copy link
Author

@strowk strowk Nov 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a certain benefit in keeping hardcoded value tho, it is a bit clearer what is going on when looking at docs:
https://golang.testcontainers.org/modules/kafka/#init-script

Docs basically take out snippets of code to include inside built static website. The way my PR does this, it looks like this:

image image

Plus I kind of doubt that increasing complexity of this script would make it better. There is some chance that entrypoint would change in the future Apache Kafka versions, but is this probability plus losses from users facing related problems higher than efforts of testcontainers maintainers understanding what is going on in that script-building logic?

Copy link
Contributor

@mabrarov mabrarov Nov 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I can be too worried of (old) docker images being removed from Docker Hub (like the recent case with JDK images). It is the reason I tried to make #3488 a little bit more forward-compatible. Considering documentation code snippet, I suppose you (@strowk) is the best decision maker for this comment thread.

Thank you for clarification.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll keep this open until there is maintainer to weigh-in..

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stevenh , @mdelapenya , do you guys have any opinion on this? Should we try to derive the executable from inspecting the image or just keep them hardcoded until/if they change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mabrarov to confirm my understanding you're suggesting using the detail from inspect to identify the location of binary to exec, so that if that changed in the future it would remain compatible?

Copy link
Contributor

@mabrarov mabrarov Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @stevenh,

Yes, your understanding is correct (e.g. copyStarterScript in modules/kafka/kafka.go from #3488), but I am not sure if it matches the way Testcontainers for Go prefer (e.g. getDockerProvider from #3488 can be considered an issue making Testcontainers less decoupled with Docker).

Thank you.

Copy link
Author

@strowk strowk Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, so basically my feeling about this was that I am not sure if this technique would work reliably enough, i.e it won't break in some setups that would make resulting code less stable, not more, i.e trade off is extra complexity (and chance of bug) vs likeliness of ENTRYPOINT or CMD becoming different in one of flavors (or maybe users providing their own images without their own starter scripts.. in which case inspection might help too).

// }
)

// KafkaContainer represents the Kafka container type used in the module
Expand Down Expand Up @@ -78,7 +85,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
// if the starter script fails to copy.
func(ctx context.Context, c testcontainers.Container) error {
// 1. copy the starter script into the container
if err := copyStarterScript(ctx, c); err != nil {
if err := copyStarterScript(ctx, img, c); err != nil {
return fmt.Errorf("copy starter script: %w", err)
}

Expand Down Expand Up @@ -122,7 +129,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
}

// copyStarterScript copies the starter script into the container.
func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
func copyStarterScript(ctx context.Context, img string, c testcontainers.Container) error {
if err := wait.ForMappedPort(publicPort).
WaitUntilReady(ctx, c); err != nil {
return fmt.Errorf("wait for mapped port: %w", err)
Expand All @@ -140,7 +147,7 @@ func copyStarterScript(ctx context.Context, c testcontainers.Container) error {

hostname := inspect.Config.Hostname

scriptContent := fmt.Sprintf(starterScriptContent, endpoint, hostname)
scriptContent := fmt.Sprintf(getStarterScriptContent(img), endpoint, hostname)

if err := c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755); err != nil {
return fmt.Errorf("copy to container: %w", err)
Expand Down Expand Up @@ -200,7 +207,7 @@ func validateKRaftVersion(fqName string) error {
image := fqName[:strings.LastIndex(fqName, ":")]
version := fqName[strings.LastIndex(fqName, ":")+1:]

if !strings.EqualFold(image, "confluentinc/confluent-local") {
if !isConfluentinc(image) {
// do not validate if the image is not the official one.
// not raising an error here, letting the image start and
// eventually evaluate an error if it exists.
Expand Down
30 changes: 28 additions & 2 deletions modules/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
"github.com/testcontainers/testcontainers-go/modules/kafka"
)

func TestKafka(t *testing.T) {
func testFor(image string, t *testing.T) {
topic := "some-topic"

ctx := context.Background()

kafkaContainer, err := kafka.Run(ctx, "confluentinc/confluent-local:7.5.0", kafka.WithClusterID("kraftCluster"))
kafkaContainer, err := kafka.Run(ctx, image, kafka.WithClusterID("kraftCluster"))
testcontainers.CleanupContainer(t, kafkaContainer)
require.NoError(t, err)

Expand Down Expand Up @@ -66,6 +66,32 @@ func TestKafka(t *testing.T) {
require.Truef(t, strings.EqualFold(string(consumer.message.Value), "value"), "expected value to be %s, got %s", "value", string(consumer.message.Value))
}

func TestKafka(t *testing.T) {
testCases := []struct {
name string
image string
}{
{
name: "confluentinc",
image: "confluentinc/confluent-local:7.5.0",
},
{
name: "apache native",
image: "apache/kafka-native:4.0.1",
},
{
name: "apache not-native",
image: "apache/kafka:4.0.1",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
testFor(tc.image, t)
})
}
}

func TestKafka_invalidVersion(t *testing.T) {
ctx := context.Background()

Expand Down
29 changes: 29 additions & 0 deletions modules/kafka/version.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: align filename with behaviour of the methods, type, flavour or other depending on the outcome of the other comments.

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package kafka

import "strings"

const (
apacheKafkaImagePrefix = "apache/kafka"
confluentincImagePrefix = "confluentinc/"
dockerIoPrefix = "docker.io/"
)

func isApache(image string) bool {
return strings.HasPrefix(image, apacheKafkaImagePrefix) || strings.HasPrefix(image, dockerIoPrefix+apacheKafkaImagePrefix)
}

func isConfluentinc(image string) bool {
return strings.HasPrefix(image, confluentincImagePrefix) || strings.HasPrefix(image, dockerIoPrefix+confluentincImagePrefix)
}

func getStarterScriptContent(image string) string {
if isApache(image) {
return apacheStarterScriptContent
} else if isConfluentinc(image) {
return confluentincStarterScriptContent
} else {
// Default to confluentinc for backward compatibility
// in situations when image was custom specified based on confluentinc
return confluentincStarterScriptContent
}
}
Loading