Skip to content

Commit 0be6643

Browse files
committed
Add saving lazyudf
1 parent 69816ab commit 0be6643

File tree

2 files changed

+92
-60
lines changed

2 files changed

+92
-60
lines changed

src/blosc2/lazyexpr.py

Lines changed: 70 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -3029,50 +3029,6 @@ def info_items(self):
30293029
items += [("dtype", self.dtype)]
30303030
return items
30313031

3032-
def _save(self, urlpath=None, **kwargs):
3033-
if urlpath is None:
3034-
raise ValueError("To save a LazyArray you must provide an urlpath")
3035-
3036-
# Validate expression
3037-
validate_expr(self.expression)
3038-
3039-
meta = kwargs.get("meta", {})
3040-
meta["LazyArray"] = LazyArrayEnum.Expr.value
3041-
kwargs["urlpath"] = urlpath
3042-
kwargs["meta"] = meta
3043-
kwargs["mode"] = "w" # always overwrite the file in urlpath
3044-
3045-
# Create an empty array; useful for providing the shape and dtype of the outcome
3046-
array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs)
3047-
3048-
# Save the expression and operands in the metadata
3049-
operands = {}
3050-
for key, value in self.operands.items():
3051-
if isinstance(value, blosc2.C2Array):
3052-
operands[key] = {
3053-
"path": str(value.path),
3054-
"urlbase": value.urlbase,
3055-
}
3056-
continue
3057-
if key in {"numpy", "np"}:
3058-
# Provide access to cast funcs like int8 et al.
3059-
continue
3060-
if isinstance(value, blosc2.Proxy):
3061-
# Take the required info from the Proxy._cache container
3062-
value = value._cache
3063-
if not hasattr(value, "schunk"):
3064-
raise ValueError(
3065-
"To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects"
3066-
)
3067-
if value.schunk.urlpath is None:
3068-
raise ValueError("To save a LazyArray, all operands must be stored on disk/network")
3069-
operands[key] = value.schunk.urlpath
3070-
array.schunk.vlmeta["_LazyArray"] = {
3071-
"expression": self.expression,
3072-
"UDF": None,
3073-
"operands": operands,
3074-
}
3075-
30763032
def save(self, urlpath=None, **kwargs):
30773033
if urlpath is None:
30783034
raise ValueError("To save a LazyArray you must provide an urlpath")
@@ -3357,8 +3313,44 @@ def __getitem__(self, item):
33573313
return output[item]
33583314
return self.res_getitem[item]
33593315

3360-
def save(self, **kwargs):
3361-
raise NotImplementedError("For safety reasons, this is not implemented for UDFs")
3316+
def save(self, urlpath=None, **kwargs):
3317+
if urlpath is None:
3318+
raise ValueError("To save a LazyArray you must provide an urlpath")
3319+
3320+
meta = kwargs.get("meta", {})
3321+
meta["LazyArray"] = LazyArrayEnum.UDF.value
3322+
kwargs["urlpath"] = urlpath
3323+
kwargs["meta"] = meta
3324+
kwargs["mode"] = "w" # always overwrite the file in urlpath
3325+
3326+
# Create an empty array; useful for providing the shape and dtype of the outcome
3327+
array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs)
3328+
3329+
# Save the expression and operands in the metadata
3330+
operands = {}
3331+
operands_ = self.inputs_dict
3332+
for key, value in operands_.items():
3333+
if isinstance(value, blosc2.C2Array):
3334+
operands[key] = {
3335+
"path": str(value.path),
3336+
"urlbase": value.urlbase,
3337+
}
3338+
continue
3339+
if isinstance(value, blosc2.Proxy):
3340+
# Take the required info from the Proxy._cache container
3341+
value = value._cache
3342+
if not hasattr(value, "schunk"):
3343+
raise ValueError(
3344+
"To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects"
3345+
)
3346+
if value.schunk.urlpath is None:
3347+
raise ValueError("To save a LazyArray, all operands must be stored on disk/network")
3348+
operands[key] = value.schunk.urlpath
3349+
array.schunk.vlmeta["_LazyArray"] = {
3350+
"UDF": inspect.getsource(self.func),
3351+
"operands": operands,
3352+
"name": self.func.__name__,
3353+
}
33623354

33633355

33643356
def _numpy_eval_expr(expression, operands, prefer_blosc=False):
@@ -3615,41 +3607,59 @@ def lazyexpr(
36153607

36163608
def _open_lazyarray(array):
36173609
value = array.schunk.meta["LazyArray"]
3618-
if value == LazyArrayEnum.UDF.value:
3619-
raise NotImplementedError("For safety reasons, persistent UDFs are not supported")
3620-
3621-
# LazyExpr
36223610
lazyarray = array.schunk.vlmeta["_LazyArray"]
3611+
if value == LazyArrayEnum.Expr.value:
3612+
expr = lazyarray["expression"]
3613+
elif value == LazyArrayEnum.UDF.value:
3614+
expr = lazyarray["UDF"]
3615+
else:
3616+
raise ValueError("Argument `array` is not LazyExpr or LazyUDF instance.")
3617+
36233618
operands = lazyarray["operands"]
36243619
parent_path = Path(array.schunk.urlpath).parent
36253620
operands_dict = {}
36263621
missing_ops = {}
3627-
for key, value in operands.items():
3628-
if isinstance(value, str):
3629-
value = parent_path / value
3622+
for key, v in operands.items():
3623+
if isinstance(v, str):
3624+
v = parent_path / v
36303625
try:
3631-
op = blosc2.open(value)
3626+
op = blosc2.open(v)
36323627
except FileNotFoundError:
3633-
missing_ops[key] = value
3628+
missing_ops[key] = v
36343629
else:
36353630
operands_dict[key] = op
3636-
elif isinstance(value, dict):
3631+
elif isinstance(v, dict):
36373632
# C2Array
36383633
operands_dict[key] = blosc2.C2Array(
3639-
pathlib.Path(value["path"]).as_posix(),
3640-
urlbase=value["urlbase"],
3634+
pathlib.Path(v["path"]).as_posix(),
3635+
urlbase=v["urlbase"],
36413636
)
36423637
else:
36433638
raise TypeError("Error when retrieving the operands")
36443639

3645-
expr = lazyarray["expression"]
36463640
if missing_ops:
36473641
exc = exceptions.MissingOperands(expr, missing_ops)
36483642
exc.expr = expr
36493643
exc.missing_ops = missing_ops
36503644
raise exc
36513645

3652-
new_expr = LazyExpr._new_expr(expr, operands_dict, guess=True, out=None, where=None)
3646+
# LazyExpr
3647+
if value == LazyArrayEnum.Expr.value:
3648+
new_expr = LazyExpr._new_expr(expr, operands_dict, guess=True, out=None, where=None)
3649+
elif value == LazyArrayEnum.UDF.value:
3650+
local_ns = {}
3651+
exec(expr, {"np": np, "blosc2": blosc2}, local_ns)
3652+
name = lazyarray["name"]
3653+
func = local_ns[name]
3654+
# TODO: make more robust for general kwargs (not just cparams)
3655+
new_expr = blosc2.lazyudf(
3656+
func,
3657+
tuple(operands_dict[f"o{n}"] for n in range(len(operands_dict))),
3658+
shape=array.shape,
3659+
dtype=array.dtype,
3660+
cparams=array.cparams,
3661+
)
3662+
36533663
# Make the array info available for the user (only available when opened from disk)
36543664
new_expr.array = array
36553665
# We want to expose schunk too, so that .info() can be used on the LazyArray

tests/ndarray/test_lazyudf.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,3 +455,25 @@ def test_clip_logaddexp(shape, chunks, blocks, slices):
455455
np.testing.assert_allclose(expr, np.sin(np.logaddexp(npb, npa)))
456456
expr = blosc2.evaluate("clip(logaddexp(b, a), 6, 12)")
457457
np.testing.assert_allclose(expr, np.clip(np.logaddexp(npb, npa), 6, 12))
458+
459+
460+
def test_save_ludf():
461+
shape = (23,)
462+
npa = np.arange(start=0, stop=np.prod(shape)).reshape(shape)
463+
blosc2.remove_urlpath("a.b2nd")
464+
array = blosc2.asarray(npa, urlpath="a.b2nd")
465+
466+
# Assert that shape is computed correctly
467+
npc = npa + 1
468+
cparams = {"nthreads": 4}
469+
urlpath = "lazyarray.b2nd"
470+
blosc2.remove_urlpath(urlpath)
471+
472+
expr = blosc2.lazyudf(udf1p, (array,), np.float64, cparams=cparams)
473+
474+
expr.save(urlpath=urlpath)
475+
del expr
476+
expr = blosc2.open(urlpath)
477+
assert isinstance(expr, blosc2.LazyUDF)
478+
res_lazyexpr = expr.compute()
479+
np.testing.assert_array_equal(res_lazyexpr[:], npc)

0 commit comments

Comments
 (0)