-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
106 lines (85 loc) · 3.08 KB
/
server.py
File metadata and controls
106 lines (85 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
'''
A simple RPC server that uses RabbitMQ as a message broker.
'''
import logging
import socket
from dataclasses import dataclass, field
from functools import partial
from typing import Callable, Dict, NoReturn, Optional, TypeVar, overload
import dill
import pika
from pika.adapters.blocking_connection import BlockingChannel
from pika.connection import Parameters
from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker
from pika.frame import Method
from retry import retry
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").setLevel(logging.WARN)
T = TypeVar('T')
@dataclass
class Server:
'''
A simple RPC server that uses RabbitMQ as a message broker.
'''
server_queue: str
connection_params: Parameters
server_name: Optional[str] = None
_methods: Dict[str, Callable] = field(default_factory=dict)
@overload
def register(self, method: str) -> Callable[[Callable[..., T]], Callable[..., T]]:
'''
Register a method to be called by the server.
Uses the given string as the RPC method name.
'''
...
@overload
def register(self, method: Callable[..., T]) -> Callable[..., T]:
'''
Register a method to be called by the server.
Uses the method name as the RPC method name.
'''
...
def register(self, method): # type: ignore
if isinstance(method, str):
return partial(self._register, method)
else:
return self._register(method.__name__, method)
def _register(self, name: str, method: Callable) -> Callable:
self._methods[name] = method
return method
@retry(socket.gaierror, delay=10, jitter=3)
@retry(ChannelClosedByBroker, delay=10, jitter=3)
@retry(AMQPConnectionError, delay=5, jitter=3)
def serve(self) -> NoReturn:
'''
Start the server and wait for requests.
'''
with pika.BlockingConnection(self.connection_params) as conn:
channel = conn.channel()
channel.queue_declare(
queue=self.server_queue, exclusive=True, auto_delete=True
)
channel.basic_consume(
self.server_queue,
self.on_server_rx_rpc_request,
consumer_tag=self.server_name,
)
logging.info("Ready, waiting on work on %s", self.server_queue)
channel.start_consuming()
def on_server_rx_rpc_request(
self,
ch: BlockingChannel,
method_frame: Method,
properties: Parameters,
_body: str,
) -> None:
body = dill.loads(_body)
logging.info("RPC Server got request: %s", body)
res = {"key": body["key"]}
try:
res["body"] = self._methods[body["method"]](*body["args"], **body["kwargs"])
except Exception as e:
logging.exception("Call to %s caused exception", body["method"])
res["exception"] = e
ch.basic_publish("", routing_key=properties.reply_to, body=dill.dumps(res))
ch.basic_ack(delivery_tag=method_frame.delivery_tag)