diff --git a/ci/install-travis.sh b/ci/install-travis.sh
index 1921ade..145a7d0 100755
--- a/ci/install-travis.sh
+++ b/ci/install-travis.sh
@@ -29,6 +29,7 @@ source activate test-environment
 conda install \
     coverage \
     cython \
+    dask \
     flake8 \
     hypothesis \
     numpy \
diff --git a/cyberpandas/__init__.py b/cyberpandas/__init__.py
index 710b1b2..97bdcac 100644
--- a/cyberpandas/__init__.py
+++ b/cyberpandas/__init__.py
@@ -8,6 +8,7 @@
 from .ip_methods import ip_range
 from .parser import to_ipaddress
 from .mac_array import MACType, MACArray
+from ._compat import HAS_DASK
 from pkg_resources import get_distribution, DistributionNotFound
 
 try:
@@ -30,3 +31,6 @@
     'ip_range',
     'to_ipaddress',
 ]
+
+if HAS_DASK:
+    from . import dask_ip_array  # noqa
diff --git a/cyberpandas/_accessor.py b/cyberpandas/_accessor.py
index 646267d..0961ce8 100644
--- a/cyberpandas/_accessor.py
+++ b/cyberpandas/_accessor.py
@@ -1,8 +1,27 @@
 import pandas as pd
 
+from ._compat import is_dask_collection
+
 
 def delegated_method(method, index, name, *args, **kwargs):
-    return pd.Series(method(*args, **kwargs), index, name=name)
+    values = method(*args, **kwargs)
+    return wrap_result(values, index, name)
+
+
+def wrap_result(values, index, name):
+    from cyberpandas.ip_array import IPType
+
+    if is_dask_collection(values):
+        import dask.array as da
+        import dask.dataframe as dd
+
+        if isinstance(values.dtype, IPType):
+            return values.to_dask_series(index=index, name=name)
+
+        elif isinstance(values, da.Array):
+            return dd.from_dask_array(values, columns=name, index=index)
+
+    return pd.Series(values, index=index, name=name)
 
 
 class Delegated:
@@ -16,7 +35,10 @@ def __get__(self, obj, type=None):
         index = object.__getattribute__(obj, '_index')
         name = object.__getattribute__(obj, '_name')
         result = self._get_result(obj)
-        return pd.Series(result, index, name=name)
+        return wrap_result(result, index, name)
+
+    def _get_result(self, obj, type=None):
+        raise NotImplementedError
 
 
 class DelegatedProperty(Delegated):
@@ -25,8 +47,7 @@ def _get_result(self, obj, type=None):
 
 
 class DelegatedMethod(Delegated):
-    def __get__(self, obj, type=None):
-        index = object.__getattribute__(obj, '_index')
-        name = object.__getattribute__(obj, '_name')
+    def _get_result(self, obj, type=None):
         method = getattr(object.__getattribute__(obj, '_data'), self.name)
-        return delegated_method(method, index, name)
+        values = method()
+        return values
diff --git a/cyberpandas/_compat.py b/cyberpandas/_compat.py
new file mode 100644
index 0000000..34e878a
--- /dev/null
+++ b/cyberpandas/_compat.py
@@ -0,0 +1,47 @@
+from functools import singledispatch
+from collections import abc
+
+import numpy as np
+
+try:
+    import dask.array
+    import dask.dataframe
+except ImportError:
+    HAS_DASK = False
+else:
+    HAS_DASK = True
+
+
+@singledispatch
+def asarray(values, *args, **kwargs):
+    return np.asarray(values, *args, **kwargs)
+
+
+@singledispatch
+def atleast_1d(values):
+    return np.atleast_1d(values)
+
+
+def is_dask_collection(x):
+    if HAS_DASK:
+        import dask
+        return dask.is_dask_collection(x)
+    return False
+
+
+if HAS_DASK:
+    @asarray.register(dask.array.Array)
+    def _(values, *args, **kwargs):
+        return dask.array.asarray(values, *args, **kwargs)
+
+    @atleast_1d.register(dask.array.Array)
+    def _(values):
+        return dask.array.atleast_1d(values)
+
+
+def is_array_like(obj):
+    attrs = set(dir(obj))
+    return bool(attrs & {'__array__', 'ndim', 'dtype'})
+
+
+def is_list_like(obj):
+    return isinstance(obj, abc.Sized)
diff --git a/cyberpandas/dask_ip_array.py b/cyberpandas/dask_ip_array.py
new file mode 100644
index 0000000..a99bc1f
--- /dev/null
+++ b/cyberpandas/dask_ip_array.py
@@ -0,0 +1,36 @@
+import ipaddress
+
+import numpy as np
+import dask.array as da
+import dask.dataframe as dd
+from dask.dataframe.extensions import (
+    make_scalar, make_array_nonempty, register_series_accessor)
+from .ip_array import IPAccessor, IPType, IPArray
+
+
+@make_array_nonempty.register(IPType)
+def _(dtype):
+    return IPArray._from_sequence([1, 2], dtype=dtype)
+
+
+@make_scalar.register(ipaddress.IPv4Address)
+@make_scalar.register(ipaddress.IPv6Address)
+def _(x):
+    return ipaddress.ip_address(x)
+
+
+@register_series_accessor("ip")
+class DaskIPAccessor(IPAccessor):
+    @staticmethod
+    def _extract_array(obj):
+        # TODO: remove delayed trip
+        objs = obj.to_delayed()
+        dtype = obj.dtype._record_type
+        arrays = [da.from_delayed(x.array.data, shape=(np.nan,), dtype=dtype)
+                  for x in objs]
+        arr = da.concatenate(arrays)
+        return IPArray(arr)
+
+    @property
+    def _constructor(self):
+        return dd.Series
diff --git a/cyberpandas/ip_array.py b/cyberpandas/ip_array.py
index e2cc67f..c08ee2c 100644
--- a/cyberpandas/ip_array.py
+++ b/cyberpandas/ip_array.py
@@ -13,6 +13,7 @@
 from .base import NumPyBackedExtensionArrayMixin
 from .common import _U8_MAX, _IPv4_MAX
 from .parser import _to_ipaddress_pyint, _as_ip_object
+from . import _compat
 
 # -----------------------------------------------------------------------------
 # Extension Type
@@ -215,7 +216,10 @@ def take(self, indices, allow_fill=False, fill_value=None):
     # -------------------------------------------------------------------------
 
     def __repr__(self):
-        formatted = self._format_values()
+        if isinstance(self.data, np.ndarray):
+            formatted = self._format_values()
+        else:
+            formatted = self.data
         return "IPArray({!r})".format(formatted)
 
     def _format_values(self):
@@ -320,6 +324,47 @@ def to_bytes(self):
         """
         return self.data.tobytes()
 
+    def to_delayed(self):
+        """
+        Convert an IPArray to a list of Delayed objects.
+
+        This only works for IPArrays backed by a Dask Array.
+
+        Returns
+        -------
+        List[dask.delayed.Delayed]
+        """
+        from dask import delayed
+        cls = delayed(type(self))
+
+        return [cls(x) for x in self.data.to_delayed()]
+
+    def to_dask_series(self, index=None, name=None):
+        """
+        Convert to a dask Series.
+
+        Parameters
+        ----------
+        index : dask.dataframe.Index, optional
+        name : str, optional
+            Name to use for the resulting dask Series.
+
+        Returns
+        -------
+        dask.dataframe.Series
+        """
+        import dask
+        import dask.dataframe as dd
+
+        blocks = self.to_delayed()
+        if index is not None:
+            args = zip(blocks, index.to_delayed())
+            divisions = index.divisions
+        else:
+            args = zip(blocks)
+            divisions = None
+        blocks = [dask.delayed(pd.Series)(*b) for b in args]
+        result = dd.from_delayed(blocks, meta=(name, self.dtype),
+                                 divisions=divisions)
+        return result
+
     def astype(self, dtype, copy=True):
         if isinstance(dtype, IPType):
             if copy:
@@ -658,6 +703,48 @@ def mask(self, mask):
         masked = np.bitwise_and(a, b).ravel().view(self.dtype._record_type)
         return type(self)(masked)
 
+    def compute(self, **kwargs):
+        import dask
+        return dask.compute(self, **kwargs)
+
+    def persist(self, *args, **kwargs):
+        import dask
+        return dask.persist(self, *args, **kwargs)
+
+    if _compat.HAS_DASK:
+        import dask.threaded
+        import dask.context
+
+        def __dask_graph__(self):
+            return self.data.__dask_graph__()
+
+        def __dask_keys__(self):
+            return self.data.__dask_keys__()
+
+        def __dask_layers__(self):
+            return self.data.__dask_layers__()
+
+        @property
+        def __dask_optimize__(self):
+            return self.data.__dask_optimize__
+
+        @property
+        def __dask_scheduler__(self):
+            return self.data.__dask_scheduler__
+
+        def __dask_postcompute__(self):
+            func, args = self.data.__dask_postcompute__()
+            return self._dask_finalize, (func, args)
+
+        def __dask_postpersist__(self):
+            func, args = self.data.__dask_postpersist__()
+            return self._dask_finalize, (func, args)
+
+        @staticmethod
+        def _dask_finalize(results, func, args):
+            ds = func(results, *args)
+            return IPArray(ds)
+
 
 # -----------------------------------------------------------------------------
 # Accessor
@@ -683,10 +770,18 @@ class IPAccessor:
 
     def __init__(self, obj):
        self._validate(obj)
-        self._data = obj.values
+        self._data = self._extract_array(obj)
         self._index = obj.index
         self._name = obj.name
 
+    @property
+    def _constructor(self):
+        return pd.Series
+
+    @staticmethod
+    def _extract_array(obj):
+        return obj.array
+
     @staticmethod
     def _validate(obj):
         if not is_ipaddress_type(obj):
diff --git a/cyberpandas/mac_array.py b/cyberpandas/mac_array.py
index 183b4a5..40e522c 100644
--- a/cyberpandas/mac_array.py
+++ b/cyberpandas/mac_array.py
@@ -1,4 +1,4 @@
-from collections import Iterable
+from collections.abc import Iterable
 
 import numpy as np
 import six
diff --git a/cyberpandas/parser.py b/cyberpandas/parser.py
index 215381b..9600dc6 100644
--- a/cyberpandas/parser.py
+++ b/cyberpandas/parser.py
@@ -1,9 +1,9 @@
 import ipaddress
 
 import numpy as np
-from pandas.api.types import is_list_like
 
 from ._utils import pack, unpack
+from . import _compat
 
 
 def to_ipaddress(values):
@@ -36,7 +36,7 @@
     """
     from . import IPArray
 
-    if not is_list_like(values):
+    if not _compat.is_list_like(values):
         values = [values]
 
     return IPArray(_to_ip_array(values))
@@ -47,19 +47,20 @@ def _to_ip_array(values):
     if isinstance(values, IPArray):
         return values.data
 
+    array_like = _compat.is_array_like(values)
-    if (isinstance(values, np.ndarray) and
-            values.ndim == 1 and
+    if (array_like and values.ndim == 1 and
+            isinstance(values.dtype, np.dtype) and
             np.issubdtype(values.dtype, np.integer)):
         # We assume we're given the low bits here.
+        values = values.astype("u8")
-        values = np.asarray(values, dtype=IPType._record_type)
+        values = _compat.asarray(values).astype(dtype=IPType._record_type)
         values['hi'] = 0
 
-    elif not (isinstance(values, np.ndarray) and
-              values.dtype == IPType._record_type):
+    elif not (array_like and values.dtype == IPType._record_type):
         values = _to_int_pairs(values)
-    return np.atleast_1d(np.asarray(values, dtype=IPType._record_type))
+    return _compat.atleast_1d(_compat.asarray(values,
+                                              dtype=IPType._record_type))
 
 
 def _to_int_pairs(values):
diff --git a/docs/source/usage.rst b/docs/source/usage.rst
index 7d0eedf..51e52a0 100644
--- a/docs/source/usage.rst
+++ b/docs/source/usage.rst
@@ -133,3 +133,37 @@ IP Accessor
 
    ser.ip.isna
    df['addresses'].ip.is_ipv6
+
+
+Dask Integration
+----------------
+
+:class:`IPArray` also works well with `Dask <https://dask.org>`_.
+In this case the ``.data`` attribute backing an :class:`IPArray`
+will be a Dask Array, rather than a NumPy ndarray.
+
+.. ipython:: python
+
+   import dask.array as da
+
+   arr = cyberpandas.ip_range(10)
+   arr.data
+
+   dask_data = da.from_array(arr.data, chunks=2)
+   dask_data
+
+   dask_arr = cyberpandas.IPArray(dask_data)
+   dask_arr
+
+These dask-backed IPArrays may be stored in a dask Series or DataFrame:
+
+.. ipython:: python
+
+   ds = dask_arr.to_dask_series()
+   ds
+
+An ``.ip`` accessor is provided for dask Series:
+
+.. ipython:: python
+
+   ds.ip.isna.compute()
diff --git a/tests/test_dask.py b/tests/test_dask.py
new file mode 100644
index 0000000..555af21
--- /dev/null
+++ b/tests/test_dask.py
@@ -0,0 +1,52 @@
+import pytest
+import cyberpandas
+import pandas as pd
+import pandas.util.testing as tm
+
+dd = pytest.importorskip('dask.dataframe')
+dask = pytest.importorskip("dask")
+da = pytest.importorskip("dask.array")
+
+
+def test_constructor():
+    a = cyberpandas.to_ipaddress([1, 2, 3, 4]).data
+    b = da.from_array(a, chunks=2)
+
+    iparr = cyberpandas.IPArray(b)
+    assert isinstance(iparr.data, da.Array)
+    da.utils.assert_eq(iparr.data, b)
+    da.utils.assert_eq(iparr.data, a)
+
+    result, = dask.compute(iparr)
+    assert isinstance(result, cyberpandas.IPArray)
+
+
+def test_basics():
+    a = cyberpandas.to_ipaddress([1, 2, 3, 4]).data
+    b = da.from_array(a, chunks=2)
+
+    c = cyberpandas.IPArray(a)
+    d = cyberpandas.IPArray(b)
+
+    da.utils.assert_eq(c.isna(), d.isna())
+    da.utils.assert_eq(c.is_ipv4, d.is_ipv4)
+
+    meta = dd.utils.meta_nonempty(pd.Series(c))
+    expected = pd.Series(cyberpandas.IPArray(['0.0.0.1', '0.0.0.2']))
+    tm.assert_series_equal(meta, expected)
+
+
+def test_dask_series():
+    a = cyberpandas.IPArray(cyberpandas.to_ipaddress([1, 2, 3, 4]).data)
+    b = cyberpandas.IPArray(da.from_array(a.data, chunks=2)).to_dask_series()
+
+    b.loc[0]
+
+
+def test_accessor():
+    a = cyberpandas.to_ipaddress([1, 2, 3, 4]).data
+    a = pd.Series(cyberpandas.IPArray(a))
+    b = dd.from_pandas(a, 2)
+
+    dd.utils.assert_eq(a.ip.is_ipv4, b.ip.is_ipv4)
+    dd.utils.assert_eq(a.ip.netmask(), b.ip.netmask())
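
For reference, a rough end-to-end sketch of the workflow this patch enables follows. It mirrors the examples added in docs/source/usage.rst and tests/test_dask.py above; it assumes dask is installed, and the name='addresses' label is only illustrative.

    import dask.array as da
    import cyberpandas

    # Build a dask-backed IPArray: .data is a dask Array rather than a NumPy ndarray.
    arr = cyberpandas.ip_range(10)
    dask_arr = cyberpandas.IPArray(da.from_array(arr.data, chunks=2))

    # Operations stay lazy; compute() (which calls dask.compute) materializes them.
    dask_arr.is_ipv4
    result, = dask_arr.compute()

    # Wrap the array in a dask Series and use the registered .ip accessor.
    ds = dask_arr.to_dask_series(name='addresses')
    ds.ip.isna.compute()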