diff --git a/mitogen/core.py b/mitogen/core.py index 5be36a95d..c61658897 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -108,6 +108,11 @@ def _path_importer_cache(cls, path): except NameError: BaseException = Exception +if sys.version_info >= (2, 6): + from io import BlockingIOError +else: + BlockingIOError = None + try: ModuleNotFoundError except NameError: @@ -585,29 +590,63 @@ def io_op(func, *args): or :class:`OSError`, trapping UNIX error codes relating to disconnection and retry events in various subsystems: - * When a signal is delivered to the process on Python 2, system call retry - is signalled through :data:`errno.EINTR`. The invocation is automatically - restarted. - * When performing IO against a TTY, disconnection of the remote end is - signalled by :data:`errno.EIO`. - * When performing IO against a socket, disconnection of the remote end is - signalled by :data:`errno.ECONNRESET`. - * When performing IO against a pipe, disconnection of the remote end is - signalled by :data:`errno.EPIPE`. - + :data:`errno.EINTR` + A system call was interrupted by a signal being delivered. + Python >= 3.5 retries most calls, forever (see PEP 475). + This wrapper retries all calls, also forever. + + :exc:`BlockingIOError` + :data:`errno.EAGAIN` + :data:`errno.EWOULDBLOCK` + A system call on a non-blocking file would have blocked. + Python doesn't retry calls. It raises an exception or returns + :data:`None` - depending on the the call, the file, and the version. + This wrapper tries upto ``max_attempts`` times. + + :data:`errno.EIO` + :data:`errno.ECONNRESET` + :data:`errno.EPIPE` + IO on a TTY, socket, or pipe respectively disconnected at the other end. + + :param func: + The callable to run (e.g. :func:`os.read`, :func:`os.write`). + :param *args: + Positional arguments for the callable. :returns: Tuple of `(return_value, disconnect_reason)`, where `return_value` is the return value of `func(*args)`, and `disconnected` is an exception instance when disconnection was detected, otherwise :data:`None`. """ + max_attempts = 5 + attempt = 0 while True: + attempt += 1 try: return func(*args), None + except BlockingIOError: + e = sys.exc_info()[1] + _vv and IOLOG.debug( + 'io_op(%r) attempt %d/%d -> %r', func, attempt, max_attempts, e + ) + try: + written = e.characters_written + except AttributeError: + written = None + if written: + return written, None + if attempt < max_attempts: + continue + raise except (select.error, OSError, IOError): e = sys.exc_info()[1] - _vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e) + _vv and IOLOG.debug( + 'io_op(%r) attempt %d/%d -> %r', func, attempt, max_attempts, e + ) if e.args[0] == errno.EINTR: continue + if e.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK): + if attempt < max_attempts: + continue if e.args[0] in (errno.EIO, errno.ECONNRESET, errno.EPIPE): return None, e raise diff --git a/tests/ansible/regression/files/generate_ascii_test_data.py b/tests/ansible/regression/files/generate_ascii_test_data.py new file mode 100644 index 000000000..a1faad04b --- /dev/null +++ b/tests/ansible/regression/files/generate_ascii_test_data.py @@ -0,0 +1,65 @@ +''' +Write lines of deterministic ASCII test data + +```console +$ python generate_ascii_test_data.py 192 +000000000000000000 mitogen-test-file ABCDEFGHIJKLMNOPQRSTUVWXYZ +000000000000000064 mitogen-test-file BCDEFGHIJKLMNOPQRSTUVWXYZA +000000000000000128 mitogen-test-file CDEFGHIJKLMNOPQRSTUVWXYZAB +``` +''' + +import os +import sys + +if sys.version_info < (3, 0): + range = xrange # noqa: F821 + +# Padding added to make each line LINE_SIZE bytes long, including a newline. +# PADDING_POOL is repeated to eliminate repeated concatenations in the loop. +PADDING_TEXT = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'.encode('ascii') +PADDING_SIZE = 26 +PADDING_POOL = PADDING_TEXT * 2 + +LINE_TMPL = '%%018d mitogen-test-file %%.%ds\n'.encode('ascii') % PADDING_SIZE +LINE_SIZE = 64 + +def format_line(lineno): + # type: (int) -> bytes + line_offset = lineno * LINE_SIZE + padding_shift = lineno % PADDING_SIZE + return LINE_TMPL % (line_offset, PADDING_POOL[padding_shift:]) + + +def main(): + assert len(PADDING_POOL) >= 2 * PADDING_SIZE + assert len(format_line(0)) == LINE_SIZE + + try: + output_size = int(sys.argv[1]) + if output_size < 0 or output_size % LINE_SIZE != 0: + raise ValueError + except IndexError: + prog = os.path.basename(sys.argv[0]) + raise SystemExit('Usage: %s output_size [output_file]' % prog) + except ValueError: + raise SystemExit( + 'Error: output_size must be >= 0 and a multiple of line size (%d), ' + 'got: %s' % (LINE_SIZE, sys.argv[1]) + ) + + if len(sys.argv) >= 3 and sys.argv[2] != '-': + output_file = open(sys.argv[2], 'wb') + else: + output_file = os.fdopen(sys.stdout.fileno(), 'wb') + + with output_file as f: + for lineno in range(0, output_size // LINE_SIZE): + line = format_line(lineno) + f.write(line) + + raise SystemExit + + +if __name__ == '__main__': + main() diff --git a/tests/ansible/regression/issue_615__streaming_transfer.yml b/tests/ansible/regression/issue_615__streaming_transfer.yml index 6fe52d55d..88a5e9f6a 100644 --- a/tests/ansible/regression/issue_615__streaming_transfer.yml +++ b/tests/ansible/regression/issue_615__streaming_transfer.yml @@ -7,30 +7,36 @@ become: true vars: mitogen_ssh_compression: false + file_path_controller: "/tmp/fetch-{{ inventory_hostname }}-512mb.txt" + file_path_target: /tmp/mitogen-test-512mb.txt + file_size: "{{ 512 * 2**20 }}" tasks: - include_tasks: _mitogen_only.yml - block: - - name: Create /tmp/512mb.zero - shell: | - dd if=/dev/zero of=/tmp/512mb.zero bs=1048576 count=512; - chmod go= /tmp/512mb.zero - args: - creates: /tmp/512mb.zero + - name: Create test file on target + script: + cmd: generate_ascii_test_data.py "{{ file_size }}" "{{ file_path_target }}" + executable: "{{ ansible_python_interpreter | default(ansible_facts.discovered_interpreter_python) }}" + creates: "{{ file_path_target }}" + register: target_file_task - - name: Fetch /tmp/512mb.zero + - debug: + var: target_file_task + + - name: Fetch test file fetch: - src: /tmp/512mb.zero - dest: /tmp/fetch-{{ inventory_hostname }}-512mb.zero + src: "{{ file_path_target }}" + dest: "{{ file_path_controller }}" flat: true - - name: Cleanup /tmp/512mb.zero + - name: Cleanup target file: - path: /tmp/512mb.zero + path: "{{ file_path_target }}" state: absent - - name: Cleanup fetched file + - name: Cleanup controller file: - path: /tmp/fetch-{{ inventory_hostname }}-512mb.zero + path: "{{ file_path_controller }}" state: absent become: false delegate_to: localhost