33import gc
44import docker as libdocker
55import logging
6+ import os
67import pytest
78import socket
89import uuid
910import sys
1011import pathlib
1112import shutil
12- import subprocess
1313
1414from aiokafka .record .legacy_records import (
1515 LegacyRecordBatchBuilder , _LegacyRecordBatchBuilderPy )
@@ -94,30 +94,66 @@ def kerberos_utils():
9494 return
9595
9696
97+ if sys .platform != 'win32' :
98+
99+ @pytest .fixture (scope = 'session' )
100+ def kafka_image (request , docker ):
101+ image = request .config .getoption ('--docker-image' )
102+ if not image :
103+ pytest .skip (
104+ "Skipping functional test as `--docker-image` not provided" )
105+ return
106+ if not request .config .getoption ('--no-pull' ):
107+ docker .images .pull (image )
108+ return image
109+
110+ else :
111+
112+ @pytest .fixture (scope = 'session' )
113+ def kafka_image ():
114+ pytest .skip ("Only unit tests on windows for now =(" )
115+
116+
97117@pytest .fixture (scope = 'session' )
98- def ssl_folder (docker_ip_address ):
118+ def ssl_folder (docker_ip_address , docker , kafka_image ):
99119 ssl_dir = pathlib .Path ('tests/ssl_cert' )
100120 if ssl_dir .exists ():
101121 shutil .rmtree (str (ssl_dir ))
102122
103123 ssl_dir .mkdir ()
104- p = subprocess .Popen (
105- f"bash ../../gen-ssl-certs.sh ca ca-cert { docker_ip_address } " ,
106- shell = True , stdout = subprocess .DEVNULL ,
107- cwd = str (ssl_dir ), stderr = subprocess .DEVNULL )
108- p .wait ()
109- p = subprocess .Popen (
110- "bash ../../gen-ssl-certs.sh -k server ca-cert br_ {}" .format (
111- docker_ip_address ),
112- shell = True , stdout = subprocess .DEVNULL ,
113- cwd = str (ssl_dir ), stderr = subprocess .DEVNULL ,)
114- p .wait ()
115- p = subprocess .Popen (
116- "bash ../../gen-ssl-certs.sh client ca-cert cl_ {}" .format (
117- docker_ip_address ),
118- shell = True , stdout = subprocess .DEVNULL ,
119- cwd = str (ssl_dir ), stderr = subprocess .DEVNULL ,)
120- p .wait ()
124+
125+ container = docker .containers .run (
126+ image = kafka_image ,
127+ command = "sleep 120" ,
128+ volumes = {
129+ pathlib .Path ("gen-ssl-certs.sh" ).resolve (): {
130+ "bind" : "/gen-ssl-certs.sh" ,
131+ },
132+ str (ssl_dir .resolve ()): {
133+ "bind" : "/ssl_cert" ,
134+ },
135+ },
136+ working_dir = "/ssl_cert" ,
137+ tty = True ,
138+ detach = True ,
139+ remove = True )
140+
141+ try :
142+ for args in [
143+ ["ca" , "ca-cert" , docker_ip_address ],
144+ ["-k" , "server" , "ca-cert" , "br_" , docker_ip_address ],
145+ ["client" , "ca-cert" , "cl_" , docker_ip_address ],
146+ ]:
147+ exit_code , output = container .exec_run (
148+ ["bash" , "/gen-ssl-certs.sh" ] + args ,
149+ user = f"{ os .getuid ()} :{ os .getgid ()} "
150+ )
151+ if exit_code != 0 :
152+ print (output .decode (), file = sys .stderr )
153+ pytest .exit ("Could not generate certificates" )
154+
155+ finally :
156+ container .stop ()
121157
122158 return ssl_dir
123159
@@ -159,15 +195,8 @@ def hosts(self):
159195if sys .platform != 'win32' :
160196
161197 @pytest .fixture (scope = 'session' )
162- def kafka_server (request , docker , docker_ip_address ,
198+ def kafka_server (kafka_image , docker , docker_ip_address ,
163199 unused_port , session_id , ssl_folder ):
164- image = request .config .getoption ('--docker-image' )
165- if not image :
166- pytest .skip (
167- "Skipping functional test as `--docker-image` not provided" )
168- return
169- if not request .config .getoption ('--no-pull' ):
170- docker .images .pull (image )
171200 kafka_host = docker_ip_address
172201 kafka_port = unused_port ()
173202 kafka_ssl_port = unused_port ()
@@ -181,7 +210,7 @@ def kafka_server(request, docker, docker_ip_address,
181210 'ADVERTISED_SASL_SSL_PORT' : kafka_sasl_ssl_port ,
182211 'NUM_PARTITIONS' : 2
183212 }
184- kafka_version = image .split (":" )[- 1 ].split ("_" )[- 1 ]
213+ kafka_version = kafka_image .split (":" )[- 1 ].split ("_" )[- 1 ]
185214 kafka_version = tuple (int (x ) for x in kafka_version .split ('.' ))
186215 if kafka_version >= (0 , 10 , 2 ):
187216 environment ['SASL_MECHANISMS' ] = (
@@ -196,7 +225,7 @@ def kafka_server(request, docker, docker_ip_address,
196225 environment ['SASL_JAAS_FILE' ] = "kafka_server_gssapi_jaas.conf"
197226
198227 container = docker .containers .run (
199- image = image ,
228+ image = kafka_image ,
200229 name = 'aiokafka-tests' ,
201230 ports = {
202231 2181 : 2181 ,
@@ -220,11 +249,11 @@ def kafka_server(request, docker, docker_ip_address,
220249 try :
221250 if not wait_kafka (kafka_host , kafka_port ):
222251 exit_code , output = container .exec_run (
223- ["supervisorctl" , "tail" , "-10000 " , "kafka" ])
252+ ["supervisorctl" , "tail" , "-20000 " , "kafka" ])
224253 print ("Kafka failed to start. \n --- STDOUT:" )
225254 print (output .decode (), file = sys .stdout )
226255 exit_code , output = container .exec_run (
227- ["supervisorctl" , "tail" , "-10000 " , "kafka" , "stderr" ])
256+ ["supervisorctl" , "tail" , "-20000 " , "kafka" , "stderr" ])
228257 print ("--- STDERR:" )
229258 print (output .decode (), file = sys .stderr )
230259 pytest .exit ("Could not start Kafka Server" )
0 commit comments