Skip to content

Commit df32be0

Browse files
author
paskozdilar
committed
Add context manager functionality to Server
1 parent cdbfe63 commit df32be0

File tree

2 files changed

+92
-5
lines changed

2 files changed

+92
-5
lines changed

tests/test_rpc.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,31 @@ def run_client():
147147
multiprocessing.Process(target=run_client, daemon=True).start()
148148

149149
assert rpc_event.wait(5)
150+
151+
152+
def test_multiprocessing():
153+
"""
154+
Test whether ZRPC works in multiprocessing environment.
155+
"""
156+
rpc_event = multiprocessing.Event()
157+
158+
with tempfile.TemporaryDirectory() as tempdir:
159+
160+
class MockServer(Server):
161+
@rpc_method
162+
def mock_method(self):
163+
return {'success': True}
164+
165+
def run_server():
166+
MockServer(socket_dir=tempdir).run()
167+
168+
def run_client():
169+
mock_client = Client(socket_dir=tempdir)
170+
response = mock_client.call('mock_server', 'mock_method')
171+
if response['success']:
172+
rpc_event.set()
173+
174+
multiprocessing.Process(target=run_server, daemon=True).start()
175+
multiprocessing.Process(target=run_client, daemon=True).start()
176+
177+
assert rpc_event.wait(1)

zrpc/server.py

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,20 @@
33
44
Subclass the `Server` class and use the `rpc_method` decorator to make the
55
method available to ZRPC clients.
6+
7+
User API:
8+
9+
Server:
10+
|- start()
11+
|- stop()
12+
|
13+
|- register(fd, callback)
14+
|- unregister(fd)
15+
|
16+
|- run()
17+
|- run_once(timeout=None)
18+
19+
rpc_method: <decorator for Server methods>
620
"""
721

822

@@ -52,6 +66,35 @@ def __init__(self, name=None, socket_dir=None):
5266

5367
socket_dir = os.path.abspath(socket_dir or '/tmp/zrpc_sockets')
5468

69+
self._name = name
70+
self._context = None
71+
self._socket_dir = socket_dir
72+
self._socket_path = None
73+
self._socket = None
74+
self._poller = None
75+
76+
self._cache = None
77+
self._fd_callbacks = None
78+
79+
self._started = False
80+
81+
def __del__(self):
82+
self.stop()
83+
84+
def __enter__(self):
85+
self.start()
86+
return self
87+
88+
def __exit__(self, exc_type, exc_value, exc_traceback):
89+
self.stop()
90+
91+
def start(self):
92+
if self._started:
93+
return
94+
95+
name = self._name
96+
socket_dir = self._socket_dir
97+
5598
context = zmq.Context.instance()
5699
socket = context.socket(zmq.REP)
57100
poller = zmq.Poller()
@@ -87,12 +130,18 @@ def __init__(self, name=None, socket_dir=None):
87130

88131
self._cache = _RPCCache(maxsize=10)
89132
self._fd_callbacks = {}
133+
self._started = True
90134

91-
def __del__(self):
135+
def stop(self):
92136
try:
137+
if not self._started:
138+
return
139+
self._context.term()
93140
os.unlink(self._socket_path)
94141
except (OSError, AttributeError):
95142
pass
143+
finally:
144+
self._started = False
96145

97146
def register(self, fd, callback):
98147
"""
@@ -119,20 +168,30 @@ def unregister(self, fd):
119168
def run(self):
120169
""" Run service forever. """
121170
self._logger.info('Running "{}" forever...'.format(self._name))
122-
while True:
123-
self.run_once()
171+
if self._started:
172+
self._logger.info('STARTEd')
173+
while True:
174+
self.run_once()
175+
else:
176+
self._logger.info('NOT STARTEd')
177+
with self:
178+
while True:
179+
self.run_once()
124180

125181
def run_once(self, timeout=None):
126182
""" Run service once (process single event or wait for timeout) """
183+
if not self._started:
184+
raise RuntimeError('Server not started')
185+
127186
if timeout is not None:
128187
timeout = int(1000 * timeout)
129188

130189
socket = self._socket
131190
poller = self._poller
132191

133-
self._logger.debug('Polling for requests...')
192+
self._logger.info('Polling for requests...')
134193
ready_sockets = dict(poller.poll(timeout=timeout))
135-
self._logger.debug('Ready_sockets: {}'.format(ready_sockets))
194+
self._logger.info('Ready_sockets: {}'.format(ready_sockets))
136195

137196
for ready_socket in ready_sockets:
138197
if ready_socket is socket:

0 commit comments

Comments
 (0)