1
1
""" Utilities for getting array slices out of file-like objects
2
2
"""
3
3
from __future__ import division
4
+ from contextlib import contextmanager
4
5
5
6
import operator
6
7
from numbers import Integral
@@ -622,7 +623,7 @@ def slicers2segments(read_slicers, in_shape, offset, itemsize):
622
623
return all_segments
623
624
624
625
625
class _NullLock(object):
    """ Reusable no-op stand-in for a lock, used when no lock is supplied

    Unlike a generator-based context manager, an instance can be entered any
    number of times, which `read_segments` needs for its per-segment loop.
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returning False lets any exception raised in the body propagate
        return False


def read_segments(fileobj, segments, n_bytes, lock=None):
    """ Read `n_bytes` byte data implied by `segments` from `fileobj`

    Parameters
    ----------
    fileobj : file-like object
        Must implement ``seek`` and ``read``
    segments : sequence
        Sequence of ``(offset, length)`` pairs, giving absolute file offset in
        bytes and number of bytes to read
    n_bytes : int
        total number of bytes that will be read
    lock : threading.Lock, optional
        If provided, used to ensure that paired calls to ``seek`` and ``read``
        cannot be interrupted by another thread accessing the same ``fileobj``.
        Any object usable in a ``with`` statement will do.  If None, no
        locking is performed.

    Returns
    -------
    buffer : buffer object
        object implementing buffer protocol, such as byte string or ndarray or
        mmap or ctypes ``c_char_array``.  With zero or one segments this is
        whatever ``fileobj.read`` returns (``b''`` for zero segments); with
        more than one segment it is an anonymous ``mmap``.

    Raises
    ------
    ValueError
        If the number of bytes actually read does not match `n_bytes`.
    """
    # A do-nothing lock keeps the code below free of ``if lock`` branches.  It
    # must be an *instance* that can be re-entered: the multi-segment loop
    # below uses ``with lock`` once per segment.
    if lock is None:
        lock = _NullLock()

    if len(segments) == 0:
        if n_bytes != 0:
            raise ValueError("No segments, but non-zero n_bytes")
        return b''

    if len(segments) == 1:
        offset, length = segments[0]
        # seek + read must be atomic with respect to other users of fileobj
        with lock:
            fileobj.seek(offset)
            data = fileobj.read(length)
        if len(data) != n_bytes:
            raise ValueError("Whoops, not enough data in file")
        return data

    # More than one segment: gather the pieces into one anonymous memory map
    data = mmap(-1, n_bytes)
    for offset, length in segments:
        with lock:
            fileobj.seek(offset)
            chunk = fileobj.read(length)
        # Copying into the output buffer does not touch fileobj, so it does
        # not need to happen while the lock is held
        data.write(chunk)
    if data.tell() != n_bytes:
        raise ValueError("Oh dear, n_bytes does not look right")
    return data
@@ -700,7 +714,7 @@ def _simple_fileslice(fileobj, sliceobj, shape, dtype, offset=0, order='C',
700
714
701
715
702
716
def fileslice (fileobj , sliceobj , shape , dtype , offset = 0 , order = 'C' ,
703
- heuristic = threshold_heuristic ):
717
+ heuristic = threshold_heuristic , lock = None ):
704
718
""" Slice array in `fileobj` using `sliceobj` slicer and array definitions
705
719
706
720
`fileobj` contains the contiguous binary data for an array ``A`` of shape,
@@ -737,6 +751,9 @@ def fileslice(fileobj, sliceobj, shape, dtype, offset=0, order='C',
737
751
returning one of 'full', 'contiguous', None. See
738
752
:func:`optimize_slicer` and see :func:`threshold_heuristic` for an
739
753
example.
754
+ lock: threading.Lock, optional
755
+ If provided, used to ensure that paired calls to ``seek`` and ``read``
756
+ cannot be interrupted by another thread accessing the same ``fileobj``.
740
757
741
758
Returns
742
759
-------
@@ -750,7 +767,7 @@ def fileslice(fileobj, sliceobj, shape, dtype, offset=0, order='C',
750
767
segments , sliced_shape , post_slicers = calc_slicedefs (
751
768
sliceobj , shape , itemsize , offset , order )
752
769
n_bytes = reduce (operator .mul , sliced_shape , 1 ) * itemsize
753
- bytes = read_segments (fileobj , segments , n_bytes )
770
+ bytes = read_segments (fileobj , segments , n_bytes , lock )
754
771
sliced = np .ndarray (sliced_shape , dtype , buffer = bytes , order = order )
755
772
return sliced [post_slicers ]
756
773
0 commit comments