2727    import_helper , is_apple , os_helper , threading_helper , warnings_helper ,
2828)
2929from  test .support .os_helper  import  FakePath 
30+ from  .utils  import  byteslike , CTestCase , PyTestCase 
3031
3132import  codecs 
3233import  io   # C implementation of io 
3334import  _pyio  as  pyio  # Python implementation of io 
3435
35- try :
36-     import  ctypes 
37- except  ImportError :
38-     def  byteslike (* pos , ** kw ):
39-         return  array .array ("b" , bytes (* pos , ** kw ))
40- else :
41-     def  byteslike (* pos , ** kw ):
42-         """Create a bytes-like object having no string or sequence methods""" 
43-         data  =  bytes (* pos , ** kw )
44-         obj  =  EmptyStruct ()
45-         ctypes .resize (obj , len (data ))
46-         memoryview (obj ).cast ("B" )[:] =  data 
47-         return  obj 
48-     class  EmptyStruct (ctypes .Structure ):
49-         pass 
50- 
5136
5237def  _default_chunk_size ():
5338    """Get the default TextIOWrapper chunk size""" 
@@ -63,298 +48,6 @@ class BadIndex:
6348    def  __index__ (self ):
6449        1 / 0 
6550
66- class  MockRawIOWithoutRead :
67-     """A RawIO implementation without read(), so as to exercise the default 
68-     RawIO.read() which calls readinto().""" 
69- 
70-     def  __init__ (self , read_stack = ()):
71-         self ._read_stack  =  list (read_stack )
72-         self ._write_stack  =  []
73-         self ._reads  =  0 
74-         self ._extraneous_reads  =  0 
75- 
76-     def  write (self , b ):
77-         self ._write_stack .append (bytes (b ))
78-         return  len (b )
79- 
80-     def  writable (self ):
81-         return  True 
82- 
83-     def  fileno (self ):
84-         return  42 
85- 
86-     def  readable (self ):
87-         return  True 
88- 
89-     def  seekable (self ):
90-         return  True 
91- 
92-     def  seek (self , pos , whence ):
93-         return  0    # wrong but we gotta return something 
94- 
95-     def  tell (self ):
96-         return  0    # same comment as above 
97- 
98-     def  readinto (self , buf ):
99-         self ._reads  +=  1 
100-         max_len  =  len (buf )
101-         try :
102-             data  =  self ._read_stack [0 ]
103-         except  IndexError :
104-             self ._extraneous_reads  +=  1 
105-             return  0 
106-         if  data  is  None :
107-             del  self ._read_stack [0 ]
108-             return  None 
109-         n  =  len (data )
110-         if  len (data ) <=  max_len :
111-             del  self ._read_stack [0 ]
112-             buf [:n ] =  data 
113-             return  n 
114-         else :
115-             buf [:] =  data [:max_len ]
116-             self ._read_stack [0 ] =  data [max_len :]
117-             return  max_len 
118- 
119-     def  truncate (self , pos = None ):
120-         return  pos 
121- 
122- class  CMockRawIOWithoutRead (MockRawIOWithoutRead , io .RawIOBase ):
123-     pass 
124- 
125- class  PyMockRawIOWithoutRead (MockRawIOWithoutRead , pyio .RawIOBase ):
126-     pass 
127- 
128- 
129- class  MockRawIO (MockRawIOWithoutRead ):
130- 
131-     def  read (self , n = None ):
132-         self ._reads  +=  1 
133-         try :
134-             return  self ._read_stack .pop (0 )
135-         except :
136-             self ._extraneous_reads  +=  1 
137-             return  b"" 
138- 
139- class  CMockRawIO (MockRawIO , io .RawIOBase ):
140-     pass 
141- 
142- class  PyMockRawIO (MockRawIO , pyio .RawIOBase ):
143-     pass 
144- 
145- 
146- class  MisbehavedRawIO (MockRawIO ):
147-     def  write (self , b ):
148-         return  super ().write (b ) *  2 
149- 
150-     def  read (self , n = None ):
151-         return  super ().read (n ) *  2 
152- 
153-     def  seek (self , pos , whence ):
154-         return  - 123 
155- 
156-     def  tell (self ):
157-         return  - 456 
158- 
159-     def  readinto (self , buf ):
160-         super ().readinto (buf )
161-         return  len (buf ) *  5 
162- 
163- class  CMisbehavedRawIO (MisbehavedRawIO , io .RawIOBase ):
164-     pass 
165- 
166- class  PyMisbehavedRawIO (MisbehavedRawIO , pyio .RawIOBase ):
167-     pass 
168- 
169- 
170- class  SlowFlushRawIO (MockRawIO ):
171-     def  __init__ (self ):
172-         super ().__init__ ()
173-         self .in_flush  =  threading .Event ()
174- 
175-     def  flush (self ):
176-         self .in_flush .set ()
177-         time .sleep (0.25 )
178- 
179- class  CSlowFlushRawIO (SlowFlushRawIO , io .RawIOBase ):
180-     pass 
181- 
182- class  PySlowFlushRawIO (SlowFlushRawIO , pyio .RawIOBase ):
183-     pass 
184- 
185- 
186- class  CloseFailureIO (MockRawIO ):
187-     closed  =  0 
188- 
189-     def  close (self ):
190-         if  not  self .closed :
191-             self .closed  =  1 
192-             raise  OSError 
193- 
194- class  CCloseFailureIO (CloseFailureIO , io .RawIOBase ):
195-     pass 
196- 
197- class  PyCloseFailureIO (CloseFailureIO , pyio .RawIOBase ):
198-     pass 
199- 
200- 
201- class  MockFileIO :
202- 
203-     def  __init__ (self , data ):
204-         self .read_history  =  []
205-         super ().__init__ (data )
206- 
207-     def  read (self , n = None ):
208-         res  =  super ().read (n )
209-         self .read_history .append (None  if  res  is  None  else  len (res ))
210-         return  res 
211- 
212-     def  readinto (self , b ):
213-         res  =  super ().readinto (b )
214-         self .read_history .append (res )
215-         return  res 
216- 
217- class  CMockFileIO (MockFileIO , io .BytesIO ):
218-     pass 
219- 
220- class  PyMockFileIO (MockFileIO , pyio .BytesIO ):
221-     pass 
222- 
223- 
224- class  MockUnseekableIO :
225-     def  seekable (self ):
226-         return  False 
227- 
228-     def  seek (self , * args ):
229-         raise  self .UnsupportedOperation ("not seekable" )
230- 
231-     def  tell (self , * args ):
232-         raise  self .UnsupportedOperation ("not seekable" )
233- 
234-     def  truncate (self , * args ):
235-         raise  self .UnsupportedOperation ("not seekable" )
236- 
237- class  CMockUnseekableIO (MockUnseekableIO , io .BytesIO ):
238-     UnsupportedOperation  =  io .UnsupportedOperation 
239- 
240- class  PyMockUnseekableIO (MockUnseekableIO , pyio .BytesIO ):
241-     UnsupportedOperation  =  pyio .UnsupportedOperation 
242- 
243- 
244- class  MockCharPseudoDevFileIO (MockFileIO ):
245-     # GH-95782 
246-     # ftruncate() does not work on these special files (and CPython then raises 
247-     # appropriate exceptions), so truncate() does not have to be accounted for 
248-     # here. 
249-     def  __init__ (self , data ):
250-         super ().__init__ (data )
251- 
252-     def  seek (self , * args ):
253-         return  0 
254- 
255-     def  tell (self , * args ):
256-         return  0 
257- 
258- class  CMockCharPseudoDevFileIO (MockCharPseudoDevFileIO , io .BytesIO ):
259-     pass 
260- 
261- class  PyMockCharPseudoDevFileIO (MockCharPseudoDevFileIO , pyio .BytesIO ):
262-     pass 
263- 
264- 
265- class  MockNonBlockWriterIO :
266- 
267-     def  __init__ (self ):
268-         self ._write_stack  =  []
269-         self ._blocker_char  =  None 
270- 
271-     def  pop_written (self ):
272-         s  =  b"" .join (self ._write_stack )
273-         self ._write_stack [:] =  []
274-         return  s 
275- 
276-     def  block_on (self , char ):
277-         """Block when a given char is encountered.""" 
278-         self ._blocker_char  =  char 
279- 
280-     def  readable (self ):
281-         return  True 
282- 
283-     def  seekable (self ):
284-         return  True 
285- 
286-     def  seek (self , pos , whence = 0 ):
287-         # naive implementation, enough for tests 
288-         return  0 
289- 
290-     def  writable (self ):
291-         return  True 
292- 
293-     def  write (self , b ):
294-         b  =  bytes (b )
295-         n  =  - 1 
296-         if  self ._blocker_char :
297-             try :
298-                 n  =  b .index (self ._blocker_char )
299-             except  ValueError :
300-                 pass 
301-             else :
302-                 if  n  >  0 :
303-                     # write data up to the first blocker 
304-                     self ._write_stack .append (b [:n ])
305-                     return  n 
306-                 else :
307-                     # cancel blocker and indicate would block 
308-                     self ._blocker_char  =  None 
309-                     return  None 
310-         self ._write_stack .append (b )
311-         return  len (b )
312- 
313- class  CMockNonBlockWriterIO (MockNonBlockWriterIO , io .RawIOBase ):
314-     BlockingIOError  =  io .BlockingIOError 
315- 
316- class  PyMockNonBlockWriterIO (MockNonBlockWriterIO , pyio .RawIOBase ):
317-     BlockingIOError  =  pyio .BlockingIOError 
318- 
319- 
320- # Build classes which point to all the right mocks per io implementation 
321- class  CTestCase (unittest .TestCase ):
322-     io  =  io 
323-     is_C  =  True 
324- 
325-     MockRawIO  =  CMockRawIO 
326-     MisbehavedRawIO  =  CMisbehavedRawIO 
327-     MockFileIO  =  CMockFileIO 
328-     CloseFailureIO  =  CCloseFailureIO 
329-     MockNonBlockWriterIO  =  CMockNonBlockWriterIO 
330-     MockUnseekableIO  =  CMockUnseekableIO 
331-     MockRawIOWithoutRead  =  CMockRawIOWithoutRead 
332-     SlowFlushRawIO  =  CSlowFlushRawIO 
333-     MockCharPseudoDevFileIO  =  CMockCharPseudoDevFileIO 
334- 
335-     # Use the class as a proxy to the io module members. 
336-     def  __getattr__ (self , name ):
337-         return  getattr (io , name )
338- 
339- 
340- class  PyTestCase (unittest .TestCase ):
341-     io  =  pyio 
342-     is_C  =  False 
343- 
344-     MockRawIO  =  PyMockRawIO 
345-     MisbehavedRawIO  =  PyMisbehavedRawIO 
346-     MockFileIO  =  PyMockFileIO 
347-     CloseFailureIO  =  PyCloseFailureIO 
348-     MockNonBlockWriterIO  =  PyMockNonBlockWriterIO 
349-     MockUnseekableIO  =  PyMockUnseekableIO 
350-     MockRawIOWithoutRead  =  PyMockRawIOWithoutRead 
351-     SlowFlushRawIO  =  PySlowFlushRawIO 
352-     MockCharPseudoDevFileIO  =  PyMockCharPseudoDevFileIO 
353- 
354-     # Use the class as a proxy to the _pyio module members. 
355-     def  __getattr__ (self , name ):
356-         return  getattr (pyio , name )
357- 
35851
35952class  IOTest (unittest .TestCase ):
36053
0 commit comments