Skip to content

Commit ee5c1aa

Browse files
committed
feat(kafka): support of Apache Kafka Docker Hub images (apache/kafka and apache/kafka-native).
1 parent ec51c67 commit ee5c1aa

File tree

3 files changed

+276
-76
lines changed

3 files changed

+276
-76
lines changed

modules/kafka/examples_test.go

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ import (
99
"github.com/testcontainers/testcontainers-go/modules/kafka"
1010
)
1111

12-
func ExampleRun() {
13-
// runKafkaContainer {
12+
func ExampleRun_confluentLocal() {
1413
ctx := context.Background()
1514

1615
kafkaContainer, err := kafka.Run(ctx,
@@ -26,7 +25,68 @@ func ExampleRun() {
2625
log.Printf("failed to start container: %s", err)
2726
return
2827
}
29-
// }
28+
29+
state, err := kafkaContainer.State(ctx)
30+
if err != nil {
31+
log.Printf("failed to get container state: %s", err)
32+
return
33+
}
34+
35+
fmt.Println(kafkaContainer.ClusterID)
36+
fmt.Println(state.Running)
37+
38+
// Output:
39+
// test-cluster
40+
// true
41+
}
42+
43+
func ExampleRun_apacheKafka() {
44+
ctx := context.Background()
45+
46+
kafkaContainer, err := kafka.Run(ctx,
47+
"apache/kafka:4.0.1",
48+
kafka.WithClusterID("test-cluster"),
49+
)
50+
defer func() {
51+
if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
52+
log.Printf("failed to terminate container: %s", err)
53+
}
54+
}()
55+
if err != nil {
56+
log.Printf("failed to start container: %s", err)
57+
return
58+
}
59+
60+
state, err := kafkaContainer.State(ctx)
61+
if err != nil {
62+
log.Printf("failed to get container state: %s", err)
63+
return
64+
}
65+
66+
fmt.Println(kafkaContainer.ClusterID)
67+
fmt.Println(state.Running)
68+
69+
// Output:
70+
// test-cluster
71+
// true
72+
}
73+
74+
func ExampleRun_apacheKafkaNative() {
75+
ctx := context.Background()
76+
77+
kafkaContainer, err := kafka.Run(ctx,
78+
"apache/kafka-native:4.0.1",
79+
kafka.WithClusterID("test-cluster"),
80+
)
81+
defer func() {
82+
if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
83+
log.Printf("failed to terminate container: %s", err)
84+
}
85+
}()
86+
if err != nil {
87+
log.Printf("failed to start container: %s", err)
88+
return
89+
}
3090

3191
state, err := kafkaContainer.State(ctx)
3292
if err != nil {

modules/kafka/kafka.go

Lines changed: 94 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,28 @@ import (
1212
"golang.org/x/mod/semver"
1313

1414
"github.com/testcontainers/testcontainers-go"
15+
"github.com/testcontainers/testcontainers-go/log"
1516
"github.com/testcontainers/testcontainers-go/wait"
1617
)
1718

18-
const publicPort = nat.Port("9093/tcp")
1919
const (
20-
starterScript = "/usr/sbin/testcontainers_start.sh"
21-
22-
// starterScript {
23-
starterScriptContent = `#!/bin/bash
24-
source /etc/confluent/docker/bash-config
25-
export KAFKA_ADVERTISED_LISTENERS=%s,BROKER://%s:9092
26-
echo Starting Kafka KRaft mode
27-
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
28-
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
29-
echo '' > /etc/confluent/docker/ensure
30-
/etc/confluent/docker/configure
31-
/etc/confluent/docker/launch`
32-
// }
20+
controllerListenerLocalPort = 9094
21+
publicListenerLocalPort = 9093
22+
hostnameListenerLocalPort = 9092
23+
localhostListenerLocalPort = 9095
24+
starterScript = "/usr/sbin/testcontainers_start.sh"
25+
starterScriptContent = `#!/bin/bash
26+
export KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://%[2]s:%[3]d,BROKER://%[4]s:%[5]d,LOCALHOST://localhost:%[6]d'
27+
# For confluentinc/confluent-local image only
28+
if [ -d /etc/confluent/docker ]; then
29+
export KAFKA_REST_BOOTSTRAP_SERVERS="${KAFKA_LISTENERS}"
30+
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
31+
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
32+
echo '' > /etc/confluent/docker/ensure
33+
fi
34+
# Run original container entrypoint and command
35+
exec %[1]s
36+
`
3337
)
3438

3539
// KafkaContainer represents the Kafka container type used in the module
@@ -46,17 +50,30 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
4650

4751
// Run creates an instance of the Kafka container type
4852
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error) {
53+
publicPort, err := nat.NewPort("tcp", strconv.Itoa(publicListenerLocalPort))
54+
if err != nil {
55+
return nil, fmt.Errorf("nat.NewPort: %w", err)
56+
}
57+
58+
dockerProvider, err := getDockerProvider(opts...)
59+
if err != nil {
60+
return nil, fmt.Errorf("getDockerProvider: %w", err)
61+
}
62+
4963
if err := validateKRaftVersion(img); err != nil {
5064
return nil, err
5165
}
5266

67+
kafkaListeners := fmt.Sprintf("PLAINTEXT://:%d,BROKER://:%d,CONTROLLER://:%d,LOCALHOST://localhost:%d",
68+
publicListenerLocalPort, hostnameListenerLocalPort, controllerListenerLocalPort, localhostListenerLocalPort)
69+
5370
moduleOpts := []testcontainers.ContainerCustomizer{
5471
testcontainers.WithExposedPorts(string(publicPort)),
5572
testcontainers.WithEnv(map[string]string{
5673
// envVars {
57-
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
58-
"KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
59-
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT",
74+
"KAFKA_LISTENERS": kafkaListeners,
75+
"KAFKA_REST_BOOTSTRAP_SERVERS": kafkaListeners,
76+
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,LOCALHOST:PLAINTEXT",
6077
"KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER",
6178
"KAFKA_BROKER_ID": "1",
6279
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
@@ -72,15 +89,15 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
7289
}),
7390
testcontainers.WithEntrypoint("sh"),
7491
// this CMD will wait for the starter script to be copied into the container and then execute it
75-
testcontainers.WithCmd("-c", "while [ ! -f "+starterScript+" ]; do sleep 0.1; done; bash "+starterScript),
92+
testcontainers.WithCmd("-c", fmt.Sprintf("while [ ! -f %[1]q ]; do sleep 0.1; done; exec %[1]q", starterScript)),
7693
testcontainers.WithLifecycleHooks(testcontainers.ContainerLifecycleHooks{
7794
PostStarts: []testcontainers.ContainerHook{
7895
// Use a single hook to copy the starter script and wait for
7996
// the Kafka server to be ready. This prevents the wait running
8097
// if the starter script fails to copy.
8198
func(ctx context.Context, c testcontainers.Container) error {
8299
// 1. copy the starter script into the container
83-
if err := copyStarterScript(ctx, c); err != nil {
100+
if err := copyStarterScript(ctx, dockerProvider, c, publicPort); err != nil {
84101
return fmt.Errorf("copy starter script: %w", err)
85102
}
86103

@@ -122,15 +139,20 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
122139
}
123140

124141
// copyStarterScript copies the starter script into the container.
125-
func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
142+
func copyStarterScript(ctx context.Context, dockerProvider *testcontainers.DockerProvider, c testcontainers.Container, publicPort nat.Port) error {
126143
if err := wait.ForMappedPort(publicPort).
127144
WaitUntilReady(ctx, c); err != nil {
128-
return fmt.Errorf("wait for mapped port: %w", err)
145+
return fmt.Errorf("wait for local port %s to be mapped: %w", publicPort, err)
129146
}
130147

131-
endpoint, err := c.PortEndpoint(ctx, publicPort, "PLAINTEXT")
148+
publicListenerMappedPort, err := c.MappedPort(ctx, publicPort)
132149
if err != nil {
133-
return fmt.Errorf("port endpoint: %w", err)
150+
return fmt.Errorf("get mapped port for local port %s: %w", publicPort, err)
151+
}
152+
153+
host, err := c.Host(ctx)
154+
if err != nil {
155+
return fmt.Errorf("host: %w", err)
134156
}
135157

136158
inspect, err := c.Inspect(ctx)
@@ -140,7 +162,24 @@ func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
140162

141163
hostname := inspect.Config.Hostname
142164

143-
scriptContent := fmt.Sprintf(starterScriptContent, endpoint, hostname)
165+
imageInspect, err := dockerProvider.Client().ImageInspect(ctx, inspect.Image)
166+
if err != nil {
167+
return fmt.Errorf("image inspect: %w", err)
168+
}
169+
containerCmdParts := append(imageInspect.Config.Entrypoint, imageInspect.Config.Cmd...) //nolint:gocritic // New variable is needed.
170+
for i, s := range containerCmdParts {
171+
containerCmdParts[i] = strconv.Quote(s)
172+
}
173+
containerCmd := strings.Join(containerCmdParts, " ")
174+
175+
scriptContent := fmt.Sprintf(starterScriptContent,
176+
containerCmd,
177+
host,
178+
publicListenerMappedPort.Int(),
179+
hostname,
180+
hostnameListenerLocalPort,
181+
localhostListenerLocalPort,
182+
)
144183

145184
if err := c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755); err != nil {
146185
return fmt.Errorf("copy to container: %w", err)
@@ -158,6 +197,11 @@ func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption {
158197
// Brokers retrieves the broker connection strings from Kafka with only one entry,
159198
// defined by the exposed public port.
160199
func (kc *KafkaContainer) Brokers(ctx context.Context) ([]string, error) {
200+
publicPort, err := nat.NewPort("tcp", strconv.Itoa(publicListenerLocalPort))
201+
if err != nil {
202+
return nil, fmt.Errorf("nat.NewPort: %w", err)
203+
}
204+
161205
endpoint, err := kc.PortEndpoint(ctx, publicPort, "")
162206
if err != nil {
163207
return nil, err
@@ -184,14 +228,39 @@ func configureControllerQuorumVoters() testcontainers.CustomizeRequestOption {
184228
}
185229
}
186230

187-
req.Env["KAFKA_CONTROLLER_QUORUM_VOTERS"] = "1@" + host + ":9094"
231+
req.Env["KAFKA_CONTROLLER_QUORUM_VOTERS"] = fmt.Sprintf("1@%s:%d", host, controllerListenerLocalPort)
188232
}
189233

190234
return nil
191235
}
192236
// }
193237
}
194238

239+
func getDockerProvider(opts ...testcontainers.ContainerCustomizer) (*testcontainers.DockerProvider, error) {
240+
// Use a dummy request to get the provider from options.
241+
var req testcontainers.GenericContainerRequest
242+
for _, opt := range opts {
243+
if err := opt.Customize(&req); err != nil {
244+
return nil, err
245+
}
246+
}
247+
248+
logging := req.Logger
249+
if logging == nil {
250+
logging = log.Default()
251+
}
252+
genericProvider, err := req.ProviderType.GetProvider(testcontainers.WithLogger(logging))
253+
if err != nil {
254+
return nil, fmt.Errorf("get provider: %w", err)
255+
}
256+
257+
if dockerProvider, ok := genericProvider.(*testcontainers.DockerProvider); ok {
258+
return dockerProvider, nil
259+
}
260+
261+
return nil, fmt.Errorf("unknown provider type: %T", genericProvider)
262+
}
263+
195264
// validateKRaftVersion validates if the image version is compatible with KRaft mode,
196265
// which is available since version 7.0.0.
197266
func validateKRaftVersion(fqName string) error {

0 commit comments

Comments
 (0)