11#!/usr/bin/env python
22
33from confluent_kafka import Producer , KafkaError , KafkaException
4- import threading , time , queue
4+ import threading
5+ import time
6+ try :
7+ from queue import Queue , Empty
8+ except :
9+ from Queue import Queue , Empty
510
611
712class IntendedException (Exception ):
813 pass
914
10- def thread_run (myid ,p ,q ):
11- def do_crash (err , msg ):
15+
16+ def thread_run (myid , p , q ):
17+ def do_crash (err , msg ):
1218 raise IntendedException ()
1319
1420 for i in range (1 , 3 ):
@@ -23,45 +29,41 @@ def do_crash (err, msg):
2329 except IntendedException :
2430 print (myid , "Intentional callback crash: ok" )
2531 continue
26- except :
27- raise
2832
2933 print (myid , 'Done' )
3034 q .put (myid )
3135
32-
3336
3437def test_thread_safety ():
3538 """ Basic thread safety tests. """
3639
37- q = queue . Queue ()
38- p = Producer ({'socket.timeout.ms' :10 ,
40+ q = Queue ()
41+ p = Producer ({'socket.timeout.ms' : 10 ,
3942 'socket.blocking.max.ms' : 10 ,
4043 'default.topic.config' : {'message.timeout.ms' : 10 }})
4144
4245 threads = list ()
4346 for i in range (1 , 5 ):
44- thr = threading .Thread (target = thread_run , name = str (i ), args = [i ,p , q ])
47+ thr = threading .Thread (target = thread_run , name = str (i ), args = [i , p , q ])
4548 thr .start ()
4649 threads .append (thr )
4750
4851 for thr in threads :
4952 thr .join ()
5053
51-
5254 # Count the number of threads that exited cleanly
5355 cnt = 0
5456 try :
5557 for x in iter (q .get_nowait , None ):
5658 cnt += 1
57- except queue . Empty :
59+ except Empty :
5860 pass
5961
6062 if cnt != len (threads ):
6163 raise Exception ('Only %d/%d threads succeeded' % (cnt , len (threads )))
6264
6365 print ('Done' )
6466
65-
66- if __name__ == '__main__' :
67+
68+ if __name__ == '__main__' :
6769 test_thread_safety ()
0 commit comments