2727 import_helper , is_apple , os_helper , threading_helper , warnings_helper ,
2828)
2929from test .support .os_helper import FakePath
30+ from .utils import byteslike , CTestCase , PyTestCase
3031
3132import codecs
3233import io # C implementation of io
3334import _pyio as pyio # Python implementation of io
3435
35- try :
36- import ctypes
37- except ImportError :
38- def byteslike (* pos , ** kw ):
39- return array .array ("b" , bytes (* pos , ** kw ))
40- else :
41- def byteslike (* pos , ** kw ):
42- """Create a bytes-like object having no string or sequence methods"""
43- data = bytes (* pos , ** kw )
44- obj = EmptyStruct ()
45- ctypes .resize (obj , len (data ))
46- memoryview (obj ).cast ("B" )[:] = data
47- return obj
48- class EmptyStruct (ctypes .Structure ):
49- pass
50-
5136
5237def _default_chunk_size ():
5338 """Get the default TextIOWrapper chunk size"""
@@ -63,298 +48,6 @@ class BadIndex:
6348 def __index__ (self ):
6449 1 / 0
6550
66- class MockRawIOWithoutRead :
67- """A RawIO implementation without read(), so as to exercise the default
68- RawIO.read() which calls readinto()."""
69-
70- def __init__ (self , read_stack = ()):
71- self ._read_stack = list (read_stack )
72- self ._write_stack = []
73- self ._reads = 0
74- self ._extraneous_reads = 0
75-
76- def write (self , b ):
77- self ._write_stack .append (bytes (b ))
78- return len (b )
79-
80- def writable (self ):
81- return True
82-
83- def fileno (self ):
84- return 42
85-
86- def readable (self ):
87- return True
88-
89- def seekable (self ):
90- return True
91-
92- def seek (self , pos , whence ):
93- return 0 # wrong but we gotta return something
94-
95- def tell (self ):
96- return 0 # same comment as above
97-
98- def readinto (self , buf ):
99- self ._reads += 1
100- max_len = len (buf )
101- try :
102- data = self ._read_stack [0 ]
103- except IndexError :
104- self ._extraneous_reads += 1
105- return 0
106- if data is None :
107- del self ._read_stack [0 ]
108- return None
109- n = len (data )
110- if len (data ) <= max_len :
111- del self ._read_stack [0 ]
112- buf [:n ] = data
113- return n
114- else :
115- buf [:] = data [:max_len ]
116- self ._read_stack [0 ] = data [max_len :]
117- return max_len
118-
119- def truncate (self , pos = None ):
120- return pos
121-
122- class CMockRawIOWithoutRead (MockRawIOWithoutRead , io .RawIOBase ):
123- pass
124-
125- class PyMockRawIOWithoutRead (MockRawIOWithoutRead , pyio .RawIOBase ):
126- pass
127-
128-
129- class MockRawIO (MockRawIOWithoutRead ):
130-
131- def read (self , n = None ):
132- self ._reads += 1
133- try :
134- return self ._read_stack .pop (0 )
135- except :
136- self ._extraneous_reads += 1
137- return b""
138-
139- class CMockRawIO (MockRawIO , io .RawIOBase ):
140- pass
141-
142- class PyMockRawIO (MockRawIO , pyio .RawIOBase ):
143- pass
144-
145-
146- class MisbehavedRawIO (MockRawIO ):
147- def write (self , b ):
148- return super ().write (b ) * 2
149-
150- def read (self , n = None ):
151- return super ().read (n ) * 2
152-
153- def seek (self , pos , whence ):
154- return - 123
155-
156- def tell (self ):
157- return - 456
158-
159- def readinto (self , buf ):
160- super ().readinto (buf )
161- return len (buf ) * 5
162-
163- class CMisbehavedRawIO (MisbehavedRawIO , io .RawIOBase ):
164- pass
165-
166- class PyMisbehavedRawIO (MisbehavedRawIO , pyio .RawIOBase ):
167- pass
168-
169-
170- class SlowFlushRawIO (MockRawIO ):
171- def __init__ (self ):
172- super ().__init__ ()
173- self .in_flush = threading .Event ()
174-
175- def flush (self ):
176- self .in_flush .set ()
177- time .sleep (0.25 )
178-
179- class CSlowFlushRawIO (SlowFlushRawIO , io .RawIOBase ):
180- pass
181-
182- class PySlowFlushRawIO (SlowFlushRawIO , pyio .RawIOBase ):
183- pass
184-
185-
186- class CloseFailureIO (MockRawIO ):
187- closed = 0
188-
189- def close (self ):
190- if not self .closed :
191- self .closed = 1
192- raise OSError
193-
194- class CCloseFailureIO (CloseFailureIO , io .RawIOBase ):
195- pass
196-
197- class PyCloseFailureIO (CloseFailureIO , pyio .RawIOBase ):
198- pass
199-
200-
201- class MockFileIO :
202-
203- def __init__ (self , data ):
204- self .read_history = []
205- super ().__init__ (data )
206-
207- def read (self , n = None ):
208- res = super ().read (n )
209- self .read_history .append (None if res is None else len (res ))
210- return res
211-
212- def readinto (self , b ):
213- res = super ().readinto (b )
214- self .read_history .append (res )
215- return res
216-
217- class CMockFileIO (MockFileIO , io .BytesIO ):
218- pass
219-
220- class PyMockFileIO (MockFileIO , pyio .BytesIO ):
221- pass
222-
223-
224- class MockUnseekableIO :
225- def seekable (self ):
226- return False
227-
228- def seek (self , * args ):
229- raise self .UnsupportedOperation ("not seekable" )
230-
231- def tell (self , * args ):
232- raise self .UnsupportedOperation ("not seekable" )
233-
234- def truncate (self , * args ):
235- raise self .UnsupportedOperation ("not seekable" )
236-
237- class CMockUnseekableIO (MockUnseekableIO , io .BytesIO ):
238- UnsupportedOperation = io .UnsupportedOperation
239-
240- class PyMockUnseekableIO (MockUnseekableIO , pyio .BytesIO ):
241- UnsupportedOperation = pyio .UnsupportedOperation
242-
243-
244- class MockCharPseudoDevFileIO (MockFileIO ):
245- # GH-95782
246- # ftruncate() does not work on these special files (and CPython then raises
247- # appropriate exceptions), so truncate() does not have to be accounted for
248- # here.
249- def __init__ (self , data ):
250- super ().__init__ (data )
251-
252- def seek (self , * args ):
253- return 0
254-
255- def tell (self , * args ):
256- return 0
257-
258- class CMockCharPseudoDevFileIO (MockCharPseudoDevFileIO , io .BytesIO ):
259- pass
260-
261- class PyMockCharPseudoDevFileIO (MockCharPseudoDevFileIO , pyio .BytesIO ):
262- pass
263-
264-
265- class MockNonBlockWriterIO :
266-
267- def __init__ (self ):
268- self ._write_stack = []
269- self ._blocker_char = None
270-
271- def pop_written (self ):
272- s = b"" .join (self ._write_stack )
273- self ._write_stack [:] = []
274- return s
275-
276- def block_on (self , char ):
277- """Block when a given char is encountered."""
278- self ._blocker_char = char
279-
280- def readable (self ):
281- return True
282-
283- def seekable (self ):
284- return True
285-
286- def seek (self , pos , whence = 0 ):
287- # naive implementation, enough for tests
288- return 0
289-
290- def writable (self ):
291- return True
292-
293- def write (self , b ):
294- b = bytes (b )
295- n = - 1
296- if self ._blocker_char :
297- try :
298- n = b .index (self ._blocker_char )
299- except ValueError :
300- pass
301- else :
302- if n > 0 :
303- # write data up to the first blocker
304- self ._write_stack .append (b [:n ])
305- return n
306- else :
307- # cancel blocker and indicate would block
308- self ._blocker_char = None
309- return None
310- self ._write_stack .append (b )
311- return len (b )
312-
313- class CMockNonBlockWriterIO (MockNonBlockWriterIO , io .RawIOBase ):
314- BlockingIOError = io .BlockingIOError
315-
316- class PyMockNonBlockWriterIO (MockNonBlockWriterIO , pyio .RawIOBase ):
317- BlockingIOError = pyio .BlockingIOError
318-
319-
320- # Build classes which point to all the right mocks per io implementation
321- class CTestCase (unittest .TestCase ):
322- io = io
323- is_C = True
324-
325- MockRawIO = CMockRawIO
326- MisbehavedRawIO = CMisbehavedRawIO
327- MockFileIO = CMockFileIO
328- CloseFailureIO = CCloseFailureIO
329- MockNonBlockWriterIO = CMockNonBlockWriterIO
330- MockUnseekableIO = CMockUnseekableIO
331- MockRawIOWithoutRead = CMockRawIOWithoutRead
332- SlowFlushRawIO = CSlowFlushRawIO
333- MockCharPseudoDevFileIO = CMockCharPseudoDevFileIO
334-
335- # Use the class as a proxy to the io module members.
336- def __getattr__ (self , name ):
337- return getattr (io , name )
338-
339-
340- class PyTestCase (unittest .TestCase ):
341- io = pyio
342- is_C = False
343-
344- MockRawIO = PyMockRawIO
345- MisbehavedRawIO = PyMisbehavedRawIO
346- MockFileIO = PyMockFileIO
347- CloseFailureIO = PyCloseFailureIO
348- MockNonBlockWriterIO = PyMockNonBlockWriterIO
349- MockUnseekableIO = PyMockUnseekableIO
350- MockRawIOWithoutRead = PyMockRawIOWithoutRead
351- SlowFlushRawIO = PySlowFlushRawIO
352- MockCharPseudoDevFileIO = PyMockCharPseudoDevFileIO
353-
354- # Use the class as a proxy to the _pyio module members.
355- def __getattr__ (self , name ):
356- return getattr (pyio , name )
357-
35851
35952class IOTest (unittest .TestCase ):
36053
0 commit comments