Skip to content

Commit e1de0cd

Browse files
authored
Merge pull request #522 from minrk/nicer-process-names
name threads/processes in controller
2 parents 7a9e5d8 + b083df0 commit e1de0cd

File tree

2 files changed

+29
-12
lines changed

2 files changed

+29
-12
lines changed

ipyparallel/controller/app.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -721,11 +721,15 @@ def init_hub(self):
721721
if self.restore_engines:
722722
self.hub._load_engine_state()
723723

724-
def launch_python_scheduler(self, scheduler_args, children):
724+
def launch_python_scheduler(self, name, scheduler_args, children):
725725
if 'Process' in self.mq_class:
726726
# run the Python scheduler in a Process
727-
q = Process(target=launch_scheduler, kwargs=scheduler_args)
728-
q.daemon = True
727+
q = Process(
728+
target=launch_scheduler,
729+
kwargs=scheduler_args,
730+
name=name,
731+
daemon=True,
732+
)
729733
children.append(q)
730734
else:
731735
# single-threaded Controller
@@ -750,12 +754,16 @@ def get_python_scheduler_args(
750754
}
751755

752756
def launch_broadcast_schedulers(self, monitor_url, children):
753-
def launch_in_thread_or_process(scheduler_args):
757+
def launch_in_thread_or_process(scheduler_args, depth, identity):
754758

755759
if 'Process' in self.mq_class:
756760
# run the Python scheduler in a Process
757-
q = Process(target=launch_broadcast_scheduler, kwargs=scheduler_args)
758-
q.daemon = True
761+
q = Process(
762+
target=launch_broadcast_scheduler,
763+
kwargs=scheduler_args,
764+
name=f"BroadcastScheduler(depth={depth}, id={identity})",
765+
daemon=True,
766+
)
759767
children.append(q)
760768
else:
761769
# single-threaded Controller
@@ -805,7 +813,7 @@ def recursively_start_schedulers(identity, depth):
805813
self.client_url(BroadcastScheduler.port_name, outgoing_id2),
806814
]
807815
)
808-
launch_in_thread_or_process(scheduler_args)
816+
launch_in_thread_or_process(scheduler_args, depth=depth, identity=identity)
809817
if not is_leaf:
810818
recursively_start_schedulers(outgoing_id1, depth + 1)
811819
recursively_start_schedulers(outgoing_id2, depth + 1)
@@ -823,6 +831,7 @@ def init_schedulers(self):
823831
# maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
824832
# IOPub relay (in a Process)
825833
q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A', b'iopub')
834+
q.name = "IOPubScheduler"
826835
q.bind_in(self.client_url('iopub'))
827836
q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
828837
q.bind_out(self.engine_url('iopub'))
@@ -833,6 +842,7 @@ def init_schedulers(self):
833842

834843
# Multiplexer Queue (in a Process)
835844
q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
845+
q.name = "DirectScheduler"
836846

837847
q.bind_in(self.client_url('mux'))
838848
q.setsockopt_in(zmq.IDENTITY, b'mux_in')
@@ -844,6 +854,7 @@ def init_schedulers(self):
844854

845855
# Control Queue (in a Process)
846856
q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
857+
q.name = "ControlScheduler"
847858
q.bind_in(self.client_url('control'))
848859
q.setsockopt_in(zmq.IDENTITY, b'control_in')
849860
q.bind_out(self.engine_url('control'))
@@ -859,6 +870,7 @@ def init_schedulers(self):
859870
if scheme == 'pure':
860871
self.log.warn("task::using pure DEALER Task scheduler")
861872
q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
873+
q.name = "TaskScheduler(pure)"
862874
# q.setsockopt_out(zmq.HWM, hub.hwm)
863875
q.bind_in(self.client_url('task'))
864876
q.setsockopt_in(zmq.IDENTITY, b'task_in')
@@ -868,11 +880,12 @@ def init_schedulers(self):
868880
q.daemon = True
869881
children.append(q)
870882
elif scheme == 'none':
871-
self.log.warn("task::using no Task scheduler")
883+
self.log.warning("task::using no Task scheduler")
872884

873885
else:
874886
self.log.info("task::using Python %s Task scheduler" % scheme)
875887
self.launch_python_scheduler(
888+
'TaskScheduler',
876889
self.get_python_scheduler_args('task', TaskScheduler, monitor_url),
877890
children,
878891
)
@@ -944,6 +957,14 @@ def start(self):
944957
# otherwise signal-handling will fire multiple times
945958
for child in self.children:
946959
child.start()
960+
if hasattr(child, 'launcher'):
961+
# apply name to actual process/thread for logging
962+
setattr(child.launcher, 'name', child.name)
963+
if not self.use_threads:
964+
process = getattr(child, 'launcher', child)
965+
self.log.debug(f"Started process {child.name}: {process.pid}")
966+
else:
967+
self.log.debug(f"Started thread {child.name}")
947968

948969
self.init_signal()
949970

ipyparallel/controller/broadcast_scheduler.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ class BroadcastScheduler(Scheduler):
2121
outgoing_streams = List()
2222

2323
def start(self):
24-
self.log.info(
25-
'Broadcast Scheduler started with pid=%s',
26-
os.getpid(),
27-
)
2824
self.client_stream.on_recv(self.dispatch_submission, copy=False)
2925
if self.is_leaf:
3026
super().start()

0 commit comments

Comments
 (0)