@@ -230,6 +230,47 @@ def test_read(self):
230230 self .assertEqual (type (s ), bytes )
231231 self .assertEqual (s , b"spam" )
232232
233+ def test_readinto (self ):
234+ with open (os_helper .TESTFN , "w+b" ) as fobj :
235+ fobj .write (b"spam" )
236+ fobj .flush ()
237+ fd = fobj .fileno ()
238+ os .lseek (fd , 0 , 0 )
239+ buffer = bytearray (8 )
240+ s = os .readinto (fd , buffer )
241+ self .assertEqual (type (s ), int )
242+ self .assertEqual (s , 4 )
243+ # Should overwrite the first 4 bytes of the buffer.
244+ self .assertEqual (bytes (buffer ), b"spam\0 \0 \0 \0 " )
245+
246+ # Readinto at EOF shold return 0 and not touch buffer
247+ buffer [:] = b"notspam\0 "
248+ s = os .readinto (fd , buffer )
249+ self .assertEqual (type (s ), int )
250+ self .assertEqual (s , 0 )
251+ self .assertEqual (bytes (buffer ), b"notspam\0 " )
252+ s = os .readinto (fd , buffer )
253+ self .assertEqual (s , 0 )
254+ self .assertEqual (bytes (buffer ), b"notspam\0 " )
255+
256+ def test_readinto_badbuffer (self ):
257+ with open (os_helper .TESTFN , "w+b" ) as fobj :
258+ fobj .write (b"spam" )
259+ fobj .flush ()
260+ fd = fobj .fileno ()
261+ os .lseek (fd , 0 , 0 )
262+
263+ for bad_arg in ("test" , bytes (), 14 ):
264+ with self .subTest (f"{ type (bad_arg )} " ):
265+ with self .assertRaises (TypeError ):
266+ os .readinto (fd , bad_arg )
267+
268+ # No data should have been read with the bad arguments.
269+ buffer = bytearray (8 )
270+ s = os .readinto (fd , buffer )
271+ self .assertEqual (s , 4 )
272+ self .assertEqual (bytes (buffer ), b"spam\0 \0 \0 \0 " )
273+
233274 @support .cpython_only
234275 # Skip the test on 32-bit platforms: the number of bytes must fit in a
235276 # Py_ssize_t type
@@ -249,6 +290,29 @@ def test_large_read(self, size):
249290 # operating system is free to return less bytes than requested.
250291 self .assertEqual (data , b'test' )
251292
293+
294+ @support .cpython_only
295+ # Skip the test on 32-bit platforms: the number of bytes must fit in a
296+ # Py_ssize_t type
297+ @unittest .skipUnless (INT_MAX < PY_SSIZE_T_MAX ,
298+ "needs INT_MAX < PY_SSIZE_T_MAX" )
299+ @support .bigmemtest (size = INT_MAX + 10 , memuse = 1 , dry_run = False )
300+ def test_large_readinto (self , size ):
301+ self .addCleanup (os_helper .unlink , os_helper .TESTFN )
302+ create_file (os_helper .TESTFN , b'test' )
303+
304+ # Issue #21932: For readinto the buffer contains the length rather than
305+ # a length being passed explicitly to read, shold still get capped to a
306+ # valid size / not raise an OverflowError for sizes larger than INT_MAX.
307+ buffer = bytearray (INT_MAX + 10 )
308+ with open (os_helper .TESTFN , "rb" ) as fp :
309+ length = os .readinto (fp .fileno (), buffer )
310+
311+ # The test does not try to read more than 2 GiB at once because the
312+ # operating system is free to return less bytes than requested.
313+ self .assertEqual (length , 4 )
314+ self .assertEqual (buffer [:4 ], b'test' )
315+
252316 def test_write (self ):
253317 # os.write() accepts bytes- and buffer-like objects but not strings
254318 fd = os .open (os_helper .TESTFN , os .O_CREAT | os .O_WRONLY )
@@ -2467,6 +2531,10 @@ def test_lseek(self):
24672531 def test_read (self ):
24682532 self .check (os .read , 1 )
24692533
2534+ @unittest .skipUnless (hasattr (os , 'readinto' ), 'test needs os.readinto()' )
2535+ def test_readinto (self ):
2536+ self .check (os .readinto , bytearray (5 ))
2537+
24702538 @unittest .skipUnless (hasattr (os , 'readv' ), 'test needs os.readv()' )
24712539 def test_readv (self ):
24722540 buf = bytearray (10 )
0 commit comments