4545except ImportError as e :
4646 with_progress = False
4747
48+ # Default test conf location
49+ testconf = "tests/testconf.json"
50+
4851# Kafka bootstrap server(s)
4952bootstrap_servers = None
5053
5154# Confluent schema-registry
5255schema_registry_url = None
5356
5457# Topic prefix to use
55- topic = 'test'
58+ topic = None
5659
5760# API version requests are only implemented in Kafka broker >=0.10
5861# but the client handles failed API version requests gracefully for older
@@ -374,6 +377,96 @@ def verify_avro():
374377 c .close ()
375378
376379
380+ def verify_avro_https ():
381+ from confluent_kafka import avro
382+ avsc_dir = os .path .join (os .path .dirname (__file__ ), os .pardir , 'tests' , 'avro' )
383+
384+ # Producer config
385+ conf = {'bootstrap.servers' : bootstrap_servers ,
386+ 'error_cb' : error_cb ,
387+ 'api.version.request' : api_version_request }
388+
389+ conf .update (testconf .get ('schema_registry_https' , {}))
390+
391+ p = avro .AvroProducer (conf )
392+
393+ prim_float = avro .load (os .path .join (avsc_dir , "primitive_float.avsc" ))
394+ prim_string = avro .load (os .path .join (avsc_dir , "primitive_string.avsc" ))
395+ basic = avro .load (os .path .join (avsc_dir , "basic_schema.avsc" ))
396+ str_value = 'abc'
397+ float_value = 32.0
398+
399+ combinations = [
400+ dict (key = float_value , key_schema = prim_float ),
401+ dict (value = float_value , value_schema = prim_float ),
402+ dict (key = {'name' : 'abc' }, key_schema = basic ),
403+ dict (value = {'name' : 'abc' }, value_schema = basic ),
404+ dict (value = {'name' : 'abc' }, value_schema = basic , key = float_value , key_schema = prim_float ),
405+ dict (value = {'name' : 'abc' }, value_schema = basic , key = str_value , key_schema = prim_string ),
406+ dict (value = float_value , value_schema = prim_float , key = {'name' : 'abc' }, key_schema = basic ),
407+ dict (value = float_value , value_schema = prim_float , key = str_value , key_schema = prim_string ),
408+ dict (value = str_value , value_schema = prim_string , key = {'name' : 'abc' }, key_schema = basic ),
409+ dict (value = str_value , value_schema = prim_string , key = float_value , key_schema = prim_float ),
410+ # Verify identity check allows Falsy object values(e.g., 0, empty string) to be handled properly (issue #342)
411+ dict (value = '' , value_schema = prim_string , key = 0.0 , key_schema = prim_float ),
412+ dict (value = 0.0 , value_schema = prim_float , key = '' , key_schema = prim_string ),
413+ ]
414+
415+ for i , combo in enumerate (combinations ):
416+ combo ['topic' ] = str (uuid .uuid4 ())
417+ combo ['headers' ] = [('index' , str (i ))]
418+ p .produce (** combo )
419+ p .flush ()
420+
421+ conf = {'bootstrap.servers' : bootstrap_servers ,
422+ 'group.id' : generate_group_id (),
423+ 'session.timeout.ms' : 6000 ,
424+ 'enable.auto.commit' : False ,
425+ 'api.version.request' : api_version_request ,
426+ 'on_commit' : print_commit_result ,
427+ 'error_cb' : error_cb ,
428+ 'default.topic.config' : {
429+ 'auto.offset.reset' : 'earliest'
430+ }}
431+
432+ conf .update (testconf .get ('schema_registry_https' , {}))
433+
434+ c = avro .AvroConsumer (conf )
435+ c .subscribe ([(t ['topic' ]) for t in combinations ])
436+
437+ msgcount = 0
438+ while msgcount < len (combinations ):
439+ msg = c .poll (0 )
440+
441+ if msg is None or msg .error ():
442+ continue
443+
444+ tstype , timestamp = msg .timestamp ()
445+ print ('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' %
446+ (msg .topic (), msg .partition (), msg .offset (),
447+ msg .key (), msg .value (), tstype , timestamp ))
448+
449+ # omit empty Avro fields from payload for comparison
450+ record_key = msg .key ()
451+ record_value = msg .value ()
452+ index = int (dict (msg .headers ())['index' ])
453+
454+ if isinstance (msg .key (), dict ):
455+ record_key = {k : v for k , v in msg .key ().items () if v is not None }
456+
457+ if isinstance (msg .value (), dict ):
458+ record_value = {k : v for k , v in msg .value ().items () if v is not None }
459+
460+ assert combinations [index ].get ('key' ) == record_key
461+ assert combinations [index ].get ('value' ) == record_value
462+
463+ c .commit ()
464+ msgcount += 1
465+
466+ # Close consumer
467+ c .close ()
468+
469+
377470def verify_producer_performance (with_dr_cb = True ):
378471 """ Time how long it takes to produce and delivery X messages """
379472 conf = {'bootstrap.servers' : bootstrap_servers ,
@@ -1125,7 +1218,7 @@ def verify_config(expconfig, configs):
11251218
11261219# Exclude throttle since from default list
11271220default_modes = ['consumer' , 'producer' , 'avro' , 'performance' , 'admin' ]
1128- all_modes = default_modes + ['throttle' , 'none' ]
1221+ all_modes = default_modes + ['throttle' , 'avro-https' , ' none' ]
11291222"""All test modes"""
11301223
11311224
@@ -1140,6 +1233,21 @@ def print_usage(exitcode, reason=None):
11401233 sys .exit (exitcode )
11411234
11421235
1236+ def generate_group_id ():
1237+ return str (uuid .uuid1 ())
1238+
1239+
1240+ def resolve_envs (_conf ):
1241+ """Resolve environment variables"""
1242+
1243+ for k , v in _conf .items ():
1244+ if isinstance (v , dict ):
1245+ resolve_envs (v )
1246+
1247+ if str (v ).startswith ('$' ):
1248+ _conf [k ] = os .getenv (v [1 :])
1249+
1250+
11431251if __name__ == '__main__' :
11441252 """Run test suites"""
11451253
@@ -1152,22 +1260,30 @@ def print_usage(exitcode, reason=None):
11521260 # Parse options
11531261 while len (sys .argv ) > 1 and sys .argv [1 ].startswith ('--' ):
11541262 opt = sys .argv .pop (1 )[2 :]
1263+
1264+ if opt == 'conf' :
1265+ testconf = sys .argv .pop (1 )
1266+ continue
1267+
11551268 if opt not in all_modes :
11561269 print_usage (1 , 'unknown option --' + opt )
11571270 modes .append (opt )
11581271
1159- if len (sys .argv ) > 1 :
1160- bootstrap_servers = sys .argv [1 ]
1161- if len (sys .argv ) > 2 :
1162- topic = sys .argv [2 ]
1163- if len (sys .argv ) > 3 :
1164- schema_registry_url = sys .argv [3 ]
1165- else :
1166- print_usage (1 )
1272+ with open (testconf ) as f :
1273+ testconf = json .load (f )
1274+ resolve_envs (testconf )
1275+
1276+ bootstrap_servers = testconf .get ('bootstrap.servers' , None )
1277+ topic = testconf .get ('topic' , None )
1278+ schema_registry_url = testconf .get ('schema.registry.url' , None )
11671279
11681280 if len (modes ) == 0 :
11691281 modes = default_modes
11701282
1283+ if bootstrap_servers is None or topic is None :
1284+ print_usage (1 , "Properties bootstrap.servers and topic must be set. "
1285+ "Use tests/testconf-example.json as a template when creating a new conf file." )
1286+
11711287 print ('Using confluent_kafka module version %s (0x%x)' % confluent_kafka .version ())
11721288 print ('Using librdkafka version %s (0x%x)' % confluent_kafka .libversion ())
11731289 print ('Testing: %s' % modes )
@@ -1213,6 +1329,10 @@ def print_usage(exitcode, reason=None):
12131329 print ('=' * 30 , 'Verifying AVRO' , '=' * 30 )
12141330 verify_avro ()
12151331
1332+ if 'avro-https' in modes :
1333+ print ('=' * 30 , 'Verifying AVRO with HTTPS' , '=' * 30 )
1334+ verify_avro_https ()
1335+
12161336 if 'admin' in modes :
12171337 print ('=' * 30 , 'Verifying Admin API' , '=' * 30 )
12181338 verify_admin ()
0 commit comments