Skip to content

Commit 3846c16

Browse files
committed
Alpha release!
In this commit many things happened, but due to the major change of the structure I just squashed everything to one feature commit. First of all the server/client model was updated according to #24: slave instances now run as a limited version of a master server, keeping track of their own local components. The master server starts a new slave managing socket server, that listens for new connections of slave servers and sends and receives serialized commands over the socket connection. The logging model has been changed fundamentally too (#26). The demo example with start all and an opened interactive cli gui generates the following structure: /tmp/Hyperion/log/ ├── hyperion-vm │   ├── client │   ├── component │   │   ├── nav@hyperion-vm │   │   │   └── latest.log │   │   └── top@hyperion-vm │   │   └── latest.log │   ├── server │   ├── slave │   │   └── Hyperion-DEMO.log │   └── standalone ├── localhost │   ├── client │   │   └── 13-39-08.log │   ├── component │   │   ├── roscore@localhost │   │   │   └── latest.log │   │   ├── xclock@localhost │   │   │   └── latest.log │   │   └── xeyes@localhost │   │   └── latest.log │   ├── server │   │   └── Hyperion-DEMO.log │   ├── slave │   └── standalone └── remote └── slave └── Hyperion-DEMO@hyperion-vm_13-38-57.log The log directory holds 3 types of directory on the first level: 'localhost' for commits from locally started components, 'remote' for remote logs that are copied over and 'HOSTNAME' which is an sshfs mount pointing to a slave machines /tmp/log/Hyperion/localhost directory. Inside the localhost directory are directories for each (non-trivial) run mode and a directory for component logs. In summary this commit provides all initially planned functionality for the interactive gui. The PyQt gui will be updated soon and a web ui is planned too. From here on the focus is shifted from new feature developement to bringing more stability. Fixes #27, #26, #23, #24
1 parent 7e3c719 commit 3846c16

File tree

11 files changed

+1113
-569
lines changed

11 files changed

+1113
-569
lines changed

hyperion/data/default-logger.config

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,7 @@ format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
1111
format=%(asctime)s: %(name)s %(funcName)20s() [%(levelname)s]: %(message)s
1212

1313
[handlers]
14-
keys=file,info_file,screen
15-
16-
[handler_file]
17-
class=handlers.RotatingFileHandler
18-
backupCount=50
19-
formatter=complex
20-
level=DEBUG
21-
args=('/tmp/Hyperion/log/debug.log', 'w')
22-
23-
[handler_info_file]
24-
class=handlers.RotatingFileHandler
25-
backupCount=5
26-
formatter=simple
27-
level=INFO
28-
args=('/tmp/Hyperion/log/info.log', 'w')
14+
keys=screen
2915

3016
[handler_screen]
3117
class=StreamHandler
@@ -35,4 +21,4 @@ args=(sys.stdout,)
3521

3622
[logger_root]
3723
level=DEBUG
38-
handlers=screen,file,info_file
24+
handlers=screen

hyperion/lib/networking/clientInterface.py

Lines changed: 181 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,183 @@ def recvall(connection, n):
4242
return data
4343

4444

45-
class RemoteControllerInterface(AbstractController):
45+
class BaseClient:
46+
"""Base class for socket clients."""
4647
def __init__(self, host, port):
47-
super(RemoteControllerInterface, self).__init__(None)
48-
self.host_list = None
49-
self.config = None
5048
self.host = host
5149
self.port = port
52-
self.logger = logging.getLogger(__name__)
53-
self.receive_queue = queue.Queue()
50+
self.logger = logging.getLogger(self.__class__.__name__)
5451
self.send_queue = queue.Queue()
5552
self.mysel = selectors.DefaultSelector()
5653
self.keep_running = True
57-
self.ui_event_queue = None
58-
self.mounted_hosts = []
5954

6055
signal(SIGINT, self._handle_sigint)
6156

57+
def _handle_sigint(self, signum, frame):
58+
self.logger.debug("Received C-c")
59+
self._quit()
60+
61+
def _quit(self):
62+
self.keep_running = False
63+
64+
def _interpret_message(self, action, args):
65+
raise NotImplementedError
66+
67+
def _loop(self):
68+
raise NotImplementedError
69+
70+
71+
class RemoteSlaveInterface(BaseClient):
72+
def __init__(self, host, port, cc):
73+
"""Init remote slave interface for communication to the server at `host` on `port` with slave controller `cc`.
74+
75+
:param host: Hostname of the server to connect to
76+
:type host: str
77+
:param port: Port of the server to connect to
78+
:type port: int
79+
:param cc: Slave manager to dispatch calls to and forward messages from
80+
:type cc: hyperion.manager.SlaveManager
81+
"""
82+
BaseClient.__init__(self, host, port)
83+
self.cc = cc
84+
85+
server_address = (host, port)
86+
self.logger.debug('connecting to {} port {}'.format(*server_address))
87+
self.sock = sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
88+
self.event_queue = queue.Queue()
89+
self.cc.add_subscriber(self.event_queue)
90+
91+
try:
92+
sock.connect(server_address)
93+
except socket.error:
94+
self.logger.critical("Master session does not seem to be running. Quitting remote client")
95+
self._quit()
96+
sys.exit(1)
97+
sock.setblocking(False)
98+
99+
# Set up the selector to watch for when the socket is ready
100+
# to send data as well as when there is data to read.
101+
self.mysel.register(
102+
sock,
103+
selectors.EVENT_READ | selectors.EVENT_WRITE,
104+
)
105+
106+
self.function_mapping = {
107+
'start': self._start_wrapper,
108+
'check': self._check_wrapper,
109+
'stop': self._stop_wrapper,
110+
'quit': self._quit,
111+
'suspend': self._suspend
112+
}
113+
self._loop()
114+
115+
self.logger.debug("Shutdown complete!")
116+
117+
def _suspend(self):
118+
self.keep_running = False
119+
worker = threading.Thread(
120+
target=self.cc.cleanup,
121+
args=[False],
122+
name="Suspend slave thread"
123+
)
124+
worker.start()
125+
worker.join()
126+
127+
def _quit(self):
128+
self.keep_running = False
129+
worker = threading.Thread(
130+
target=self.cc.cleanup,
131+
args=[True],
132+
name="Shutdown slave thread"
133+
)
134+
worker.start()
135+
worker.join()
136+
137+
def _interpret_message(self, action, args):
138+
self.logger.debug("Action: %s, args: %s" % (action, args))
139+
func = self.function_mapping.get(action)
140+
141+
try:
142+
func(*args)
143+
except TypeError:
144+
self.logger.error("Ignoring unrecognized slave action '%s'" % action)
145+
146+
def _start_wrapper(self, comp_id):
147+
self.cc.start_component(self.cc.get_component_by_id(comp_id))
148+
149+
def _check_wrapper(self, comp_id):
150+
self.cc.check_component(self.cc.get_component_by_id(comp_id))
151+
152+
def _stop_wrapper(self, comp_id):
153+
self.cc.stop_component(self.cc.get_component_by_id(comp_id))
154+
155+
def _process_events(self):
156+
"""Process events enqueued by the manager and send them to connected clients if necessary.
157+
158+
:return: None
159+
"""
160+
while not self.event_queue.empty():
161+
event = self.event_queue.get_nowait()
162+
self.logger.debug("Forwarding event '%s' to slave manager server" % event)
163+
message = actionSerializer.serialize_request('queue_event', [event])
164+
self.send_queue.put(message)
165+
166+
def _loop(self):
167+
self.logger.debug("Started slave client messaging loop")
168+
# Keep alive until shutdown is requested and no messages are left to send
169+
while self.keep_running:
170+
for key, mask in self.mysel.select(timeout=1):
171+
connection = key.fileobj
172+
173+
if mask & selectors.EVENT_READ:
174+
self.logger.debug("Got read event")
175+
raw_msglen = connection.recv(4)
176+
if raw_msglen:
177+
# A readable client socket has data
178+
msglen = struct.unpack('>I', raw_msglen)[0]
179+
data = recvall(connection, msglen)
180+
self.logger.debug("Received message")
181+
action, args = actionSerializer.deserialize(data)
182+
self._interpret_message(action, args)
183+
184+
# Interpret empty result as closed connection
185+
else:
186+
self.keep_running = False
187+
# Reset queue for shutdown condition
188+
self.send_queue = queue.Queue()
189+
self.logger.critical("Connection to server was lost!")
190+
self._quit()
191+
192+
if mask & selectors.EVENT_WRITE:
193+
if not self.send_queue.empty(): # Server is ready to read, check if we have messages to send
194+
self.logger.debug("Sending next message in queue to Server")
195+
next_msg = self.send_queue.get()
196+
self.sock.sendall(next_msg)
197+
self._process_events()
198+
self.logger.debug("Exiting messaging loop")
199+
200+
201+
class RemoteControllerInterface(AbstractController, BaseClient):
202+
def _stop_remote_component(self, comp):
203+
self.logger.critical("This function should not be called in this context!")
204+
pass
205+
206+
def _start_remote_component(self, comp):
207+
self.logger.critical("This function should not be called in this context!")
208+
pass
209+
210+
def _check_remote_component(self, comp):
211+
self.logger.critical("This function should not be called in this context!")
212+
pass
213+
214+
def __init__(self, host, port):
215+
AbstractController.__init__(self, None)
216+
BaseClient.__init__(self, host, port)
217+
218+
self.host_list = None
219+
self.config = None
220+
self.mounted_hosts = []
221+
62222
self.function_mapping = {
63223
'get_conf_response': self._set_config,
64224
'get_host_list_response': self._set_host_list,
@@ -83,7 +243,7 @@ def __init__(self, host, port):
83243
selectors.EVENT_READ | selectors.EVENT_WRITE,
84244
)
85245

86-
self.thread = threading.Thread(target=self.loop)
246+
self.thread = threading.Thread(target=self._loop)
87247
self.thread.start()
88248

89249
self.request_config()
@@ -105,6 +265,10 @@ def request_config(self):
105265
message = actionSerializer.serialize_request(action, payload)
106266
self.send_queue.put(message)
107267

268+
def _quit(self):
269+
self.keep_running = False
270+
self.cleanup(False)
271+
108272
def cleanup(self, full=False, exit_code=0):
109273
if full:
110274
action = 'quit'
@@ -185,8 +349,8 @@ def _set_host_list(self, host_list):
185349
self.logger.debug("Updated host list")
186350

187351
def _forward_event(self, event):
188-
if self.ui_event_queue:
189-
self.ui_event_queue.put(event)
352+
if self.monitor_queue:
353+
self.monitor_queue.put(event)
190354

191355
# Special events handling
192356
if isinstance(event, DisconnectEvent):
@@ -196,7 +360,7 @@ def _forward_event(self, event):
196360
self.host_list[event.host_name] = True
197361
self._mount_host(event.host_name)
198362

199-
def loop(self):
363+
def _loop(self):
200364
# Keep alive until shutdown is requested and no messages are left to send
201365
while self.keep_running or not self.send_queue.empty():
202366
for key, mask in self.mysel.select(timeout=1):
@@ -219,7 +383,7 @@ def loop(self):
219383
# Reset queue for shutdown condition
220384
self.send_queue = queue.Queue()
221385
self.logger.critical("Connection to server was lost!")
222-
self.ui_event_queue.put(ServerDisconnectEvent())
386+
self.monitor_queue.put(ServerDisconnectEvent())
223387

224388
if mask & selectors.EVENT_WRITE:
225389
if not self.send_queue.empty(): # Server is ready to read, check if we have messages to send
@@ -234,7 +398,7 @@ def add_subscriber(self, subscriber_queue):
234398
:type subscriber_queue: queue.Queue
235399
:return: None
236400
"""
237-
self.ui_event_queue = subscriber_queue
401+
self.monitor_queue = subscriber_queue
238402

239403
###################
240404
# Host related
@@ -262,7 +426,7 @@ def _mount_host(self, hostname):
262426
else:
263427
self.logger.error("Error while trying to create directory '%s'" % directory)
264428

265-
cmd = 'sshfs %s:%s %s -F %s' % (hostname,
429+
cmd = 'sshfs %s:%s/localhost %s -F %s' % (hostname,
266430
config.TMP_LOG_PATH,
267431
directory,
268432
config.SSH_CONFIG_PATH
@@ -330,7 +494,8 @@ def is_localhost(self, hostname):
330494
else:
331495
self.logger.debug("Host '%s' is not localhost" % hostname)
332496
return False
333-
except socket.gaierror:
497+
except socket.gaierror as err:
498+
self.logger.debug("%s gaierror: %s" % (hostname, err))
334499
raise exceptions.HostUnknownException("Host '%s' is unknown! Update your /etc/hosts file!" % hostname)
335500

336501
def run_on_localhost(self, comp):
@@ -345,6 +510,3 @@ def run_on_localhost(self, comp):
345510
return self.is_localhost(comp['host'])
346511
except exceptions.HostUnknownException as ex:
347512
raise ex
348-
349-
def _handle_sigint(self, signum, frame):
350-
self.cleanup(False)

0 commit comments

Comments
 (0)