|
28 | 28 | class QueueListener(object):
|
29 | 29 | _sentinel_item = None
|
30 | 30 |
|
31 |
| - def __init__(self, queue, queue_get_timeout, *handlers): |
| 31 | + def __init__(self, queue, *handlers, **kwargs): |
32 | 32 | self.queue = queue
|
33 |
| - self.queue_get_timeout = queue_get_timeout |
| 33 | + self.queue_get_timeout = kwargs.get("queue_get_timeout", None) |
34 | 34 | self.handlers = handlers
|
35 | 35 | self._stop_nowait = threading.Event()
|
36 | 36 | self._stop = threading.Event()
|
37 | 37 | self._thread = None
|
38 | 38 |
|
39 |
| - def dequeue(self, block=True, timeout=None): |
| 39 | + def dequeue(self, block=True): |
40 | 40 | """Dequeue a record and return item."""
|
41 |
| - return self.queue.get(block, timeout) |
| 41 | + return self.queue.get(block, self.queue_get_timeout) |
42 | 42 |
|
43 | 43 | def start(self):
|
44 | 44 | """Start the listener.
|
@@ -83,7 +83,7 @@ def _monitor(self):
|
83 | 83 | has_task_done = hasattr(q, 'task_done')
|
84 | 84 | while not self._stop.isSet():
|
85 | 85 | try:
|
86 |
| - record = self.dequeue(True, self.queue_get_timeout) |
| 86 | + record = self.dequeue(True) |
87 | 87 | if record is self._sentinel_item:
|
88 | 88 | break
|
89 | 89 | self.handle(record)
|
@@ -161,8 +161,8 @@ def __init__(self, endpoint, project, token, api_base="api/v1",
|
161 | 161 | "start_test_item", "finish_test_item", "log"]
|
162 | 162 |
|
163 | 163 | self.queue = queue.Queue()
|
164 |
| - self.listener = QueueListener(self.queue, queue_get_timeout, |
165 |
| - self.process_item) |
| 164 | + self.listener = QueueListener(self.queue, self.process_item, |
| 165 | + queue_get_timeout=queue_get_timeout) |
166 | 166 | self.listener.start()
|
167 | 167 | self.lock = threading.Lock()
|
168 | 168 |
|
|
0 commit comments