@@ -460,6 +460,44 @@ def test_concatenated_gzip():
460
460
assert data == result
461
461
462
462
463
+ def test_seek ():
464
+ from io import SEEK_CUR , SEEK_END , SEEK_SET
465
+ with tempfile .NamedTemporaryFile ("wb" ) as tmpfile :
466
+ tmpfile .write (gzip .compress (b"X" * 500 + b"A" + b"X" * 499 ))
467
+ tmpfile .write (gzip .compress (b"X" * 500 + b"B" + b"X" * 499 ))
468
+ tmpfile .write (gzip .compress (b"X" * 500 + b"C" + b"X" * 499 ))
469
+ tmpfile .write (gzip .compress (b"X" * 500 + b"D" + b"X" * 499 ))
470
+ tmpfile .flush ()
471
+ with igzip .open (tmpfile .name , "rb" ) as gzip_file :
472
+ # Start testing forward seek
473
+ gzip_file .seek (500 )
474
+ assert gzip_file .read (1 ) == b"A"
475
+ gzip_file .seek (1500 )# Any positive number should end up at the end
476
+ assert gzip_file .read (1 ) == b"B"
477
+ # Test reverse
478
+ gzip_file .seek (500 )
479
+ assert gzip_file .read (1 ) == b"A"
480
+ # Again, but with explicit SEEK_SET
481
+ gzip_file .seek (500 , SEEK_SET )
482
+ assert gzip_file .read (1 ) == b"A"
483
+ gzip_file .seek (1500 , SEEK_SET )
484
+ assert gzip_file .read (1 ) == b"B"
485
+ gzip_file .seek (500 , SEEK_SET )
486
+ assert gzip_file .read (1 ) == b"A"
487
+ # Seeking from current position
488
+ gzip_file .seek (500 )
489
+ gzip_file .seek (2000 , SEEK_CUR )
490
+ assert gzip_file .read (1 ) == b"C"
491
+ gzip_file .seek (- 1001 , SEEK_CUR )
492
+ assert gzip_file .read (1 ) == b"B"
493
+ # Seeking from end
494
+ # Any positive number should end up at the end
495
+ gzip_file .seek (200 , SEEK_END )
496
+ assert gzip_file .read (1 ) == b""
497
+ gzip_file .seek (- 1500 , SEEK_END )
498
+ assert gzip_file .read (1 ) == b"C"
499
+
500
+
463
501
def test_bgzip ():
464
502
bgzip_file = Path (__file__ ).parent / "data" / "test.fastq.bgzip.gz"
465
503
gzip_file = Path (__file__ ).parent / "data" / "test.fastq.gz"
0 commit comments