diff --git a/mitogen/parent.py b/mitogen/parent.py index 6e30b1c62..217a1ad75 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1378,6 +1378,9 @@ class Connection(object): #: user. exception = None + #: First stage select timeout in seconds. + _first_stage_select_timeout = 10 + #: Extra text appended to :class:`EofError` if that exception is raised on #: a failed connection attempt. May be used in subclasses to hint at common #: problems with a particular connection method. @@ -1416,11 +1419,18 @@ def __repr__(self): # r: read side of core_src FD. # w: write side of core_src FD. # C: the decompressed core source. + # n: size of the compressed core source to be read + # V: data chunk + # rl: list of FDs ready for reading + # t: timeout value in seconds + # _: throw away variable - # Final os.close(STDOUT_FILENO) to avoid --py-debug build corrupting stream with + # Final os.close(STDERR_FILENO) to avoid --py-debug build corrupting stream with # "[1234 refs]" during exit. @staticmethod def _first_stage(): + os.fstat(0) + os.fstat(1) R,W=os.pipe() r,w=os.pipe() if os.fork(): @@ -1437,8 +1447,16 @@ def _first_stage(): os.environ['ARGV0']=sys.executable os.execl(sys.executable,sys.executable+'(mitogen:%s)'%sys.argv[2]) os.write(1,'MITO000\n'.encode()) + n=int(sys.argv[3]) + t=float(sys.argv[4]) C=''.encode() - while int(sys.argv[3])-len(C)and select.select([0],[],[]):C+=os.read(0,int(sys.argv[3])-len(C)) + V='V' + while n>len(C) and V: + rl,_,_=select.select([0],[],[],t) + if not rl: + sys.exit(1) + V=os.read(0,n-len(C)) + C+=V C=zlib.decompress(C) f=os.fdopen(W,'wb',0) f.write(C) @@ -1447,7 +1465,10 @@ def _first_stage(): f.write(C) f.close() os.write(1,'MITO001\n'.encode()) - os.close(2) + try: + os.close(2) + except OSError: + pass def get_python_argv(self): """ @@ -1484,6 +1505,7 @@ def get_boot_command(self): encoded.decode(), self.options.remote_name, str(len(self.get_preamble())), + str(self._first_stage_select_timeout), ] def get_econtext_config(self): diff --git a/tests/first_stage_test.py b/tests/first_stage_test.py index 2576ec14d..ee7475600 100644 --- a/tests/first_stage_test.py +++ b/tests/first_stage_test.py @@ -1,4 +1,8 @@ -import subprocess +import fcntl +import functools +import sys +import operator +import os import mitogen.core import mitogen.parent @@ -7,6 +11,170 @@ import testlib +def own_create_child(args, blocking, pipe_size=None, preexec_fn=None, pass_stderr=True): + """ + Create a child process whose stdin/stdout/stderr is connected to a pipe. + + :param list args: + Program argument vector. + :param bool blocking: + If :data:`True`, the pipes use blocking IO, otherwise non-blocking. + :param int pipe_size: + If not :data:`None`, use the values as the pipe size. + :param function preexec_fn: + If not :data:`None`, a function to run within the post-fork child + before executing the target program. + :returns: + :class:`PopenProcess` instance. + """ + parent_rfp, child_wfp = mitogen.core.pipe(blocking=blocking) + child_rfp, parent_wfp = mitogen.core.pipe(blocking=blocking) + stderr_r, stderr = mitogen.core.pipe(blocking=blocking) + mitogen.core.set_cloexec(stderr_r.fileno()) + if pipe_size is not None: + fcntl.fcntl(parent_rfp.fileno(), fcntl.F_SETPIPE_SZ, pipe_size) + fcntl.fcntl(child_rfp.fileno(), fcntl.F_SETPIPE_SZ, pipe_size) + fcntl.fcntl(stderr_r.fileno(), fcntl.F_SETPIPE_SZ, pipe_size) + assert fcntl.fcntl(parent_rfp.fileno(), fcntl.F_GETPIPE_SZ) == pipe_size + assert fcntl.fcntl(child_rfp.fileno(), fcntl.F_GETPIPE_SZ) == pipe_size + assert fcntl.fcntl(stderr_r.fileno(), fcntl.F_GETPIPE_SZ) == pipe_size + + try: + proc = testlib.subprocess.Popen( + args=args, + stdin=child_rfp, + stdout=child_wfp, + stderr=stderr, + preexec_fn=preexec_fn, + ) + except Exception: + child_rfp.close() + child_wfp.close() + parent_rfp.close() + parent_wfp.close() + stderr_r.close() + stderr.close() + raise + + child_rfp.close() + child_wfp.close() + stderr.close() + # Only used to create a specific test scenario! + if not pass_stderr: + stderr_r.close() + stderr_r = None + return mitogen.parent.PopenProcess( + proc=proc, + stdin=parent_wfp, + stdout=parent_rfp, + stderr=stderr_r, + ) + + +class DummyConnectionBlocking(mitogen.parent.Connection): + """Dummy blocking IO connection""" + + pipe_size = 4096 if getattr(fcntl, "F_SETPIPE_SZ", None) else None + create_child = staticmethod( + functools.partial(own_create_child, blocking=True, pipe_size=pipe_size) + ) + name_prefix = "dummy_blocking" + + +class DummyConnectionNonBlocking(mitogen.parent.Connection): + """Dummy non-blocking IO connection""" + + pipe_size = 4096 if getattr(fcntl, "F_SETPIPE_SZ", None) else None + create_child = staticmethod( + functools.partial(own_create_child, blocking=False, pipe_size=pipe_size) + ) + name_prefix = "dummy_non_blocking" + + +class DummyConnectionClosedStderr(mitogen.parent.Connection): + """Dummy closed stderr connection""" + + pipe_size = 4096 if getattr(fcntl, "F_SETPIPE_SZ", None) else None + create_child = staticmethod( + functools.partial( + own_create_child, + blocking=True, + pipe_size=pipe_size, + pass_stderr=False, + # `os.close(2)` does not work here as we use file objects in + # `create_child` and that would cause problems with Python2. + preexec_fn=lambda: sys.stderr.close(), + ) + ) + name_prefix = "dummy_closed_stderr" + + +class ConnectionTest(testlib.RouterMixin, testlib.TestCase): + def test_non_blocking_stdin(self): + """Test that first stage works with non-blocking STDIN + + The boot command should read the preamble from STDIN, write all ECO + markers to STDOUT, and then execute the preamble. + + This test writes the complete preamble to non-blocking STDIN. + + 1. Fork child reads from non-blocking STDIN + 2. Fork child writes all data as expected by the protocol. + 3. A context call works as expected. + + """ + log = testlib.LogCapturer() + log.start() + ctx = self.router._connect(DummyConnectionNonBlocking, connect_timeout=0.5) + self.assertEqual(3, ctx.call(operator.add, 1, 2)) + logs = log.stop() + + def test_blocking_stdin(self): + """Test that first stage works with blocking STDIN + + The boot command should read the preamble from STDIN, write all ECO + markers to STDOUT, and then execute the preamble. + + This test writes the complete preamble to blocking STDIN. + + 1. Fork child reads from blocking STDIN + 2. Fork child writes all data as expected by the protocol. + 3. A context call works as expected. + + """ + log = testlib.LogCapturer() + log.start() + ctx = self.router._connect(DummyConnectionBlocking, connect_timeout=0.5) + self.assertEqual(3, ctx.call(operator.add, 1, 2)) + logs = log.stop() + + def test_closed_stderr(self): + """Test that first stage works with closed STDERR + + The boot command should read the preamble from STDIN, write all ECO + markers to STDOUT, and then execute the preamble. + + This test writes the complete preamble to blocking STDIN. + + 1. Fork child reads from blocking STDIN + 2. Fork child decompresses the data, does send the handshakes MITO001 and MITO002 + 3. Fork child crashes (when it tries to close the already closed + STDERR), but that's non-critical as the parent can read the data + already written by the fork child. + 4. Fork child's file descriptors (write pipes) are closed by the OS + 5. Fork parent does `dup(, )` and `exec()` + 6. Python reads all data from stdin + 7. Python runs the preamble code + 8. A context call works as expected. + + """ + log = testlib.LogCapturer() + log.start() + ctx = self.router._connect(DummyConnectionClosedStderr, connect_timeout=0.5) + self.assertEqual(3, ctx.call(operator.add, 1, 2)) + logs = log.stop() + + class CommandLineTest(testlib.RouterMixin, testlib.TestCase): # Ensure this version of Python produces a command line that is sufficient # to bootstrap this version of Python. @@ -16,29 +184,39 @@ class CommandLineTest(testlib.RouterMixin, testlib.TestCase): # * 2.7 starting 3.x # * 3.x starting 2.7 - def test_valid_syntax(self): + def setUp(self): + super(CommandLineTest, self).setUp() options = mitogen.parent.Options(max_message_size=123) conn = mitogen.parent.Connection(options, self.router) conn.context = mitogen.core.Context(None, 123) - args = conn.get_boot_command() + self.args = conn.get_boot_command() + self.preamble = conn.get_preamble() + self.conn = conn + + def test_valid_syntax(self): + """Test valid syntax - # The boot command should write an ECO marker to stdout, read the - # preamble from stdin, then execute it. + The boot command should write an ECO marker to stdout, read the + preamble from stdin, then execute it. - # This test attaches /dev/zero to stdin to create a specific failure - # 1. Fork child reads bytes of NUL (`b'\0'`) - # 2. Fork child crashes (trying to decompress the junk data) - # 3. Fork child's file descriptors (write pipes) are closed by the OS - # 4. Fork parent does `dup(, )` and `exec()` - # 5. Python reads `b''` (i.e. EOF) from stdin (a closed pipe) - # 6. Python runs `''` (a valid script) and exits with success + This test attaches /dev/zero to stdin to create a specific failure + + 1. Fork child reads bytes of NUL (`b'\0'`) + 2. Fork child crashes (trying to decompress the junk data) + 3. Fork child's file descriptors (write pipes) are closed by the OS + 4. Fork parent does `dup(, )` and `exec()` + 5. Python reads `b''` (i.e. EOF) from stdin (a closed pipe) + 6. Python runs `''` (a valid script) and exits with success + + """ fp = open("/dev/zero", "r") try: - proc = subprocess.Popen(args, + proc = testlib.subprocess.Popen( + self.args, stdin=fp, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + stdout=testlib.subprocess.PIPE, + stderr=testlib.subprocess.PIPE, ) stdout, stderr = proc.communicate() self.assertEqual(0, proc.returncode) @@ -50,3 +228,171 @@ def test_valid_syntax(self): ) finally: fp.close() + + def test_premature_eof(self): + """The boot command should write an ECO marker to stdout, read the + preamble from stdin, then execute it. + + This test writes some data to STDIN and closes it then to create an + EOF situation. + 1. Fork child tries to read from STDIN, but stops as EOF is received. + 2. Fork child crashes (trying to decompress the junk data) + 3. Fork child's file descriptors (write pipes) are closed by the OS + 4. Fork parent does `dup(, )` and `exec()` + 5. Python reads `b''` (i.e. EOF) from stdin (a closed pipe) + 6. Python runs `''` (a valid script) and exits with success""" + + proc = testlib.subprocess.Popen( + args=self.args, + stdout=testlib.subprocess.PIPE, + stderr=testlib.subprocess.PIPE, + stdin=testlib.subprocess.PIPE, + ) + + # Do not send all of the data from the preamble + proc.stdin.write(self.preamble[:-128]) + proc.stdin.flush() + proc.stdin.close() + try: + returncode = proc.wait(timeout=10) + except testlib.subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=3) + self.fail("First stage did not handle EOF on STDIN") + try: + self.assertEqual(0, returncode) + self.assertEqual( + proc.stdout.read(), + mitogen.parent.BootstrapProtocol.EC0_MARKER + b("\n"), + ) + self.assertIn( + b("Error -5 while decompressing data"), + proc.stderr.read(), + ) + finally: + proc.stdout.close() + proc.stderr.close() + + def test_timeout_error(self): + """The boot command should write an ECO marker to stdout, try to read + the preamble from stdin, then fail with an TimeoutError as nothing has + been written. + + This test writes no data to STDIN of the fork child to enforce a time out. + 1. Fork child tries to read from STDIN, but runs into the timeout + 2. Fork child raises TimeoutError + 3. Fork child's file descriptors (write pipes) are closed by the OS + 4. Fork parent does `dup(, )` and `exec()` + 5. Python reads `b''` (i.e. EOF) from stdin (a closed pipe) + 6. Python runs `''` (a valid script) and exits with success + """ + + # We do not want to wait the default of 10s, change it to 0.1s + self.conn._first_stage_select_timeout = 0.1 + args = self.conn.get_boot_command() + + proc = testlib.subprocess.Popen( + args=args, + stdout=testlib.subprocess.PIPE, + stderr=testlib.subprocess.PIPE, + close_fds=True, + ) + try: + returncode = proc.wait(timeout=3) + except testlib.subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=3) + self.fail("Timeout situation was not recognized") + else: + stdout = proc.stdout.read() + stderr = proc.stderr.read() + finally: + proc.stdout.close() + proc.stderr.close() + self.assertEqual(0, returncode) + self.assertEqual(stdout, mitogen.parent.BootstrapProtocol.EC0_MARKER + b("\n")) + self.assertIn( + b(""), + stderr, + ) + + def test_closed_stdin(self): + """This test closes STDIN of the child process. + + 1. The child process detects that STDIN is unavailable + 2. The child process terminates early with an OSError exception, and + reports the issue via exception printed on STDERR. + 3. The parent process correctly identifies this condition. + + """ + # We do not want to wait the default of 10s, change it to 0.1s + self.conn._first_stage_timeout = 0.1 + args = self.conn.get_boot_command() + + proc = testlib.subprocess.Popen( + args=args, + stdout=testlib.subprocess.PIPE, + stderr=testlib.subprocess.PIPE, + preexec_fn=lambda: os.close(0), + close_fds=True, + ) + try: + stdout, stderr = proc.communicate(timeout=12) + except testlib.subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=3) + self.fail("Closed STDIN situation was not recognized") + self.assertEqual(1, proc.returncode) + self.assertEqual(stdout, b"") + self.assertIn( + b("Bad file descriptor"), + stderr, + ) + + def test_closed_stdout(self): + """Test that first stage bails out if STDOUT is closed + + This test closes STDOUT of the child process. + + 1. The child process detects that STDOUT is unavailable + 2. The child process terminates early with an OSError exception, and + reports the issue via exception printed on STDERR. + 3. The parent process correctly identifies this condition. + + """ + + stdout_r, stdout_w = mitogen.core.pipe() + mitogen.core.set_cloexec(stdout_r.fileno()) + stderr_r, stderr_w = mitogen.core.pipe() + mitogen.core.set_cloexec(stderr_r.fileno()) + try: + proc = testlib.subprocess.Popen( + args=self.args, + stdout=stdout_w, + stderr=stderr_w, + preexec_fn=lambda: os.close(0), + ) + except Exception: + stdout_r.close() + stdout_w.close() + stderr_w.close() + stderr_r.close() + raise + stdout_w.close() + stderr_w.close() + try: + returncode = proc.wait(timeout=1) + except testlib.subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=3) + self.fail("Closed STDOUT situation was not detected") + else: + stderr = stderr_r.read() + finally: + stderr_r.close() + stdout_r.close() + self.assertEqual(1, returncode) + self.assertIn( + b("Bad file descriptor"), + stderr, + )