@@ -1549,6 +1549,51 @@ def _set_selection(self, indexer, value, fields=None):
1549
1549
# put data
1550
1550
self ._chunk_setitem (chunk_coords , chunk_selection , chunk_value , fields = fields )
1551
1551
1552
+ def _process_chunk (self , out , cdata , chunk_selection , drop_axes ,
1553
+ out_is_ndarray ):
1554
+ if (out_is_ndarray and
1555
+ not fields and
1556
+ is_contiguous_selection (out_selection ) and
1557
+ is_total_slice (chunk_selection , self ._chunks ) and
1558
+ not self ._filters and
1559
+ self ._dtype != object ):
1560
+
1561
+ dest = out [out_selection ]
1562
+ write_direct = (
1563
+ dest .flags .writeable and
1564
+ (
1565
+ (self ._order == 'C' and dest .flags .c_contiguous ) or
1566
+ (self ._order == 'F' and dest .flags .f_contiguous )
1567
+ )
1568
+ )
1569
+
1570
+ if write_direct :
1571
+
1572
+ # optimization: we want the whole chunk, and the destination is
1573
+ # contiguous, so we can decompress directly from the chunk
1574
+ # into the destination array
1575
+
1576
+ if self ._compressor :
1577
+ self ._compressor .decode (cdata , dest )
1578
+ else :
1579
+ chunk = ensure_ndarray (cdata ).view (self ._dtype )
1580
+ chunk = chunk .reshape (self ._chunks , order = self ._order )
1581
+ np .copyto (dest , chunk )
1582
+ return
1583
+
1584
+ # decode chunk
1585
+ chunk = self ._decode_chunk (cdata )
1586
+
1587
+ # select data from chunk
1588
+ if fields :
1589
+ chunk = chunk [fields ]
1590
+ tmp = chunk [chunk_selection ]
1591
+ if drop_axes :
1592
+ tmp = np .squeeze (tmp , axis = drop_axes )
1593
+
1594
+ # store selected data in output
1595
+ out [out_selection ] = tmp
1596
+
1552
1597
def _chunk_getitem (self , chunk_coords , chunk_selection , out , out_selection ,
1553
1598
drop_axes = None , fields = None ):
1554
1599
"""Obtain part or whole of a chunk.
@@ -1572,12 +1617,6 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
1572
1617
1573
1618
assert len (chunk_coords ) == len (self ._cdata_shape )
1574
1619
1575
- out_is_ndarray = True
1576
- try :
1577
- out = ensure_ndarray (out )
1578
- except TypeError :
1579
- out_is_ndarray = False
1580
-
1581
1620
# obtain key for chunk
1582
1621
ckey = self ._chunk_key (chunk_coords )
1583
1622
@@ -1595,48 +1634,31 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
1595
1634
out [out_selection ] = fill_value
1596
1635
1597
1636
else :
1637
+ self ._process_chunk (out , cdata , chunk_selection , drop_axes ,
1638
+ out_is_ndarray )
1598
1639
1599
- if (out_is_ndarray and
1600
- not fields and
1601
- is_contiguous_selection (out_selection ) and
1602
- is_total_slice (chunk_selection , self ._chunks ) and
1603
- not self ._filters and
1604
- self ._dtype != object ):
1605
-
1606
- dest = out [out_selection ]
1607
- write_direct = (
1608
- dest .flags .writeable and (
1609
- (self ._order == 'C' and dest .flags .c_contiguous ) or
1610
- (self ._order == 'F' and dest .flags .f_contiguous )
1611
- )
1612
- )
1613
-
1614
- if write_direct :
1615
-
1616
- # optimization: we want the whole chunk, and the destination is
1617
- # contiguous, so we can decompress directly from the chunk
1618
- # into the destination array
1640
+ def _chunk_getitems (self , lchunk_coords , lchunk_selection , out , lout_selection ,
1641
+ drop_axes = None , fields = None ):
1642
+ out_is_ndarray = True
1643
+ try :
1644
+ out = ensure_ndarray (out )
1645
+ except TypeError :
1646
+ out_is_ndarray = False
1619
1647
1620
- if self ._compressor :
1621
- self ._compressor .decode (cdata , dest )
1648
+ ckeys = [self ._chunk_key (ch ) for ch in lchunk_coords ]
1649
+ cdatas = self .chunk_store .getitems (ckeys )
1650
+ for ckey , chunk_select , out_select in zip (ckeys , lchunk_selection , lout_selection ):
1651
+ if ckey in cdatas :
1652
+ self ._process_chunk (out , cdatas [ckey ], chunk_select , drop_axes ,
1653
+ out_is_ndarray )
1654
+ else :
1655
+ # check exception type
1656
+ if self ._fill_value is not None :
1657
+ if fields :
1658
+ fill_value = self ._fill_value [fields ]
1622
1659
else :
1623
- chunk = ensure_ndarray (cdata ).view (self ._dtype )
1624
- chunk = chunk .reshape (self ._chunks , order = self ._order )
1625
- np .copyto (dest , chunk )
1626
- return
1627
-
1628
- # decode chunk
1629
- chunk = self ._decode_chunk (cdata )
1630
-
1631
- # select data from chunk
1632
- if fields :
1633
- chunk = chunk [fields ]
1634
- tmp = chunk [chunk_selection ]
1635
- if drop_axes :
1636
- tmp = np .squeeze (tmp , axis = drop_axes )
1637
-
1638
- # store selected data in output
1639
- out [out_selection ] = tmp
1660
+ fill_value = self ._fill_value
1661
+ out [out_select ] = fill_value
1640
1662
1641
1663
def _chunk_setitem (self , chunk_coords , chunk_selection , value , fields = None ):
1642
1664
"""Replace part or whole of a chunk.
0 commit comments