Skip to content

Commit 8eeff66

Browse files
committed
Merge remote-tracking branch 'origin/dmw'
* origin/dmw: docs: merge signals.rst into internals.rst os_fork: do not attempt to cork the active thread. parent: fix get_log_level() for split out loggers. issue #547: fix service_test failures. issue #547: update Changelog. issue #547: core/service: race/deadlock-free service pool init docs: update Changelog. select: make Select.add() handle multiple buffered items. core/select: add {Select,Latch,Receiver}.size(), deprecate empty() parent: docstring fixes core: remove dead Router.on_shutdown() and Router "shutdown" signal testlib: use lsof +E for much clearer leaked FD output [stream-refactor] stop leaking FD 100 for the life of the child core: split preserve_tty_fp() out into a function parent: zombie reaping v3 issue #410: fix test failure due to obsolete parentfp/childfp issue #170: replace Timer.cancelled with Timer.active core: more descriptive graceful shutdown timeout error docs: update changelog core: fix Python2.4 crash due to missing Logger.getChild(). issue #410: automatically work around SELinux braindamage. core: cache stream reference in DelimitedProtocol parent: docstring formatting docs: remove fakessh from home page, it's been broken forever docs: add changelog thanks Disable Azure pipelines build for docs-master too. docs: udpate Changelog. docs: tweak Changelog wording [linear2] merge fallout: re-enable _send_module_forwards(). docs: another round of docstring cleanups. master: allow filtering forwarded logs using logging package functions. docs: many more internals.rst tidyups tests: fix error in affinity_test service: centralize fetching thread name, and tidy up logs [stream-refactor] get caught up on internals.rst updates Stop using mitogen root logger in more modules, remove unused loggers tests: stop dumping Docker help output in the log. parent: move subprocess creation to mux thread too Split out and make readable more log messages across both packages ansible: log affinity assignments ci: log failed command line, and try enabling stdout line buffering ansible: improve docstring [linear2] simplify _listener_for_name() ansible: stop relying on SIGTERM to shut down service pool tests: move tty_create_child tests together ansible: cleanup various docstrings parent: define Connection behaviour during Broker.shutdown() issue #549: ansible: reduce risk by capping RLIM_INFINITY
2 parents 7ca2d00 + 5970b04 commit 8eeff66

36 files changed

+1372
-682
lines changed

.ci/azure-pipelines.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
# Add steps that analyze code, save the dist with the build record, publish to a PyPI-compatible index, and more:
44
# https://docs.microsoft.com/azure/devops/pipelines/languages/python
55

6+
trigger:
7+
branches:
8+
exclude:
9+
- docs-master
10+
611
jobs:
712

813
- job: Mac

.ci/ci_lib.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ def have_docker():
5757

5858
# -----------------
5959

60-
# Force stdout FD 1 to be a pipe, so tools like pip don't spam progress bars.
60+
# Force line buffering on stdout.
61+
sys.stdout = os.fdopen(1, 'w', 1)
6162

63+
# Force stdout FD 1 to be a pipe, so tools like pip don't spam progress bars.
6264
if 'TRAVIS_HOME' in os.environ:
6365
proc = subprocess.Popen(
6466
args=['stdbuf', '-oL', 'cat'],
@@ -86,8 +88,13 @@ def _argv(s, *args):
8688
def run(s, *args, **kwargs):
8789
argv = ['/usr/bin/time', '--'] + _argv(s, *args)
8890
print('Running: %s' % (argv,))
89-
ret = subprocess.check_call(argv, **kwargs)
90-
print('Finished running: %s' % (argv,))
91+
try:
92+
ret = subprocess.check_call(argv, **kwargs)
93+
print('Finished running: %s' % (argv,))
94+
except Exception:
95+
print('Exception occurred while running: %s' % (argv,))
96+
raise
97+
9198
return ret
9299

93100

@@ -217,6 +224,7 @@ def start_containers(containers):
217224
"docker rm -f %(name)s || true" % container,
218225
"docker run "
219226
"--rm "
227+
"--cpuset-cpus 0,1 "
220228
"--detach "
221229
"--privileged "
222230
"--cap-add=SYS_PTRACE "

ansible_mitogen/affinity.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@
7373
decisions.
7474
"""
7575

76+
from __future__ import absolute_import
7677
import ctypes
78+
import logging
7779
import mmap
7880
import multiprocessing
7981
import os
@@ -83,6 +85,9 @@
8385
import mitogen.parent
8486

8587

88+
LOG = logging.getLogger(__name__)
89+
90+
8691
try:
8792
_libc = ctypes.CDLL(None, use_errno=True)
8893
_strerror = _libc.strerror
@@ -207,40 +212,42 @@ def __init__(self, cpu_count=None):
207212
self._reserve_mask = 3
208213
self._reserve_shift = 2
209214

210-
def _set_affinity(self, mask):
215+
def _set_affinity(self, descr, mask):
216+
if descr:
217+
LOG.debug('CPU mask for %s: %#08x', descr, mask)
211218
mitogen.parent._preexec_hook = self._clear
212219
self._set_cpu_mask(mask)
213220

214-
def _balance(self):
221+
def _balance(self, descr):
215222
self.state.lock.acquire()
216223
try:
217224
n = self.state.counter
218225
self.state.counter += 1
219226
finally:
220227
self.state.lock.release()
221228

222-
self._set_cpu(self._reserve_shift + (
229+
self._set_cpu(descr, self._reserve_shift + (
223230
(n % (self.cpu_count - self._reserve_shift))
224231
))
225232

226-
def _set_cpu(self, cpu):
227-
self._set_affinity(1 << (cpu % self.cpu_count))
233+
def _set_cpu(self, descr, cpu):
234+
self._set_affinity(descr, 1 << (cpu % self.cpu_count))
228235

229236
def _clear(self):
230237
all_cpus = (1 << self.cpu_count) - 1
231-
self._set_affinity(all_cpus & ~self._reserve_mask)
238+
self._set_affinity(None, all_cpus & ~self._reserve_mask)
232239

233240
def assign_controller(self):
234241
if self._reserve_controller:
235-
self._set_cpu(1)
242+
self._set_cpu('Ansible top-level process', 1)
236243
else:
237-
self._balance()
244+
self._balance('Ansible top-level process')
238245

239246
def assign_muxprocess(self, index):
240-
self._set_cpu(index)
247+
self._set_cpu('MuxProcess %d' % (index,), index)
241248

242249
def assign_worker(self):
243-
self._balance()
250+
self._balance('WorkerProcess')
244251

245252
def assign_subprocess(self):
246253
self._clear()

ansible_mitogen/process.py

Lines changed: 52 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,14 @@ def increase_open_file_limit():
246246
limit is much higher.
247247
"""
248248
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
249-
LOG.debug('inherited open file limits: soft=%d hard=%d', soft, hard)
249+
if hard == resource.RLIM_INFINITY:
250+
hard_s = '(infinity)'
251+
# cap in case of O(RLIMIT_NOFILE) algorithm in some subprocess.
252+
hard = 524288
253+
else:
254+
hard_s = str(hard)
255+
256+
LOG.debug('inherited open file limits: soft=%d hard=%s', soft, hard_s)
250257
if soft >= hard:
251258
LOG.debug('max open files already set to hard limit: %d', hard)
252259
return
@@ -268,13 +275,13 @@ def common_setup(enable_affinity=True, _init_logging=True):
268275
save_pid('controller')
269276
ansible_mitogen.logging.set_process_name('top')
270277

278+
if _init_logging:
279+
ansible_mitogen.logging.setup()
280+
271281
if enable_affinity:
272282
ansible_mitogen.affinity.policy.assign_controller()
273283

274284
mitogen.utils.setup_gil()
275-
if _init_logging:
276-
ansible_mitogen.logging.setup()
277-
278285
if faulthandler is not None:
279286
faulthandler.enable()
280287

@@ -352,6 +359,11 @@ def close(self):
352359

353360

354361
class WorkerModel(object):
362+
"""
363+
Interface used by StrategyMixin to manage various Mitogen services, by
364+
default running in one or more connection multiplexer subprocesses spawned
365+
off the top-level Ansible process.
366+
"""
355367
def on_strategy_start(self):
356368
"""
357369
Called prior to strategy start in the top-level process. Responsible
@@ -368,6 +380,11 @@ def on_strategy_complete(self):
368380
raise NotImplementedError()
369381

370382
def get_binding(self, inventory_name):
383+
"""
384+
Return a :class:`Binding` to access Mitogen services for
385+
`inventory_name`. Usually called from worker processes, but may also be
386+
called from top-level process to handle "meta: reset_connection".
387+
"""
371388
raise NotImplementedError()
372389

373390

@@ -427,13 +444,10 @@ class ClassicWorkerModel(WorkerModel):
427444

428445
def __init__(self, _init_logging=True):
429446
"""
430-
Arrange for classic model multiplexers to be started, if they are not
431-
already running.
432-
433-
The parent process picks a UNIX socket path each child will use prior
434-
to fork, creates a socketpair used essentially as a semaphore, then
435-
blocks waiting for the child to indicate the UNIX socket is ready for
436-
use.
447+
Arrange for classic model multiplexers to be started. The parent choses
448+
UNIX socket paths each child will use prior to fork, creates a
449+
socketpair used essentially as a semaphore, then blocks waiting for the
450+
child to indicate the UNIX socket is ready for use.
437451
438452
:param bool _init_logging:
439453
For testing, if :data:`False`, don't initialize logging.
@@ -466,12 +480,10 @@ def _listener_for_name(self, name):
466480
Given an inventory hostname, return the UNIX listener that should
467481
communicate with it. This is a simple hash of the inventory name.
468482
"""
469-
if len(self._muxes) == 1:
470-
return self._muxes[0].path
471-
472-
idx = abs(hash(name)) % len(self._muxes)
473-
LOG.debug('Picked worker %d: %s', idx, self._muxes[idx].path)
474-
return self._muxes[idx].path
483+
mux = self._muxes[abs(hash(name)) % len(self._muxes)]
484+
LOG.debug('will use multiplexer %d (%s) to connect to "%s"',
485+
mux.index, mux.path, name)
486+
return mux.path
475487

476488
def _reconnect(self, path):
477489
if self.router is not None:
@@ -498,9 +510,9 @@ def _on_process_exit(self):
498510
This is an :mod:`atexit` handler installed in the top-level process.
499511
500512
Shut the write end of `sock`, causing the receive side of the socket in
501-
every worker process to return 0-byte reads, and causing their main
502-
threads to wake and initiate shutdown. After shutting the socket down,
503-
wait on each child to finish exiting.
513+
every :class:`MuxProcess` to return 0-byte reads, and causing their
514+
main threads to wake and initiate shutdown. After shutting the socket
515+
down, wait on each child to finish exiting.
504516
505517
This is done using :mod:`atexit` since Ansible lacks any better hook to
506518
run code during exit, and unless some synchronization exists with
@@ -523,30 +535,19 @@ def _on_process_exit(self):
523535
for mux in self._muxes:
524536
_, status = os.waitpid(mux.pid, 0)
525537
status = mitogen.fork._convert_exit_status(status)
526-
LOG.debug('mux %d PID %d %s', mux.index, mux.pid,
538+
LOG.debug('multiplexer %d PID %d %s', mux.index, mux.pid,
527539
mitogen.parent.returncode_to_str(status))
528540

529541
def _test_reset(self):
530542
"""
531543
Used to clean up in unit tests.
532544
"""
533-
# TODO: split this up a bit.
534-
global _classic_worker_model
535-
assert self.parent_sock is not None
536-
self.parent_sock.close()
537-
self.parent_sock = None
538-
self.listener_path = None
539-
self.router = None
540-
self.parent = None
541-
542-
for mux in self._muxes:
543-
pid, status = os.waitpid(mux.pid, 0)
544-
status = mitogen.fork._convert_exit_status(status)
545-
LOG.debug('mux PID %d %s', pid,
546-
mitogen.parent.returncode_to_str(status))
545+
self.on_binding_close()
546+
self._on_process_exit()
547+
set_worker_model(None)
547548

549+
global _classic_worker_model
548550
_classic_worker_model = None
549-
set_worker_model(None)
550551

551552
def on_strategy_start(self):
552553
"""
@@ -579,6 +580,7 @@ def on_binding_close(self):
579580
self.broker.join()
580581
self.router = None
581582
self.broker = None
583+
self.parent = None
582584
self.listener_path = None
583585

584586
# #420: Ansible executes "meta" actions in the top-level process,
@@ -694,8 +696,8 @@ def _setup_master(self):
694696
max_message_size=4096 * 1048576,
695697
)
696698
_setup_responder(self.router.responder)
697-
mitogen.core.listen(self.broker, 'shutdown', self.on_broker_shutdown)
698-
mitogen.core.listen(self.broker, 'exit', self.on_broker_exit)
699+
mitogen.core.listen(self.broker, 'shutdown', self._on_broker_shutdown)
700+
mitogen.core.listen(self.broker, 'exit', self._on_broker_exit)
699701
self.listener = mitogen.unix.Listener.build_stream(
700702
router=self.router,
701703
path=self.path,
@@ -715,26 +717,20 @@ def _setup_services(self):
715717
)
716718
setup_pool(self.pool)
717719

718-
def on_broker_shutdown(self):
720+
def _on_broker_shutdown(self):
719721
"""
720-
Respond to broker shutdown by beginning service pool shutdown. Do not
721-
join on the pool yet, since that would block the broker thread which
722-
then cannot clean up pending handlers, which is required for the
723-
threads to exit gracefully.
722+
Respond to broker shutdown by shutting down the pool. Do not join on it
723+
yet, since that would block the broker thread which then cannot clean
724+
up pending handlers and connections, which is required for the threads
725+
to exit gracefully.
724726
"""
725-
# In normal operation we presently kill the process because there is
726-
# not yet any way to cancel connect().
727-
self.pool.stop(join=self.profiling)
727+
self.pool.stop(join=False)
728728

729-
def on_broker_exit(self):
729+
def _on_broker_exit(self):
730730
"""
731-
Respond to the broker thread about to exit by sending SIGTERM to
732-
ourself. In future this should gracefully join the pool, but TERM is
733-
fine for now.
731+
Respond to the broker thread about to exit by finally joining on the
732+
pool. This is safe since pools only block in connection attempts, and
733+
connection attempts fail with CancelledError when broker shutdown
734+
begins.
734735
"""
735-
if not os.environ.get('MITOGEN_PROFILING'):
736-
# In normal operation we presently kill the process because there is
737-
# not yet any way to cancel connect(). When profiling, threads
738-
# including the broker must shut down gracefully, otherwise pstats
739-
# won't be written.
740-
os.kill(os.getpid(), signal.SIGTERM)
736+
self.pool.join()

ansible_mitogen/services.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def put(self, context):
180180
Return a reference, making it eligable for recycling once its reference
181181
count reaches zero.
182182
"""
183-
LOG.debug('%r.put(%r)', self, context)
183+
LOG.debug('decrementing reference count for %r', context)
184184
self._lock.acquire()
185185
try:
186186
if self._refs_by_context.get(context, 0) == 0:
@@ -326,7 +326,6 @@ def _on_context_disconnect(self, context):
326326
)
327327

328328
def _send_module_forwards(self, context):
329-
return
330329
self.router.responder.forward_modules(context, self.ALWAYS_PRELOAD)
331330

332331
_candidate_temp_dirs = None
@@ -383,7 +382,6 @@ def _connect(self, key, spec, via=None):
383382
mitogen.core.listen(context, 'disconnect',
384383
lambda: self._on_context_disconnect(context))
385384

386-
#self._send_module_forwards(context) TODO
387385
self._send_module_forwards(context)
388386
init_child_result = context.call(
389387
ansible_mitogen.target.init_child,

0 commit comments

Comments
 (0)