28
28
class QueueListener (object ):
29
29
_sentinel_item = None
30
30
31
- def __init__ (self , queue , * handlers ):
31
+ def __init__ (self , queue , * handlers , ** kwargs ):
32
32
self .queue = queue
33
+ self .queue_get_timeout = kwargs .get ("queue_get_timeout" , None )
33
34
self .handlers = handlers
34
35
self ._stop_nowait = threading .Event ()
35
36
self ._stop = threading .Event ()
36
37
self ._thread = None
37
38
38
39
def dequeue (self , block = True ):
39
40
"""Dequeue a record and return item."""
40
- return self .queue .get (block )
41
+ return self .queue .get (block , self . queue_get_timeout )
41
42
42
43
def start (self ):
43
44
"""Start the listener.
@@ -133,7 +134,7 @@ class ReportPortalServiceAsync(object):
133
134
def __init__ (self , endpoint , project , token , api_base = "api/v1" ,
134
135
error_handler = None , log_batch_size = 20 ,
135
136
is_skipped_an_issue = True ,
136
- verify_ssl = True ):
137
+ verify_ssl = True , queue_get_timeout = 5 ):
137
138
"""Init the service class.
138
139
139
140
Args:
@@ -160,7 +161,8 @@ def __init__(self, endpoint, project, token, api_base="api/v1",
160
161
"start_test_item" , "finish_test_item" , "log" ]
161
162
162
163
self .queue = queue .Queue ()
163
- self .listener = QueueListener (self .queue , self .process_item )
164
+ self .listener = QueueListener (self .queue , self .process_item ,
165
+ queue_get_timeout = queue_get_timeout )
164
166
self .listener .start ()
165
167
self .lock = threading .Lock ()
166
168
0 commit comments