Skip to content

Commit 3439e15

Browse files
committed
add dask scheduler option
1 parent 5107b8a commit 3439e15

File tree

1 file changed

+76
-25
lines changed

1 file changed

+76
-25
lines changed

climada/hazard/centroids/centr.py

Lines changed: 76 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -350,12 +350,14 @@ def append(self, centr):
350350
self.__init__()
351351
self.set_lat_lon(lat, lon, crs)
352352

353-
def get_closest_point(self, x_lon, y_lat):
353+
def get_closest_point(self, x_lon, y_lat, scheduler=None):
354354
""" Returns closest centroid and its index to a given point.
355355
356356
Parameters:
357357
x_lon (float): x coord (lon)
358358
y_lat (float): y coord (lat)
359+
scheduler (str): used for dask map_partitions. “threads”,
360+
“synchronous” or “processes”
359361
360362
Returns:
361363
x_close (float), y_close (float), idx_close (int)
@@ -367,13 +369,18 @@ def get_closest_point(self, x_lon, y_lat):
367369
i_lon = np.floor((x_lon - self.meta['transform'][2])/abs(self.meta['transform'][0]))
368370
close_idx = int(i_lat*self.meta['width'] + i_lon)
369371
else:
370-
self.set_geometry_points()
372+
self.set_geometry_points(scheduler)
371373
close_idx = self.geometry.distance(Point(x_lon, y_lat)).values.argmin()
372374
return self.lon[close_idx], self.lat[close_idx], close_idx
373375

374-
def set_region_id(self):
375-
""" Set region_id attribute for every pixel or point """
376-
lon_ne, lat_ne = self._ne_crs_xy()
376+
def set_region_id(self, scheduler=None):
377+
""" Set region_id attribute for every pixel or point
378+
379+
Parameter:
380+
scheduler (str): used for dask map_partitions. “threads”,
381+
“synchronous” or “processes”
382+
"""
383+
lon_ne, lat_ne = self._ne_crs_xy(scheduler)
377384
LOGGER.debug('Setting region_id %s points.', str(self.lat.size))
378385
countries = get_country_geometries(extent=(lon_ne.min(), lon_ne.max(),
379386
lat_ne.min(), lat_ne.max()))
@@ -382,8 +389,13 @@ def set_region_id(self):
382389
select = contains(geom[0], lon_ne, lat_ne)
383390
self.region_id[select] = int(geom[1])
384391

385-
def set_area_pixel(self):
386-
""" Set area_pixel attribute for every pixel or point. area in m*m """
392+
def set_area_pixel(self, scheduler=None):
393+
""" Set area_pixel attribute for every pixel or point. area in m*m
394+
395+
Parameter:
396+
scheduler (str): used for dask map_partitions. “threads”,
397+
“synchronous” or “processes”
398+
"""
387399
if self.meta:
388400
if hasattr(self.meta['crs'], 'linear_units') and \
389401
str.lower(self.meta['crs'].linear_units) in ['m', 'metre', 'meter']:
@@ -397,7 +409,7 @@ def set_area_pixel(self):
397409
res = self.meta['transform'].a
398410
else:
399411
res = min(get_resolution(self.lat, self.lon))
400-
self.set_geometry_points()
412+
self.set_geometry_points(scheduler)
401413
LOGGER.debug('Setting area_pixel %s points.', str(self.lat.size))
402414
xy_pixels = self.geometry.buffer(res/2).envelope
403415
if ('units' in self.geometry.crs and \
@@ -441,26 +453,40 @@ def set_area_approx(self):
441453
LOGGER.error('Pixel area of points can not be computed.')
442454
raise ValueError
443455

444-
def set_dist_coast(self):
456+
def set_dist_coast(self, scheduler=None):
445457
""" Set dist_coast attribute for every pixel or point. Distan to
446-
coast is computed in meters """
447-
lon, lat = self._ne_crs_xy()
458+
coast is computed in meters
459+
460+
Parameter:
461+
scheduler (str): used for dask map_partitions. “threads”,
462+
“synchronous” or “processes”
463+
"""
464+
lon, lat = self._ne_crs_xy(scheduler)
448465
LOGGER.debug('Setting dist_coast %s points.', str(self.lat.size))
449466
self.dist_coast = dist_to_coast(lat, lon)
450467

451-
def set_on_land(self):
452-
""" Set on_land attribute for every pixel or point """
453-
lon, lat = self._ne_crs_xy()
468+
def set_on_land(self, scheduler=None):
469+
""" Set on_land attribute for every pixel or point
470+
471+
Parameter:
472+
scheduler (str): used for dask map_partitions. “threads”,
473+
“synchronous” or “processes”
474+
"""
475+
lon, lat = self._ne_crs_xy(scheduler)
454476
LOGGER.debug('Setting on_land %s points.', str(self.lat.size))
455477
self.on_land = coord_on_land(lat, lon)
456478

457-
def remove_duplicate_points(self):
479+
def remove_duplicate_points(self, scheduler=None):
458480
""" Return Centroids with removed duplicated points
459481
482+
Parameter:
483+
scheduler (str): used for dask map_partitions. “threads”,
484+
“synchronous” or “processes”
485+
460486
Returns:
461487
Centroids
462488
"""
463-
self.set_geometry_points()
489+
self.set_geometry_points(scheduler)
464490
geom_wkb = self.geometry.apply(lambda geom: geom.wkb)
465491
sel_cen = geom_wkb.drop_duplicates().index
466492
return self.select(sel_cen=sel_cen)
@@ -532,8 +558,12 @@ def plot(self, **kwargs):
532558
axis.scatter(self.lon, self.lat, **kwargs)
533559
return fig, axis
534560

535-
def get_pixels_polygons(self):
536-
""" Compute a GeoSeries with a polygon for every pixel
561+
def calc_pixels_polygons(self, scheduler=None):
562+
""" Return a GeoSeries with a polygon for every pixel
563+
564+
Parameter:
565+
scheduler (str): used for dask map_partitions. “threads”,
566+
“synchronous” or “processes”
537567
538568
Returns:
539569
GeoSeries
@@ -544,7 +574,7 @@ def get_pixels_polygons(self):
544574
abs(self.meta['transform'].e)) > 1.0e-5:
545575
LOGGER.error('Area can not be computed for not squared pixels.')
546576
raise ValueError
547-
self.set_geometry_points()
577+
self.set_geometry_points(scheduler)
548578
return self.geometry.buffer(self.meta['transform'].a/2).envelope
549579

550580
def empty_geometry_points(self):
@@ -592,25 +622,46 @@ def coord(self):
592622
""" Get [lat, lon] array. Might take some time. """
593623
return np.array([self.lat, self.lon]).transpose()
594624

595-
def set_geometry_points(self):
596-
""" Set geometry points """
625+
def set_geometry_points(self, scheduler=None):
626+
""" Set geometry attribute of GeoSeries with Points from latitude and
627+
longitude attributes if geometry not present.
628+
629+
Parameter:
630+
scheduler (str): used for dask map_partitions. “threads”,
631+
“synchronous” or “processes”
632+
"""
633+
LOGGER.info('Setting geometry points.')
634+
def apply_point(df_exp):
635+
return df_exp.apply((lambda row: Point(row.longitude, row.latitude)), axis=1)
597636
if not self.geometry.size:
598637
if not self.lat.size or not self.lon.size:
599638
self.set_meta_to_lat_lon()
600-
self.geometry = GeoSeries(list(zip(self.lon, self.lat)), crs=self.geometry.crs)
601-
self.geometry = self.geometry.apply(Point)
639+
if not scheduler:
640+
self.geometry = GeoSeries(list(zip(self.lon, self.lat)),
641+
crs=self.geometry.crs)
642+
self.geometry = self.geometry.apply(Point)
643+
else:
644+
import dask.dataframe as dd
645+
from multiprocessing import cpu_count
646+
ddata = dd.from_pandas(self, npartitions=cpu_count())
647+
self.geometry = ddata.map_partitions(apply_point, meta=Point).\
648+
compute(scheduler=scheduler)
602649

603-
def _ne_crs_xy(self):
650+
def _ne_crs_xy(self, scheduler=None):
604651
""" Return x (lon) and y (lat) in the CRS of Natural Earth
605652
653+
Parameter:
654+
scheduler (str): used for dask map_partitions. “threads”,
655+
“synchronous” or “processes”
656+
606657
Returns:
607658
np.array, np.array
608659
"""
609660
if not self.lat.size or not self.lon.size:
610661
self.set_meta_to_lat_lon()
611662
if equal_crs(self.geometry.crs, NE_CRS):
612663
return self.lon, self.lat
613-
self.set_geometry_points()
664+
self.set_geometry_points(scheduler)
614665
xy_points = self.geometry.to_crs(NE_CRS)
615666
return xy_points.geometry[:].x.values, xy_points.geometry[:].y.values
616667

0 commit comments

Comments
 (0)