Skip to content

Commit 347606b

Browse files
authored
API worker for non-blocking HTTP requests
1 parent 6c3842f commit 347606b

File tree

4 files changed

+205
-4
lines changed

4 files changed

+205
-4
lines changed

reportportal_client/core/rp_requests.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ def __init__(self):
6767
self._priority = DEFAULT_PRIORITY
6868
self._response = None
6969

70-
def __ge__(self, other):
71-
"""Use for comparison when put in to PriorityQueue."""
72-
return self.priority >= other.priority
70+
def __lt__(self, other):
71+
"""Priority protocol for the PriorityQueue."""
72+
return self.priority < other.priority
7373

7474
@property
7575
def http_request(self):

reportportal_client/core/rp_requests.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class RPRequestBase(metaclass=AbstractBaseClass):
2424
_priority: Priority = ...
2525
_response: Optional[RPResponse] = ...
2626
def __init__(self) -> None: ...
27-
def __ge__(self, other: RPRequestBase) -> bool: ...
27+
def __lt__(self, other: RPRequestBase) -> bool: ...
2828
@property
2929
def http_request(self) -> HttpRequest: ...
3030
@http_request.setter

reportportal_client/core/worker.py

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
"""This module contains worker that makes non-blocking HTTP requests.
2+
3+
Copyright (c) 2018 http://reportportal.io .
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
"""
17+
18+
from enum import auto, Enum, unique
19+
import logging
20+
from threading import currentThread, Thread
21+
from queue import Empty
22+
23+
logger = logging.getLogger(__name__)
24+
logger.addHandler(logging.NullHandler())
25+
26+
27+
@unique
28+
class ControlCommand(Enum):
29+
"""This class stores worker control commands."""
30+
31+
CLEAR_QUEUE = auto()
32+
NOP = auto()
33+
REPORT_STATUS = auto()
34+
STOP = auto()
35+
STOP_IMMEDIATE = auto()
36+
37+
def is_stop_cmd(self):
38+
"""Verify if the command is the stop one."""
39+
return self in (ControlCommand.STOP, ControlCommand.STOP_IMMEDIATE)
40+
41+
42+
class APIWorker(object):
43+
"""Worker that makes non-blocking HTTP requests to the Report Portal."""
44+
45+
def __init__(self, cmd_queue, data_queue):
46+
"""Initialize instance attributes.
47+
48+
:param cmd_queue: Queue for the control commands
49+
:param data_queue: Queue for the RP requests to process
50+
"""
51+
self._cmd_queue = cmd_queue
52+
self._data_queue = data_queue
53+
self._thread = None
54+
self.name = self.__class__.__name__
55+
56+
def _command_get(self):
57+
"""Get control command from the control queue."""
58+
try:
59+
cmd = self._cmd_queue.get_nowait()
60+
logger.debug('[%s] Received {%s} command', self.name, cmd)
61+
return cmd
62+
except Empty:
63+
return None
64+
65+
def _command_process(self, cmd):
66+
"""Process control command sent to the worker.
67+
68+
:param cmd: ControlCommand to be processed
69+
"""
70+
if not cmd:
71+
return # No command received
72+
73+
logger.debug('[%s] Processing {%s} command', self.name, cmd)
74+
if cmd == ControlCommand.REPORT_STATUS:
75+
logger.debug('[%s] Current status for tasks is: {%s} unfinished',
76+
self.name, self._data_queue.unfinished_tasks)
77+
78+
if cmd == ControlCommand.STOP:
79+
request = self._request_get()
80+
while request is not None:
81+
self._request_process(request)
82+
request = self._request_get()
83+
84+
if cmd.is_stop_cmd():
85+
self._stop()
86+
87+
def _monitor(self):
88+
"""Monitor worker queues and process them.
89+
90+
This method runs on a separate, internal thread. The thread will
91+
terminate if the stop_immediate control command is received. If
92+
the stop control command is sent, the worker will process all the
93+
items from the queue before terminate.
94+
"""
95+
while True:
96+
cmd = self._command_get()
97+
self._command_process(cmd)
98+
99+
if cmd and cmd.is_stop_cmd():
100+
logger.debug('[%s] Exiting due to {%s} command',
101+
self.name, cmd)
102+
break
103+
104+
request = self._request_get()
105+
self._request_process(request)
106+
107+
def _request_get(self):
108+
"""Get response object from the data queue."""
109+
try:
110+
_, request = self._data_queue.get_nowait()
111+
logger.debug('[%s] Received {%s} request', self.name, request)
112+
return request
113+
except Empty:
114+
return None
115+
116+
def _request_process(self, request):
117+
"""Send request to RP and update response attribute of the request."""
118+
if not request:
119+
return # No request received
120+
121+
logger.debug('[%s] Processing {%s} request', self.name, request)
122+
request.response = request.http_request.make()
123+
self._data_queue.task_done()
124+
125+
def _stop(self):
126+
"""Routine that stops the worker thread(s).
127+
128+
This asks the thread to terminate, and then waits for it to do so.
129+
Note that if you don't call this before your application exits, there
130+
may be some records still left on the queue, which won't be processed.
131+
"""
132+
if self._thread.isAlive() and self._thread is not currentThread():
133+
self._thread.join()
134+
self._thread = None
135+
136+
def send_command(self, cmd):
137+
"""Send control command to the worker queue."""
138+
self._cmd_queue.put(cmd)
139+
140+
def send_request(self, request):
141+
"""Send a request to the worker queue.
142+
143+
:param request: RPRequest object
144+
"""
145+
self._data_queue.put(request)
146+
147+
def start(self):
148+
"""Start the worker.
149+
150+
This starts up a background thread to monitor the queue for
151+
requests to process.
152+
"""
153+
self._thread = Thread(target=self._monitor)
154+
self._thread.setDaemon(True)
155+
self._thread.start()
156+
157+
def stop(self):
158+
"""Stop the worker.
159+
160+
Send the appropriate control command to the worker.
161+
"""
162+
self.send_command(ControlCommand.STOP)
163+
164+
def stop_immediate(self):
165+
"""Stop the worker immediately.
166+
167+
Send the appropriate control command to the worker.
168+
"""
169+
self.send_command(ControlCommand.STOP_IMMEDIATE)

reportportal_client/core/worker.pyi

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from enum import Enum
2+
from logging import Logger
3+
from queue import PriorityQueue, Queue
4+
from reportportal_client.core.rp_requests import RPRequest as RPRequest
5+
from typing import Any, Optional, Text
6+
7+
logger: Logger
8+
9+
class ControlCommand(Enum):
10+
CLEAR_QUEUE: Any = ...
11+
NOP: Any = ...
12+
REPORT_STATUS: Any = ...
13+
STOP: Any = ...
14+
STOP_IMMEDIATE: Any = ...
15+
def is_stop_cmd(self) -> bool: ...
16+
17+
class APIWorker:
18+
_cmd_queue: Queue = ...
19+
_data_queue: PriorityQueue = ...
20+
name: Text = ...
21+
def __init__(self, cmd_queue: Queue, data_queue: PriorityQueue) -> None: ...
22+
def _command_get(self) -> Optional[ControlCommand]: ...
23+
def _command_process(self, cmd: Optional[ControlCommand]) -> None: ...
24+
def _monitor(self) -> None: ...
25+
def _request_get(self) -> Optional[RPRequest]: ...
26+
def _request_process(self, request: Optional[RPRequest]) -> None: ...
27+
def _stop(self) -> None: ...
28+
def send_command(self, cmd: ControlCommand) -> Any: ...
29+
def send_request(self, request: RPRequest) -> Any: ...
30+
def start(self) -> None: ...
31+
def stop(self) -> None: ...
32+
def stop_immediate(self) -> None: ...

0 commit comments

Comments
 (0)