Skip to content

Commit d51e7c7

Browse files
committed
start heart beating after other channels
Reduces the likelihood of engine registration completing before scheduler connections are established. Does not eliminate the race!
1 parent 924812c commit d51e7c7

File tree

2 files changed

+52
-44
lines changed

2 files changed

+52
-44
lines changed

ipyparallel/apps/ipcontrollerapp.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,9 @@
111111
'nodb': (
112112
{'HubFactory': {'db_class': 'ipyparallel.controller.dictdb.NoDB'}},
113113
"""use dummy DB backend, which doesn't store any information.
114-
114+
115115
This is the default as of IPython 0.13.
116-
116+
117117
To enable delayed or repeated retrieval of results from the Hub,
118118
select one of the true db backends.
119119
""",

ipyparallel/engine/engine.py

Lines changed: 50 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ class EngineFactory(RegistrationFactory):
7070
max_heartbeat_misses = Integer(
7171
50,
7272
config=True,
73-
help="""The maximum number of times a check for the heartbeat ping of a
73+
help="""The maximum number of times a check for the heartbeat ping of a
7474
controller can be missed before shutting down the engine.
75-
75+
7676
If set to 0, the check is disabled.""",
7777
)
7878
sshserver = Unicode(
@@ -93,7 +93,7 @@ class EngineFactory(RegistrationFactory):
9393
False,
9494
config=True,
9595
help="""Enable MPI integration.
96-
96+
9797
If set, MPI rank will be requested for my rank,
9898
and additionally `mpi_init` will be executed in the interactive shell.
9999
""",
@@ -118,7 +118,7 @@ def tunnel_mod(self):
118118
allow_none=True,
119119
config=True,
120120
help="""Request this engine ID.
121-
121+
122122
If run in MPI, will use the MPI rank.
123123
Otherwise, let the Hub decide what our rank should be.
124124
""",
@@ -260,25 +260,7 @@ def urls(key):
260260
"Did not get the requested id: %i != %i", content['id'], self.id
261261
)
262262
self.id = content['id']
263-
264-
# launch heartbeat
265-
# possibly forward hb ports with tunnels
266-
hb_ping = maybe_tunnel(url('hb_ping'))
267-
hb_pong = maybe_tunnel(url('hb_pong'))
268-
269-
hb_monitor = None
270-
if self.max_heartbeat_misses > 0:
271-
# Add a monitor socket which will record the last time a ping was seen
272-
mon = self.context.socket(zmq.SUB)
273-
mport = mon.bind_to_random_port('tcp://%s' % localhost())
274-
mon.setsockopt(zmq.SUBSCRIBE, b"")
275-
self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
276-
self._hb_listener.on_recv(self._report_ping)
277-
278-
hb_monitor = "tcp://%s:%i" % (localhost(), mport)
279-
280-
heart = Heart(hb_ping, hb_pong, hb_monitor, heart_id=identity)
281-
heart.start()
263+
self.log.name += f".{self.id}"
282264

283265
# create Shell Connections (MUX, Task, etc.):
284266
shell_addrs = [url('mux'), url('task')] + urls('broadcast')
@@ -288,6 +270,8 @@ def urls(key):
288270
# Use only one shell stream for mux and tasks
289271
stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
290272
stream.setsockopt(zmq.IDENTITY, identity)
273+
# TODO: enable PROBE_ROUTER when schedulers can handle the empty message
274+
# stream.setsockopt(zmq.PROBE_ROUTER, 1)
291275
self.log.debug("Setting shell identity %r", identity)
292276

293277
shell_streams = [stream]
@@ -348,25 +332,6 @@ def urls(key):
348332
'engine.%i.displaypub' % self.id
349333
)
350334

351-
# periodically check the heartbeat pings of the controller
352-
# Should be started here and not in "start()" so that the right period can be taken
353-
# from the hubs HeartBeatMonitor.period
354-
if self.max_heartbeat_misses > 0:
355-
# Use a slightly bigger check period than the hub signal period to not warn unnecessary
356-
self.hb_check_period = int(content['hb_period']) + 10
357-
self.log.info(
358-
"Starting to monitor the heartbeat signal from the hub every %i ms.",
359-
self.hb_check_period,
360-
)
361-
self._hb_reporter = ioloop.PeriodicCallback(
362-
self._hb_monitor, self.hb_check_period
363-
)
364-
self._hb_reporter.start()
365-
else:
366-
self.log.info(
367-
"Monitoring of the heartbeat signal from the hub is not enabled."
368-
)
369-
370335
# FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
371336
app = IPKernelApp(
372337
parent=self, shell=self.kernel.shell, kernel=self.kernel, log=self.log
@@ -381,8 +346,51 @@ def urls(key):
381346
self.log.fatal("Registration Failed: %s" % msg)
382347
raise Exception("Registration Failed: %s" % msg)
383348

349+
self.start_heartbeat(
350+
maybe_tunnel(url('hb_ping')),
351+
maybe_tunnel(url('hb_pong')),
352+
content['hb_period'],
353+
identity,
354+
)
384355
self.log.info("Completed registration with id %i" % self.id)
385356

357+
def start_heartbeat(self, hb_ping, hb_pong, hb_period, identity):
358+
"""Start our heart beating"""
359+
self.log.info("Starting heartbeat")
360+
361+
hb_monitor = None
362+
if self.max_heartbeat_misses > 0:
363+
# Add a monitor socket which will record the last time a ping was seen
364+
mon = self.context.socket(zmq.SUB)
365+
mport = mon.bind_to_random_port('tcp://%s' % localhost())
366+
mon.setsockopt(zmq.SUBSCRIBE, b"")
367+
self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
368+
self._hb_listener.on_recv(self._report_ping)
369+
370+
hb_monitor = "tcp://%s:%i" % (localhost(), mport)
371+
372+
heart = Heart(hb_ping, hb_pong, hb_monitor, heart_id=identity)
373+
heart.start()
374+
375+
# periodically check the heartbeat pings of the controller
376+
# Should be started here and not in "start()" so that the right period can be taken
377+
# from the hub's HeartBeatMonitor.period
378+
if self.max_heartbeat_misses > 0:
379+
# Use a slightly bigger check period than the hub signal period to avoid warning unnecessarily
380+
self.hb_check_period = hb_period + 500
381+
self.log.info(
382+
"Starting to monitor the heartbeat signal from the hub every %i ms.",
383+
self.hb_check_period,
384+
)
385+
self._hb_reporter = ioloop.PeriodicCallback(
386+
self._hb_monitor, self.hb_check_period
387+
)
388+
self._hb_reporter.start()
389+
else:
390+
self.log.info(
391+
"Monitoring of the heartbeat signal from the hub is not enabled."
392+
)
393+
386394
def abort(self):
387395
self.log.fatal("Registration timed out after %.1f seconds" % self.timeout)
388396
if self.url.startswith('127.'):

0 commit comments

Comments
 (0)