| 
1 | 1 | import array  | 
 | 2 | +import os  | 
 | 3 | +import struct  | 
 | 4 | +import threading  | 
2 | 5 | import unittest  | 
3 | 6 | from test.support import get_attribute  | 
 | 7 | +from test.support import threading_helper  | 
4 | 8 | from test.support.import_helper import import_module  | 
5 |  | -import os, struct  | 
6 | 9 | fcntl = import_module('fcntl')  | 
7 | 10 | termios = import_module('termios')  | 
8 |  | -get_attribute(termios, 'TIOCGPGRP') #Can't run tests without this feature  | 
9 |  | - | 
10 |  | -try:  | 
11 |  | -    tty = open("/dev/tty", "rb")  | 
12 |  | -except OSError:  | 
13 |  | -    raise unittest.SkipTest("Unable to open /dev/tty")  | 
14 |  | -else:  | 
15 |  | -    with tty:  | 
16 |  | -        # Skip if another process is in foreground  | 
17 |  | -        r = fcntl.ioctl(tty, termios.TIOCGPGRP, struct.pack("i", 0))  | 
18 |  | -    rpgrp = struct.unpack("i", r)[0]  | 
19 |  | -    if rpgrp not in (os.getpgrp(), os.getsid(0)):  | 
20 |  | -        raise unittest.SkipTest("Neither the process group nor the session "  | 
21 |  | -                                "are attached to /dev/tty")  | 
22 |  | -    del tty, r, rpgrp  | 
23 | 11 | 
 
  | 
24 | 12 | try:  | 
25 | 13 |     import pty  | 
26 | 14 | except ImportError:  | 
27 | 15 |     pty = None  | 
28 | 16 | 
 
  | 
29 |  | -class IoctlTests(unittest.TestCase):  | 
 | 17 | +class IoctlTestsTty(unittest.TestCase):  | 
 | 18 | +    @classmethod  | 
 | 19 | +    def setUpClass(cls):  | 
 | 20 | +        TIOCGPGRP = get_attribute(termios, 'TIOCGPGRP')  | 
 | 21 | +        try:  | 
 | 22 | +            tty = open("/dev/tty", "rb")  | 
 | 23 | +        except OSError:  | 
 | 24 | +            raise unittest.SkipTest("Unable to open /dev/tty")  | 
 | 25 | +        with tty:  | 
 | 26 | +            # Skip if another process is in foreground  | 
 | 27 | +            r = fcntl.ioctl(tty, TIOCGPGRP, struct.pack("i", 0))  | 
 | 28 | +        rpgrp = struct.unpack("i", r)[0]  | 
 | 29 | +        if rpgrp not in (os.getpgrp(), os.getsid(0)):  | 
 | 30 | +            raise unittest.SkipTest("Neither the process group nor the session "  | 
 | 31 | +                                    "are attached to /dev/tty")  | 
 | 32 | + | 
30 | 33 |     def test_ioctl_immutable_buf(self):  | 
31 | 34 |         # If this process has been put into the background, TIOCGPGRP returns  | 
32 | 35 |         # the session ID instead of the process group id.  | 
@@ -132,23 +135,27 @@ def test_ioctl_mutate_2048(self):  | 
132 | 135 |         self._check_ioctl_mutate_len(2048)  | 
133 | 136 |         self.assertRaises(ValueError, self._check_ioctl_not_mutate_len, 2048)  | 
134 | 137 | 
 
  | 
 | 138 | + | 
 | 139 | +@unittest.skipIf(pty is None, 'pty module required')  | 
 | 140 | +class IoctlTestsPty(unittest.TestCase):  | 
 | 141 | +    def setUp(self):  | 
 | 142 | +        self.master_fd, self.slave_fd = pty.openpty()  | 
 | 143 | +        self.addCleanup(os.close, self.slave_fd)  | 
 | 144 | +        self.addCleanup(os.close, self.master_fd)  | 
 | 145 | + | 
 | 146 | +    @unittest.skipUnless(hasattr(termios, 'TCFLSH'), 'requires termios.TCFLSH')  | 
135 | 147 |     def test_ioctl_tcflush(self):  | 
136 |  | -        with open("/dev/tty", "rb") as tty:  | 
137 |  | -            r = fcntl.ioctl(tty, termios.TCFLSH, termios.TCIFLUSH)  | 
138 |  | -            self.assertEqual(r, 0)  | 
 | 148 | +        r = fcntl.ioctl(self.master_fd, termios.TCFLSH, termios.TCIFLUSH)  | 
 | 149 | +        self.assertEqual(r, 0)  | 
 | 150 | +        r = fcntl.ioctl(self.master_fd, termios.TCFLSH, termios.TCOFLUSH)  | 
 | 151 | +        self.assertEqual(r, 0)  | 
139 | 152 | 
 
  | 
140 |  | -    @unittest.skipIf(pty is None, 'pty module required')  | 
141 | 153 |     def test_ioctl_set_window_size(self):  | 
142 |  | -        mfd, sfd = pty.openpty()  | 
143 |  | -        try:  | 
144 |  | -            # (rows, columns, xpixel, ypixel)  | 
145 |  | -            our_winsz = struct.pack("HHHH", 20, 40, 0, 0)  | 
146 |  | -            result = fcntl.ioctl(mfd, termios.TIOCSWINSZ, our_winsz)  | 
147 |  | -            new_winsz = struct.unpack("HHHH", result)  | 
148 |  | -            self.assertEqual(new_winsz[:2], (20, 40))  | 
149 |  | -        finally:  | 
150 |  | -            os.close(mfd)  | 
151 |  | -            os.close(sfd)  | 
 | 154 | +        # (rows, columns, xpixel, ypixel)  | 
 | 155 | +        our_winsz = struct.pack("HHHH", 20, 40, 0, 0)  | 
 | 156 | +        result = fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, our_winsz)  | 
 | 157 | +        new_winsz = struct.unpack("HHHH", result)  | 
 | 158 | +        self.assertEqual(new_winsz[:2], (20, 40))  | 
152 | 159 | 
 
  | 
153 | 160 | 
 
  | 
154 | 161 | if __name__ == "__main__":  | 
 | 
0 commit comments