|
4 | 4 | import math |
5 | 5 | from itertools import product |
6 | 6 |
|
| 7 | +from dask import delayed |
7 | 8 | import dask.array as da |
8 | 9 | import numpy as np |
9 | 10 | from dask.base import tokenize |
10 | 11 | from dask.highlevelgraph import HighLevelGraph |
11 | 12 | import scipy |
12 | 13 | from scipy.ndimage import affine_transform as ndimage_affine_transform |
| 14 | +from scipy.ndimage import map_coordinates as ndimage_map_coordinates |
| 15 | +from scipy.ndimage import labeled_comprehension as\ |
| 16 | + ndimage_labeled_comprehension |
13 | 17 | from scipy.special import sindg, cosdg |
14 | 18 |
|
15 | 19 | from ..dispatch._dispatch_ndinterp import ( |
|
18 | 22 | dispatch_spline_filter, |
19 | 23 | dispatch_spline_filter1d, |
20 | 24 | ) |
| 25 | +from ..dispatch._utils import get_type |
21 | 26 | from ..ndfilters._utils import _get_depth_boundary, _update_wrapper |
22 | 27 |
|
23 | 28 | __all__ = [ |
24 | 29 | "affine_transform", |
| 30 | + "map_coordinates", |
25 | 31 | "rotate", |
26 | 32 | "spline_filter", |
27 | 33 | "spline_filter1d", |
@@ -540,3 +546,236 @@ def spline_filter1d( |
540 | 546 | ) |
541 | 547 |
|
542 | 548 | return result |
| 549 | + |
| 550 | + |
def _map_single_coordinates_array_chunk(
        input, coordinates, order=3, mode='constant',
        cval=0.0, prefilter=False):
    """
    Central helper function for implementing map_coordinates.

    Receives 'input' as a dask array and 'coordinates' as a computed
    numpy array of shape (input.ndim, ncoords).

    Implementation details and steps:
    1) associate each coordinate in coordinates with the chunk
       it maps to in the input
    2) for each input chunk that has been associated to at least one
       coordinate, calculate the minimal slice required to map
       all coordinates that are associated to it (note that resulting slice
       coordinates can lie outside of the coordinate's chunk, because of
       the interpolation footprint implied by `order`)
    3) for each previously obtained slice and its associated coordinates,
       define a dask task and apply ndimage.map_coordinates
    4) outputs of ndimage.map_coordinates are rearranged to match input order

    Returns a numpy array of length ncoords with the interpolated values.
    """

    # STEP 1: Associate each coordinate in coordinates with
    # the chunk it maps to in the input array

    # get the input chunks each coordinate maps onto
    coords_input_chunk_locations = coordinates.T // np.array(input.chunksize)

    # map out-of-bounds chunk locations to valid input chunks
    coords_input_chunk_locations = np.clip(
        coords_input_chunk_locations, 0, np.array(input.numblocks) - 1
    )

    # all input chunk locations, enumerated in C (row-major) order
    input_chunk_locations = np.array([i for i in np.ndindex(input.numblocks)])

    # linearize input chunk locations using row-major strides so that the
    # linear indices agree with the np.ndindex enumeration above.
    # BUGFIX: the stride for dimension `dim` is prod(numblocks[dim + 1:]);
    # the previous reversed-prefix-product computation only matched
    # row-major order for symmetric chunk grids, and for asymmetric grids
    # could yield indices >= prod(numblocks) (IndexError below) or collide
    # distinct chunks into one group.
    coords_input_chunk_locations_linear = np.sum(
        coords_input_chunk_locations * np.array(
            [np.prod(input.numblocks[dim + 1:])
             for dim in range(input.ndim)]),
        axis=1, dtype=np.int64)

    # determine the input chunks that have coords associated and
    # count how many coords map onto each input chunk
    chunk_indices_count = np.bincount(coords_input_chunk_locations_linear,
                                      minlength=np.prod(input.numblocks))
    required_input_chunk_indices = np.where(chunk_indices_count > 0)[0]
    required_input_chunks = input_chunk_locations[required_input_chunk_indices]
    coord_rc_count = chunk_indices_count[required_input_chunk_indices]

    # inverse mapping: input chunks to coordinates
    required_input_chunk_coords_indices = \
        [np.where(coords_input_chunk_locations_linear == irc)[0]
         for irc in required_input_chunk_indices]

    # STEP 2: for each input chunk that has been associated to at least
    # one coordinate, calculate the minimal slice required to map all
    # coordinates that are associated to it (note that resulting slice
    # coordinates can lie outside of the coordinate's chunk)

    # determine the slices of the input array that are required for
    # mapping all coordinates associated to a given input chunk.
    # Note that this slice can be larger than the given chunk when coords
    # lie at chunk borders.
    # (probably there's a more efficient way to do this)
    input_slices_lower = np.array([np.clip(
        ndimage_labeled_comprehension(
            np.floor(coordinates[dim] - order // 2),
            coords_input_chunk_locations_linear,
            required_input_chunk_indices,
            np.min, np.int64, 0),
        0, input.shape[dim] - 1)
        for dim in range(input.ndim)])

    input_slices_upper = np.array([np.clip(
        ndimage_labeled_comprehension(
            np.ceil(coordinates[dim] + order // 2) + 1,
            coords_input_chunk_locations_linear,
            required_input_chunk_indices,
            np.max, np.int64, 0),
        0, input.shape[dim])
        for dim in range(input.ndim)])

    input_slices = np.array([input_slices_lower, input_slices_upper])\
        .swapaxes(1, 2)

    # STEP 3: For each previously obtained slice and its associated
    # coordinates, define a dask task and apply ndimage.map_coordinates

    # prepare building dask graph
    # define one task per associated input chunk
    name = "map_coordinates_chunk-%s" % tokenize(
        input,
        coordinates,
        order,
        mode,
        cval,
        prefilter
    )

    keys = [(name, i) for i in range(len(required_input_chunks))]

    # pair map_coordinates calls with input slices and mapped coordinates
    values = []
    for irc in range(len(required_input_chunks)):

        ric_slice = [slice(
            input_slices[0][irc][dim],
            input_slices[1][irc][dim])
            for dim in range(input.ndim)]
        # coordinates are expressed relative to the slice origin below
        ric_offset = input_slices[0][irc]

        values.append((
            ndimage_map_coordinates,
            input[tuple(ric_slice)],
            coordinates[:, required_input_chunk_coords_indices[irc]]
            - ric_offset[:, None],
            None,
            order,
            mode,
            cval,
            prefilter
        ))

    # build dask graph
    dsk = dict(zip(keys, values))
    ar = da.Array(dsk, name, tuple([list(coord_rc_count)]), input.dtype)

    # STEP 4: rearrange outputs of ndimage.map_coordinates
    # to match input order
    orig_order = np.argsort(
        [ic for ric_ci in required_input_chunk_coords_indices
         for ic in ric_ci])

    # compute result and reorder
    # (ordering first would probably unnecessarily inflate the task graph)
    return ar.compute()[orig_order]
| 687 | + |
| 688 | + |
def map_coordinates(input, coordinates, order=3,
                    mode='constant', cval=0.0, prefilter=False):
    """
    Wraps ndimage.map_coordinates.

    Both the input and coordinate arrays can be dask arrays.
    GPU arrays are not supported.

    For each chunk in the coordinates array, the coordinates are computed
    and mapped onto the required slices of the input array. One task is
    defined for each input array chunk that has been associated to at
    least one coordinate. The outputs of the tasks are then rearranged to
    match the input order. For more details see the docstring of
    '_map_single_coordinates_array_chunk'.

    Using this function together with schedulers that support
    parallelism (threads, processes, distributed) makes sense in the
    case of either a large input array or a large coordinates array.
    When both arrays are large, it is recommended to use the
    single-threaded scheduler. A scheduler can be specified using e.g.
    `with dask.config.set(scheduler='threads'): ...`.

    input : array_like
        The input array.
    coordinates : array_like
        The coordinates at which to sample the input.
    order : int, optional
        The order of the spline interpolation, default is 3. The order has to
        be in the range 0-5.
    mode : boundary behavior mode, optional
    cval : float, optional
        Value to fill past edges of input if mode is 'constant'. Default is 0.0
    prefilter : bool, optional
        If True, prefilter the input before interpolation. Default is False.
        Warning: prefilter is True by default in
        `scipy.ndimage.map_coordinates`. Prefiltering here is performed on a
        chunk-by-chunk basis, which may lead to different results than
        `scipy.ndimage.map_coordinates` in case of chunked input arrays
        and order > 1.
        Note: prefilter is not necessary when:
        - You are using nearest neighbour interpolation, by setting order=0
        - You are using linear interpolation, by setting order=1, or
        - You have already prefiltered the input array,
          using the spline_filter or spline_filter1d functions.

    Comments:
    - in case of a small coordinate array, it might make sense to rechunk
      it into a single chunk
    - note the different default for `prefilter` compared to
      `scipy.ndimage.map_coordinates`, which is True by default.
    """
    if "cupy" in str(get_type(input)) or "cupy" in str(get_type(coordinates)):
        # BUGFIX: the message previously named `map_overlap`
        raise NotImplementedError(
            "GPU cupy arrays are not supported by "
            "dask_image.ndinterp.map_coordinates")

    # if coordinate array is not a dask array, convert it into one
    if not isinstance(coordinates, da.Array):
        # accept generic array_likes (e.g. nested lists), which have no
        # `.shape` attribute of their own
        coordinates = np.asarray(coordinates)
        coordinates = da.from_array(coordinates, chunks=coordinates.shape)
    else:
        # make sure indices are not split across chunks, i.e. that there's
        # no chunking along the first dimension
        if len(coordinates.chunks[0]) > 1:
            coordinates = da.rechunk(
                coordinates,
                (-1,) + coordinates.chunks[1:])

    # if the input array is not a dask array, convert it into one
    if not isinstance(input, da.Array):
        input = da.from_array(input, chunks=input.shape)

    # Map each chunk of the coordinates array onto the entire input array.
    # 'input' is passed to `_map_single_coordinates_array_chunk` using a bit of
    # a dirty trick: it is split into its components and passed as a delayed
    # object, which reconstructs the original array when the task is
    # executed. Therefore two `compute` calls are required to obtain the
    # final result, one of which is performed by
    # `_map_single_coordinates_array_chunk`
    # Discussion https://dask.discourse.group/t/passing-dask-objects-to-delayed-computations-without-triggering-compute/1441 # noqa: E501
    output = da.map_blocks(
        _map_single_coordinates_array_chunk,
        delayed(da.Array)(input.dask, input.name, input.chunks, input.dtype),
        coordinates,
        order=order,
        mode=mode,
        cval=cval,
        prefilter=prefilter,
        dtype=input.dtype,
        chunks=coordinates.chunks[1:],
        drop_axis=0,
    )

    return output
0 commit comments