Skip to content

Commit f9c2626

Browse files
committed
Added timeout to dequeue to properly raise queue.Empty instead of infinity hang
1 parent 6455069 commit f9c2626

File tree

1 file changed

+8
-6
lines changed

1 file changed

+8
-6
lines changed

reportportal_client/service_async.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,17 @@
2828
class QueueListener(object):
2929
_sentinel_item = None
3030

31-
def __init__(self, queue, *handlers):
31+
def __init__(self, queue, queue_get_timeout, *handlers):
3232
self.queue = queue
33+
self.queue_get_timeout = queue_get_timeout
3334
self.handlers = handlers
3435
self._stop_nowait = threading.Event()
3536
self._stop = threading.Event()
3637
self._thread = None
3738

38-
def dequeue(self, block=True):
39+
def dequeue(self, block=True, timeout=None):
3940
"""Dequeue a record and return item."""
40-
return self.queue.get(block)
41+
return self.queue.get(block, timeout)
4142

4243
def start(self):
4344
"""Start the listener.
@@ -82,7 +83,7 @@ def _monitor(self):
8283
has_task_done = hasattr(q, 'task_done')
8384
while not self._stop.isSet():
8485
try:
85-
record = self.dequeue(True)
86+
record = self.dequeue(True, self.queue_get_timeout)
8687
if record is self._sentinel_item:
8788
break
8889
self.handle(record)
@@ -133,7 +134,7 @@ class ReportPortalServiceAsync(object):
133134
def __init__(self, endpoint, project, token, api_base="api/v1",
134135
error_handler=None, log_batch_size=20,
135136
is_skipped_an_issue=True,
136-
verify_ssl=True):
137+
verify_ssl=True, queue_get_timeout=5):
137138
"""Init the service class.
138139
139140
Args:
@@ -160,7 +161,8 @@ def __init__(self, endpoint, project, token, api_base="api/v1",
160161
"start_test_item", "finish_test_item", "log"]
161162

162163
self.queue = queue.Queue()
163-
self.listener = QueueListener(self.queue, self.process_item)
164+
self.listener = QueueListener(self.queue, queue_get_timeout,
165+
self.process_item)
164166
self.listener.start()
165167
self.lock = threading.Lock()
166168

0 commit comments

Comments
 (0)