|
1 | 1 | import array |
| 2 | +import os |
| 3 | +import struct |
| 4 | +import threading |
2 | 5 | import unittest |
3 | 6 | from test.support import get_attribute |
| 7 | +from test.support import threading_helper |
4 | 8 | from test.support.import_helper import import_module |
5 | | -import os, struct |
6 | 9 | fcntl = import_module('fcntl') |
7 | 10 | termios = import_module('termios') |
8 | | -get_attribute(termios, 'TIOCGPGRP') #Can't run tests without this feature |
9 | | - |
10 | | -try: |
11 | | - tty = open("/dev/tty", "rb") |
12 | | -except OSError: |
13 | | - raise unittest.SkipTest("Unable to open /dev/tty") |
14 | | -else: |
15 | | - with tty: |
16 | | - # Skip if another process is in foreground |
17 | | - r = fcntl.ioctl(tty, termios.TIOCGPGRP, struct.pack("i", 0)) |
18 | | - rpgrp = struct.unpack("i", r)[0] |
19 | | - if rpgrp not in (os.getpgrp(), os.getsid(0)): |
20 | | - raise unittest.SkipTest("Neither the process group nor the session " |
21 | | - "are attached to /dev/tty") |
22 | | - del tty, r, rpgrp |
23 | 11 |
|
24 | 12 | try: |
25 | 13 | import pty |
26 | 14 | except ImportError: |
27 | 15 | pty = None |
28 | 16 |
|
29 | | -class IoctlTests(unittest.TestCase): |
| 17 | +class IoctlTestsTty(unittest.TestCase): |
| 18 | + @classmethod |
| 19 | + def setUpClass(cls): |
| 20 | + TIOCGPGRP = get_attribute(termios, 'TIOCGPGRP') |
| 21 | + try: |
| 22 | + tty = open("/dev/tty", "rb") |
| 23 | + except OSError: |
| 24 | + raise unittest.SkipTest("Unable to open /dev/tty") |
| 25 | + with tty: |
| 26 | + # Skip if another process is in foreground |
| 27 | + r = fcntl.ioctl(tty, TIOCGPGRP, struct.pack("i", 0)) |
| 28 | + rpgrp = struct.unpack("i", r)[0] |
| 29 | + if rpgrp not in (os.getpgrp(), os.getsid(0)): |
| 30 | + raise unittest.SkipTest("Neither the process group nor the session " |
| 31 | + "are attached to /dev/tty") |
| 32 | + |
30 | 33 | def test_ioctl_immutable_buf(self): |
31 | 34 | # If this process has been put into the background, TIOCGPGRP returns |
32 | 35 | # the session ID instead of the process group id. |
@@ -132,23 +135,27 @@ def test_ioctl_mutate_2048(self): |
132 | 135 | self._check_ioctl_mutate_len(2048) |
133 | 136 | self.assertRaises(ValueError, self._check_ioctl_not_mutate_len, 2048) |
134 | 137 |
|
| 138 | + |
| 139 | +@unittest.skipIf(pty is None, 'pty module required') |
| 140 | +class IoctlTestsPty(unittest.TestCase): |
| 141 | + def setUp(self): |
| 142 | + self.master_fd, self.slave_fd = pty.openpty() |
| 143 | + self.addCleanup(os.close, self.slave_fd) |
| 144 | + self.addCleanup(os.close, self.master_fd) |
| 145 | + |
| 146 | + @unittest.skipUnless(hasattr(termios, 'TCFLSH'), 'requires termios.TCFLSH') |
135 | 147 | def test_ioctl_tcflush(self): |
136 | | - with open("/dev/tty", "rb") as tty: |
137 | | - r = fcntl.ioctl(tty, termios.TCFLSH, termios.TCIFLUSH) |
138 | | - self.assertEqual(r, 0) |
| 148 | + r = fcntl.ioctl(self.slave_fd, termios.TCFLSH, termios.TCIFLUSH) |
| 149 | + self.assertEqual(r, 0) |
| 150 | + r = fcntl.ioctl(self.slave_fd, termios.TCFLSH, termios.TCOFLUSH) |
| 151 | + self.assertEqual(r, 0) |
139 | 152 |
|
140 | | - @unittest.skipIf(pty is None, 'pty module required') |
141 | 153 | def test_ioctl_set_window_size(self): |
142 | | - mfd, sfd = pty.openpty() |
143 | | - try: |
144 | | - # (rows, columns, xpixel, ypixel) |
145 | | - our_winsz = struct.pack("HHHH", 20, 40, 0, 0) |
146 | | - result = fcntl.ioctl(mfd, termios.TIOCSWINSZ, our_winsz) |
147 | | - new_winsz = struct.unpack("HHHH", result) |
148 | | - self.assertEqual(new_winsz[:2], (20, 40)) |
149 | | - finally: |
150 | | - os.close(mfd) |
151 | | - os.close(sfd) |
| 154 | + # (rows, columns, xpixel, ypixel) |
| 155 | + our_winsz = struct.pack("HHHH", 20, 40, 0, 0) |
| 156 | + result = fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, our_winsz) |
| 157 | + new_winsz = struct.unpack("HHHH", result) |
| 158 | + self.assertEqual(new_winsz[:2], (20, 40)) |
152 | 159 |
|
153 | 160 |
|
154 | 161 | if __name__ == "__main__": |
|
0 commit comments