Skip to content

Commit 7e3c719

Browse files
committed
Introduce server base class for extending
Also server can now handle shutting down while handling other messages, if a quit message is received.
1 parent 9f034c7 commit 7e3c719

File tree

1 file changed

+49
-33
lines changed

1 file changed

+49
-33
lines changed

hyperion/lib/networking/server.py

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -43,25 +43,65 @@ def recvall(connection, n):
4343
return data
4444

4545

46-
class Server:
47-
def __init__(self, port, cc):
48-
self.port = port
46+
class BaseServer:
47+
"""Base class for servers."""
48+
def __init__(self):
49+
self.port = None
4950
self.sel = selectors.DefaultSelector()
5051
self.keep_running = True
51-
self.cc = cc # type: hyperion.ControlCenter
5252
self.logger = logging.getLogger(__name__)
5353
self.send_queues = {}
5454
self.event_queue = queue.Queue()
55-
self.cc.add_subscriber(self.event_queue)
56-
5755
signal(SIGINT, self._handle_sigint)
5856

57+
def accept(self, sock, mask):
58+
"""Callback for new connections"""
59+
new_connection, addr = sock.accept()
60+
self.logger.debug('accept({})'.format(addr))
61+
new_connection.setblocking(False)
62+
self.send_queues[new_connection] = queue.Queue()
63+
self.sel.register(new_connection, selectors.EVENT_READ | selectors.EVENT_WRITE)
64+
65+
def interpret_message(self, action, args, connection):
66+
raise NotImplementedError
67+
68+
def write(self, connection):
69+
"""Callback for write events"""
70+
send_queue = self.send_queues.get(connection)
71+
if send_queue and not send_queue.empty() and self.keep_running:
72+
# Messages available
73+
next_msg = send_queue.get()
74+
try:
75+
connection.sendall(next_msg)
76+
except socket.error as err:
77+
self.logger.error("Error while writing message to socket: %s" % err)
78+
79+
def read(self, connection):
80+
raise NotImplementedError
81+
82+
def _handle_sigint(self, signum, frame):
83+
self.logger.debug("Received C-c")
84+
self._quit()
85+
86+
def _quit(self):
87+
self.send_queues = {}
88+
89+
self.keep_running = False
90+
91+
92+
class Server(BaseServer):
93+
def __init__(self, port, cc):
94+
BaseServer.__init__(self)
95+
self.port = port
96+
self.cc = cc # type: hyperion.ControlCenter
97+
self.cc.add_subscriber(self.event_queue)
98+
5999
server_address = ('', port)
60-
self.logger.debug("Starting server on localhost:%s" % port)
61100
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
62101
server.setblocking(False)
63102
try:
64103
server.bind(server_address)
104+
self.logger.debug("Starting server on localhost:%s" % server.getsockname()[1])
65105
except socket.error as e:
66106
if e.errno == 98:
67107
self.logger.critical("Server adress is already in use! Try waiting a few seconds if you are sure there"
@@ -95,7 +135,7 @@ def __init__(self, port, cc):
95135
while self.keep_running:
96136
for key, mask in self.sel.select(timeout=1):
97137
connection = key.fileobj
98-
if key.data:
138+
if key.data and self.keep_running:
99139
callback = key.data
100140
callback(connection, mask)
101141

@@ -110,14 +150,6 @@ def __init__(self, port, cc):
110150
print('shutting down')
111151
self.sel.close()
112152

113-
def write(self, connection):
114-
"""Callback for write events"""
115-
send_queue = self.send_queues.get(connection)
116-
if send_queue and not send_queue.empty():
117-
# Messages available
118-
next_msg = send_queue.get()
119-
connection.sendall(next_msg)
120-
121153
def read(self, connection):
122154
"""Callback for read events"""
123155
try:
@@ -133,7 +165,7 @@ def read(self, connection):
133165

134166
if action == 'quit':
135167
worker.join()
136-
sys.exit(0)
168+
self._quit()
137169
else:
138170
# Handle uncontrolled connection loss
139171
self.send_queues.pop(connection)
@@ -147,14 +179,6 @@ def read(self, connection):
147179
self.sel.unregister(connection)
148180
connection.close()
149181

150-
def accept(self, sock, mask):
151-
"Callback for new connections"
152-
new_connection, addr = sock.accept()
153-
print('accept({})'.format(addr))
154-
new_connection.setblocking(False)
155-
self.send_queues[new_connection] = queue.Queue()
156-
self.sel.register(new_connection, selectors.EVENT_READ | selectors.EVENT_WRITE)
157-
158182
def interpret_message(self, action, args, connection):
159183
self.logger.debug("Action: %s, args: %s" % (action, args))
160184
func = self.function_mapping.get(action)
@@ -223,10 +247,6 @@ def _stop_component_wrapper(self, comp_id):
223247
except exceptions.ComponentNotFoundException as e:
224248
self.logger.error(e.message)
225249

226-
def _shutdown(self):
227-
self.keep_running = False
228-
self.cc.cleanup(True)
229-
230250
def _send_config(self):
231251
return self.cc.config
232252

@@ -238,7 +258,3 @@ def _send_host_list(self):
238258
else:
239259
lst[key] = False
240260
return lst
241-
242-
def _handle_sigint(self, signum, frame):
243-
self.keep_running = False
244-
self.cc.cleanup(True)

0 commit comments

Comments
 (0)