Skip to content

Commit 8d16f65

Browse files
committed
Merge remote-tracking branch 'origin/dmw'
* origin/dmw: issue #533: update routing to account for DEL_ROUTE propagation race tests: use defer_sync() Rather than defer() + ancient sync_with_broker() tests: one case from doas_test was invoking su tests: hide memory-mapped files from lsof output issue #615: remove meaningless test issue #625: ignore SIGINT within MuxProcess issue #625: use exec() instead of subprocess in mitogen_ansible_playbook issue #615: regression test issue #615: update Changelog.
2 parents e701fae + bcca47d commit 8d16f65

File tree

10 files changed

+126
-36
lines changed

10 files changed

+126
-36
lines changed

ansible_mitogen/plugins/action/mitogen_fetch.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,30 +45,25 @@ def run(self, tmp=None, task_vars=None):
4545
task_vars = dict()
4646

4747
result = super(ActionModule, self).run(tmp, task_vars)
48-
del tmp # tmp no longer has any effect
49-
5048
try:
5149
if self._play_context.check_mode:
5250
result['skipped'] = True
5351
result['msg'] = 'check mode not (yet) supported for this module'
5452
return result
5553

56-
source = self._task.args.get('src', None)
57-
dest = self._task.args.get('dest', None)
5854
flat = boolean(self._task.args.get('flat'), strict=False)
5955
fail_on_missing = boolean(self._task.args.get('fail_on_missing', True), strict=False)
6056
validate_checksum = boolean(self._task.args.get('validate_checksum', True), strict=False)
6157

6258
# validate source and dest are strings FIXME: use basic.py and module specs
59+
source = self._task.args.get('src')
6360
if not isinstance(source, string_types):
6461
result['msg'] = "Invalid type supplied for source option, it must be a string"
6562

63+
dest = self._task.args.get('dest')
6664
if not isinstance(dest, string_types):
6765
result['msg'] = "Invalid type supplied for dest option, it must be a string"
6866

69-
if source is None or dest is None:
70-
result['msg'] = "src and dest are required"
71-
7267
if result.get('msg'):
7368
result['failed'] = True
7469
return result

ansible_mitogen/process.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import os
3434
import resource
3535
import socket
36+
import signal
3637
import sys
3738

3839
try:
@@ -659,6 +660,12 @@ def worker_main(self):
659660
connected to the parent to be closed (indicating the parent has died).
660661
"""
661662
save_pid('mux')
663+
664+
# #623: MuxProcess ignores SIGINT because it wants to live until every
665+
# Ansible worker process has been cleaned up by
666+
# TaskQueueManager.cleanup(), otherwise harmless yet scary warnings
667+
# about being unable to connect to MuxProcess could be printed.
668+
signal.signal(signal.SIGINT, signal.SIG_IGN)
662669
ansible_mitogen.logging.set_process_name('mux')
663670
ansible_mitogen.affinity.policy.assign_muxprocess(self.index)
664671

docs/changelog.rst

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,6 @@ Enhancements
3232
<https://docs.ansible.com/ansible/latest/reference_appendices/interpreter_discovery.html>`_
3333
are not yet handled.
3434

35-
* `Operon <https://networkgenomics.com/operon/>`_ no longer requires a custom
36-
installation, both Operon and Ansible are supported by a unified release.
37-
3835
* `#419 <https://github.com/dw/mitogen/issues/419>`_,
3936
`#470 <https://github.com/dw/mitogen/issues/470>`_, file descriptor usage
4037
during large runs is halved, as it is no longer necessary to manage read and
@@ -54,6 +51,17 @@ Enhancements
5451
available to manipulate `Buildah <https://buildah.io/>`_ containers, and is
5552
exposed to Ansible as the ``buildah`` transport.
5653

54+
* `#615 <https://github.com/dw/mitogen/issues/615>`_: the ``mitogen_fetch``
55+
action is included, and the standard Ansible ``fetch`` action is redirected
56+
to it. This implements streaming file transfer in every case, including when
57+
``become`` is active, preventing excessive CPU usage and memory spikes, and
58+
significantly improving throughput. A copy of 2 files of 512 MiB each drops
59+
from 47 seconds to just under 7 seconds, with peak memory usage dropping from
60+
10.7 GiB to 64.8 MiB.
61+
62+
* `Operon <https://networkgenomics.com/operon/>`_ no longer requires a custom
63+
installation, both Operon and Ansible are supported by a unified release.
64+
5765
* The ``MITOGEN_CPU_COUNT`` environment variable shards the connection
5866
multiplexer into per-CPU workers. This may improve throughput for runs
5967
involving large file transfers, and is required for future in-process SSH
@@ -190,6 +198,18 @@ Core Library
190198
`closed` flag, preventing historical bugs where a double close could destroy
191199
descriptors belonging to unrelated streams.
192200

201+
* `#533 <https://github.com/dw/mitogen/issues/533>`_: routing accounts for
202+
a race between a parent sending a message to a child via an intermediary,
203+
where the child had recently disconnected, and ``DEL_ROUTE`` propagating from
204+
the intermediary to the parent, informing it that the child no longer exists.
205+
This condition is detected at the intermediary and a dead message is returned
206+
to the parent.
207+
208+
Previously, since the intermediary had already removed its route for the
209+
child, the *route messages upwards* rule would be triggered, causing the
210+
message (with a privileged ``src_id``/``auth_id``) to be sent upstream,
211+
resulting in a ``bad auth_id`` log message and a hang.
212+
193213
* `#586 <https://github.com/dw/mitogen/issues/586>`_: fix import of
194214
:mod:`__main__` on later versions of Python 3 when running from the
195215
interactive console.

mitogen/core.py

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3272,34 +3272,48 @@ def _async_route(self, msg, in_stream=None):
32723272
))
32733273
return
32743274

3275-
# Perform source verification.
3275+
parent_stream = self._stream_by_id.get(mitogen.parent_id)
3276+
src_stream = self._stream_by_id.get(msg.src_id, parent_stream)
3277+
3278+
# When the ingress stream is known, verify the message was received on
3279+
# the same stream that we would expect to receive messages from the
3280+
# src_id and auth_id. This is like Reverse Path Filtering in IP, and
3281+
# ensures messages from a privileged context cannot be spoofed by a
3282+
# child.
32763283
if in_stream:
3277-
parent = self._stream_by_id.get(mitogen.parent_id)
3278-
expect = self._stream_by_id.get(msg.auth_id, parent)
3279-
if in_stream != expect:
3284+
auth_stream = self._stream_by_id.get(msg.auth_id, parent_stream)
3285+
if in_stream != auth_stream:
32803286
LOG.error('%r: bad auth_id: got %r via %r, not %r: %r',
3281-
self, msg.auth_id, in_stream, expect, msg)
3287+
self, msg.auth_id, in_stream, auth_stream, msg)
32823288
return
32833289

3284-
if msg.src_id != msg.auth_id:
3285-
expect = self._stream_by_id.get(msg.src_id, parent)
3286-
if in_stream != expect:
3287-
LOG.error('%r: bad src_id: got %r via %r, not %r: %r',
3288-
self, msg.src_id, in_stream, expect, msg)
3289-
return
3290+
if msg.src_id != msg.auth_id and in_stream != src_stream:
3291+
LOG.error('%r: bad src_id: got %r via %r, not %r: %r',
3292+
self, msg.src_id, in_stream, src_stream, msg)
3293+
return
32903294

3295+
# If the stream's MitogenProtocol has auth_id set, copy it to the
3296+
# message. This allows subtrees to become privileged by stamping a
3297+
# parent's context ID. It is used by mitogen.unix to mark client
3298+
# streams (like Ansible WorkerProcess) as having the same rights as
3299+
# the parent.
32913300
if in_stream.protocol.auth_id is not None:
32923301
msg.auth_id = in_stream.protocol.auth_id
32933302

3294-
# Maintain a set of IDs the source ever communicated with.
3303+
# Record the IDs the source ever communicated with.
32953304
in_stream.protocol.egress_ids.add(msg.dst_id)
32963305

32973306
if msg.dst_id == mitogen.context_id:
32983307
return self._invoke(msg, in_stream)
32993308

33003309
out_stream = self._stream_by_id.get(msg.dst_id)
3301-
if out_stream is None:
3302-
out_stream = self._stream_by_id.get(mitogen.parent_id)
3310+
if (not out_stream) and (parent_stream != src_stream or not in_stream):
3311+
# No downstream route exists. The message could be from a child or
3312+
# ourselves for a parent, in which case we must forward it
3313+
# upstream, or it could be from a parent for a dead child, in which
3314+
# case its src_id/auth_id would fail verification if returned to
3315+
# the parent, so in that case reply with a dead message instead.
3316+
out_stream = parent_stream
33033317

33043318
if out_stream is None:
33053319
self._maybe_send_dead(True, msg, self.no_route_msg,
@@ -3310,9 +3324,9 @@ def _async_route(self, msg, in_stream=None):
33103324
(in_stream.protocol.is_privileged or
33113325
out_stream.protocol.is_privileged):
33123326
self._maybe_send_dead(True, msg, self.unidirectional_msg,
3313-
in_stream.protocol.remote_id,
3314-
out_stream.protocol.remote_id,
3315-
mitogen.context_id)
3327+
in_stream.protocol.remote_id,
3328+
out_stream.protocol.remote_id,
3329+
mitogen.context_id)
33163330
return
33173331

33183332
out_stream.protocol._send(msg)

tests/ansible/mitogen_ansible_playbook.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,8 @@
33
import subprocess
44
import sys
55
os.environ['ANSIBLE_STRATEGY'] = 'mitogen_linear'
6-
subprocess.check_call(['./run_ansible_playbook.py'] + sys.argv[1:])
6+
os.execlp(
7+
'./run_ansible_playbook.py',
8+
'./run_ansible_playbook.py',
9+
*sys.argv[1:]
10+
)

tests/ansible/regression/all.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@
1111
- include: issue_558_unarchive_failed.yml
1212
- include: issue_590__sys_modules_crap.yml
1313
- include: issue_591__setuptools_cwd_crash.yml
14+
- include: issue_615__streaming_transfer.yml
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# issue #615: 'fetch' with become: was internally using slurp.
2+
3+
- hosts: target
4+
any_errors_fatal: True
5+
gather_facts: no
6+
become: true
7+
vars:
8+
mitogen_ssh_compression: false
9+
tasks:
10+
- shell: |
11+
dd if=/dev/zero of=/tmp/512mb.zero bs=1048576 count=512;
12+
chmod go= /tmp/512mb.zero
13+
14+
- fetch:
15+
src: /tmp/512mb.zero
16+
dest: /tmp/fetch-out
17+
18+
- file:
19+
path: /tmp/fetch-out
20+
state: absent
21+
delegate_to: localhost

tests/doas_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def test_password_okay(self):
5757
username='mitogen__has_sudo',
5858
password='has_sudo_password',
5959
)
60-
context = self.router.su(via=ssh, password='rootpassword')
60+
context = self.router.doas(via=ssh, password='has_sudo_password')
6161
self.assertEquals(0, context.call(os.getuid))
6262

6363

tests/router_test.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,12 @@ def test_bad_auth_id(self):
6262
recv = mitogen.core.Receiver(self.router)
6363
self.child2_msg.handle = recv.handle
6464

65-
self.broker.defer(self.router._async_route,
66-
self.child2_msg,
67-
in_stream=self.child1_stream)
68-
69-
# Wait for IO loop to finish everything above.
70-
self.sync_with_broker()
65+
self.broker.defer_sync(
66+
lambda: self.router._async_route(
67+
self.child2_msg,
68+
in_stream=self.child1_stream
69+
)
70+
)
7171

7272
# Ensure message wasn't forwarded.
7373
self.assertTrue(recv.empty())
@@ -76,6 +76,34 @@ def test_bad_auth_id(self):
7676
expect = 'bad auth_id: got %r via' % (self.child2_msg.auth_id,)
7777
self.assertTrue(expect in log.stop())
7878

79+
def test_parent_unaware_of_disconnect(self):
80+
# Parent -> Child A -> Child B. B disconnects concurrently with Parent
81+
# sending message. Parent does not yet know B has disconnected, A
82+
# receives message from Parent with Parent's auth_id, for a stream that
83+
# no longer exists.
84+
c1 = self.router.local()
85+
strm = self.router.stream_by_id(c1.context_id)
86+
recv = mitogen.core.Receiver(self.router)
87+
88+
self.broker.defer(lambda:
89+
strm.protocol._send(
90+
mitogen.core.Message(
91+
dst_id=1234, # nonexistent child
92+
handle=1234,
93+
src_id=mitogen.context_id,
94+
reply_to=recv.handle,
95+
)
96+
)
97+
)
98+
99+
e = self.assertRaises(mitogen.core.ChannelError,
100+
lambda: recv.get().unpickle()
101+
)
102+
self.assertEquals(e.args[0], self.router.no_route_msg % (
103+
1234,
104+
c1.context_id,
105+
))
106+
79107
def test_bad_src_id(self):
80108
# Deliver a message locally from child2 with the correct auth_id, but
81109
# the wrong src_id.

tests/testlib.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ def _teardown_check_threads(self):
338338
def _teardown_check_fds(self):
339339
mitogen.core.Latch._on_fork()
340340
if get_fd_count() != self._fd_count_before:
341-
import os; os.system('lsof +E -w -p %s' % (os.getpid(),))
341+
import os; os.system('lsof +E -w -p %s | grep -vw mem' % (os.getpid(),))
342342
assert 0, "%s leaked FDs. Count before: %s, after: %s" % (
343343
self, self._fd_count_before, get_fd_count(),
344344
)

0 commit comments

Comments
 (0)