@@ -46,10 +46,13 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
4646
4747// Run creates an instance of the Kafka container type
4848func Run (ctx context.Context , img string , opts ... testcontainers.ContainerCustomizer ) (* KafkaContainer , error ) {
49- req := testcontainers.ContainerRequest {
50- Image : img ,
51- ExposedPorts : []string {string (publicPort )},
52- Env : map [string ]string {
49+ if err := validateKRaftVersion (img ); err != nil {
50+ return nil , err
51+ }
52+
53+ moduleOpts := []testcontainers.ContainerCustomizer {
54+ testcontainers .WithExposedPorts (string (publicPort )),
55+ testcontainers .WithEnv (map [string ]string {
5356 // envVars {
5457 "KAFKA_LISTENERS" : "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094" ,
5558 "KAFKA_REST_BOOTSTRAP_SERVERS" : "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094" ,
@@ -66,56 +69,53 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
6669 "KAFKA_PROCESS_ROLES" : "broker,controller" ,
6770 "KAFKA_CONTROLLER_LISTENER_NAMES" : "CONTROLLER" ,
6871 // }
69- },
70- Entrypoint : [] string { "sh" } ,
72+ }) ,
73+ testcontainers . WithEntrypoint ( "sh" ) ,
7174 // this CMD will wait for the starter script to be copied into the container and then execute it
72- Cmd : []string {"-c" , "while [ ! -f " + starterScript + " ]; do sleep 0.1; done; bash " + starterScript },
73- LifecycleHooks : []testcontainers.ContainerLifecycleHooks {
74- {
75- PostStarts : []testcontainers.ContainerHook {
76- // Use a single hook to copy the starter script and wait for
77- // the Kafka server to be ready. This prevents the wait running
78- // if the starter script fails to copy.
79- func (ctx context.Context , c testcontainers.Container ) error {
80- // 1. copy the starter script into the container
81- if err := copyStarterScript (ctx , c ); err != nil {
82- return fmt .Errorf ("copy starter script: %w" , err )
83- }
84-
85- // 2. wait for the Kafka server to be ready
86- return wait .ForLog (".*Transitioning from RECOVERY to RUNNING.*" ).AsRegexp ().WaitUntilReady (ctx , c )
87- },
75+ testcontainers .WithCmd ("-c" , "while [ ! -f " + starterScript + " ]; do sleep 0.1; done; bash " + starterScript ),
76+ testcontainers .WithLifecycleHooks (testcontainers.ContainerLifecycleHooks {
77+ PostStarts : []testcontainers.ContainerHook {
78+ // Use a single hook to copy the starter script and wait for
79+ // the Kafka server to be ready. This prevents the wait running
80+ // if the starter script fails to copy.
81+ func (ctx context.Context , c testcontainers.Container ) error {
82+ // 1. copy the starter script into the container
83+ if err := copyStarterScript (ctx , c ); err != nil {
84+ return fmt .Errorf ("copy starter script: %w" , err )
85+ }
86+
87+ // 2. wait for the Kafka server to be ready
88+ return wait .ForLog (".*Transitioning from RECOVERY to RUNNING.*" ).AsRegexp ().WaitUntilReady (ctx , c )
8889 },
8990 },
90- },
91+ }) ,
9192 }
9293
93- genericContainerReq := testcontainers.GenericContainerRequest {
94- ContainerRequest : req ,
95- Started : true ,
96- }
94+ moduleOpts = append (moduleOpts , opts ... )
9795
98- for _ , opt := range opts {
99- if err := opt .Customize (& genericContainerReq ); err != nil {
100- return nil , err
101- }
102- }
96+ // configure the controller quorum voters after all the options have been applied
97+ moduleOpts = append (moduleOpts , configureControllerQuorumVoters ())
10398
104- err := validateKRaftVersion (genericContainerReq .Image )
99+ var c * KafkaContainer
100+ ctr , err := testcontainers .Run (ctx , img , moduleOpts ... )
101+ if ctr != nil {
102+ c = & KafkaContainer {Container : ctr }
103+ }
105104 if err != nil {
106- return nil , err
105+ return c , fmt . Errorf ( "run kafka: %w" , err )
107106 }
108107
109- configureControllerQuorumVoters (& genericContainerReq )
110-
111- container , err := testcontainers .GenericContainer (ctx , genericContainerReq )
112- var c * KafkaContainer
113- if container != nil {
114- c = & KafkaContainer {Container : container , ClusterID : genericContainerReq .Env ["CLUSTER_ID" ]}
108+ // Inspect the container to get the CLUSTER_ID environment variable
109+ inspect , err := ctr .Inspect (ctx )
110+ if err != nil {
111+ return c , fmt .Errorf ("inspect kafka: %w" , err )
115112 }
116113
117- if err != nil {
118- return c , fmt .Errorf ("generic container: %w" , err )
114+ for _ , env := range inspect .Config .Env {
115+ if v , ok := strings .CutPrefix (env , "CLUSTER_ID=" ); ok {
116+ c .ClusterID = v
117+ break
118+ }
119119 }
120120
121121 return c , nil
@@ -150,11 +150,9 @@ func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
150150}
151151
152152func WithClusterID (clusterID string ) testcontainers.CustomizeRequestOption {
153- return func (req * testcontainers.GenericContainerRequest ) error {
154- req .Env ["CLUSTER_ID" ] = clusterID
155-
156- return nil
157- }
153+ return testcontainers .WithEnv (map [string ]string {
154+ "CLUSTER_ID" : clusterID ,
155+ })
158156}
159157
160158// Brokers retrieves the broker connection strings from Kafka with only one entry,
@@ -168,24 +166,28 @@ func (kc *KafkaContainer) Brokers(ctx context.Context) ([]string, error) {
168166 return []string {endpoint }, nil
169167}
170168
171- // configureControllerQuorumVoters sets the quorum voters for the controller. For that, it will
172- // check if there are any network aliases defined for the container and use the first alias in the
173- // first network. Else, it will use localhost.
174- func configureControllerQuorumVoters (req * testcontainers.GenericContainerRequest ) {
175- if req .Env == nil {
176- req .Env = map [string ]string {}
177- }
169+ // configureControllerQuorumVoters returns an option that sets the quorum voters for the controller.
170+ // For that, it will check if there are any network aliases defined for the container and use the
171+ // first alias in the first network. Else, it will use localhost.
172+ func configureControllerQuorumVoters () testcontainers.CustomizeRequestOption {
173+ return func (req * testcontainers.GenericContainerRequest ) error {
174+ if req .Env == nil {
175+ req .Env = map [string ]string {}
176+ }
178177
179- if req .Env ["KAFKA_CONTROLLER_QUORUM_VOTERS" ] == "" {
180- host := "localhost"
181- if len (req .Networks ) > 0 {
182- nw := req .Networks [0 ]
183- if len (req .NetworkAliases [nw ]) > 0 {
184- host = req .NetworkAliases [nw ][0 ]
178+ if req .Env ["KAFKA_CONTROLLER_QUORUM_VOTERS" ] == "" {
179+ host := "localhost"
180+ if len (req .Networks ) > 0 {
181+ nw := req .Networks [0 ]
182+ if len (req .NetworkAliases [nw ]) > 0 {
183+ host = req .NetworkAliases [nw ][0 ]
184+ }
185185 }
186+
187+ req .Env ["KAFKA_CONTROLLER_QUORUM_VOTERS" ] = "1@" + host + ":9094"
186188 }
187189
188- req . Env [ "KAFKA_CONTROLLER_QUORUM_VOTERS" ] = fmt . Sprintf ( "1@%s:9094" , host )
190+ return nil
189191 }
190192 // }
191193}
@@ -197,8 +199,13 @@ func validateKRaftVersion(fqName string) error {
197199 return errors .New ("image cannot be empty" )
198200 }
199201
200- image := fqName [:strings .LastIndex (fqName , ":" )]
201- version := fqName [strings .LastIndex (fqName , ":" )+ 1 :]
202+ idx := strings .LastIndex (fqName , ":" )
203+ if idx == - 1 || idx == len (fqName )- 1 {
204+ return nil
205+ }
206+
207+ image := fqName [:idx ]
208+ version := fqName [idx + 1 :]
202209
203210 if ! strings .EqualFold (image , "confluentinc/confluent-local" ) {
204211 // do not validate if the image is not the official one.
0 commit comments