diff --git a/src/DAT/broadcast.jl b/src/DAT/broadcast.jl index b7d9b49a9..4ab4e94cf 100644 --- a/src/DAT/broadcast.jl +++ b/src/DAT/broadcast.jl @@ -2,8 +2,14 @@ struct XStyle <: Broadcast.BroadcastStyle end Base.BroadcastStyle(::Broadcast.AbstractArrayStyle, ::XStyle) = XStyle() Base.BroadcastStyle(::XStyle, ::Broadcast.AbstractArrayStyle) = XStyle() Base.BroadcastStyle(::Type{<:YAXArray}) = XStyle() -to_yax(x::Number) = YAXArray((), fill(x)) +Base.BroadcastStyle(::Type{<:DimWindowArray}) = XStyle() +to_yax(x::Number) = YAXArray((),fill(x)) to_yax(x::DD.AbstractDimArray) = x +to_yax(x::DimWindowArray) = x + +Base.broadcastable(d::DimWindowArray) = d +Base.broadcastable(d::YAXArray) = d + function Base.broadcasted(::XStyle, f, args...) return Broadcast.Broadcasted{XStyle}(f, args) end diff --git a/src/DAT/counter.jl b/src/DAT/counter.jl new file mode 100644 index 000000000..8e22fe11e --- /dev/null +++ b/src/DAT/counter.jl @@ -0,0 +1,14 @@ +import OnlineStats +import DiskArrayEngine as DAE + +function counter(yax, expected_values=nothing) + st = if expected_values isa AbstractUnitRange{<:Int} + compoffs = DAE.KeyConvertDicts.AddConst(Val(1-first(expected_values))) + mydicttype = DAE.KeyConvertDicts.KeyDictType(eltype(expected_values),Int,compoffs, inv(compoffs),length(expected_values)) + OnlineStats.CountMap{eltype(yax),mydicttype} + else + OnlineStats.CountMap{eltype(yax),Dict{eltype(yax),Int}} + end + dimargs = ntuple(i->i=>nothing,ndims(yax)) + DAE.aggregate_diskarray(yax.data,st,dimargs) +end \ No newline at end of file diff --git a/src/DAT/resample.jl b/src/DAT/resample.jl index 85915f68c..9499282da 100644 --- a/src/DAT/resample.jl +++ b/src/DAT/resample.jl @@ -11,6 +11,7 @@ function xresample(yax::YAXArray;to=nothing,method=Linear(),outtype=Float32) conv = map(newdims) do d dold = DD.dims(yax.axes,d) dold === nothing && return nothing + approxequal(dold,d) && return nothing idim = DD.dimnum(yax.axes,d) idim=>(valval(dold),valval(d)) end diff --git a/src/DAT/xmap.jl b/src/DAT/xmap.jl index 11baebac6..7d6a11a7a 100644 --- a/src/DAT/xmap.jl +++ b/src/DAT/xmap.jl @@ -2,15 +2,16 @@ module Xmap import DimensionalData: rebuild, dims, DimArrayOrStack, Dimension, basedims, DimTuple, _group_indices, OpaqueArray, otherdims import DimensionalData as DD -using DiskArrays: find_subranges_sorted, chunktype_from_chunksizes, GridChunks +using DiskArrays: find_subranges_sorted, chunktype_from_chunksizes, GridChunks,isdisk using ..YAXArrays import ..Cubes: YAXArray import DiskArrayEngine as DAE import IntervalSets: Interval +import DiskArrayEngine.compute include("resample.jl") -export windows, xmap, Whole, xmap, XOutput, compute_to_zarr, xresample, MovingIntervals, XFunction, ⊘ +export windows, xmap, Whole, xmap, XOutput, compute_to_zarr, xresample, MovingIntervals, XFunction, ⊘, compute struct Whole <: DD.AbstractBins end function DD._group_indices(dim::DD.Dimension, ::Whole; labels=nothing) @@ -54,6 +55,50 @@ time_mean = xmap(mean, w, inplace=false) ⊘(a,b) = ⊘(a,(b,)) windows(A::DimArrayOrStack) = DimWindowArray(A,DD.dims(A),map(d->1:length(d),DD.dims(A)),DD.dims(A)) windows(A::DimArrayOrStack, x) = windows(A, dims(x)) +#Method to group by another DimArray defined by groups. This array might be very +#large, so we will not be able to determine the groups in all cases and might +#introduce an axis of unknown length +struct UnknownValues{T} + eltype::Type{T} +end +struct GroupIndices{I} + indices::I + dims_destroyed + dims_new +end +function windows(A::DimArrayOrStack, groups::DD.AbstractDimArray;groupname=:group, expected_groups=nothing) + groupdims = DD.dims(groups) + arraydims = DD.dims(A) + expanddims = otherdims(groupdims, arraydims) + groupdimvals = if !isnothing(expected_groups) + expected_groups + elseif isdisk(DD.data(groups)) + UnknownValues(eltype(groups)) + else + ug = collect(skipmissing(unique(groups))) + mi,ma = extrema(ug) + if all(isinteger,ug) && length(ug)/ (ma-mi+1) > 0.5 + Int(mi):Int(ma) + else + sort!(ug) + end + end + array_indices = map(DD.dims(A)) do d + DD.rebuild(d,1:length(d)) + end + newindices = GroupIndices(DD.data(groups),groupdims, DD.Dim{groupname}(groupdimvals)) + gd = DD.dims(array_indices,first(groupdims)) + newind = DD.rebuild(gd,newindices) + array_indices_view = DD.setdims(array_indices, newind) + array_indices_pure = map(DD.val, DD.otherdims(array_indices_view,Base.tail(groupdims))) + dim_orig = DD.dims(A) + newdim_firstname = DD.rebuild(first(groupdims),groupdimvals) + newdims = DD.setdims(dim_orig, newdim_firstname) + newdims = DD.otherdims(newdims, Base.tail(groupdims)) + irep = DD.dimnum(newdims, first(groupdims)) + newdims = Base.setindex(newdims, DD.Dim{groupname}(groupdimvals),irep) + return DimWindowArray(A, newdims, array_indices_pure, dim_orig) +end windows(A::DimArrayOrStack, dimfuncs::Dimension...) = windows(A, dimfuncs) function windows( A::DimArrayOrStack, p1::Pair{<:Any,<:Base.Callable}, ps::Pair{<:Any,<:Base.Callable}...; @@ -63,6 +108,7 @@ function windows( end return windows(A, dims) end + function windows(A::DimArrayOrStack, dimfuncs::DimTuple) length(otherdims(dimfuncs, dims(A))) > 0 && DD.Dimensions._extradimserror(otherdims(dimfuncs, dims(A))) @@ -85,10 +131,6 @@ function windows(A::DimArrayOrStack, dimfuncs::DimTuple) array_indices_pure = map(DD.val, array_indices_view) dim_orig = DD.dims(A) newdims = DD.setdims(dim_orig, group_dims) - N = ndims(A) - indt = map(eltype,array_indices_pure) - et = Base.promote_op(getindex,typeof(A),indt...) - #etdim = mapreduce(ndims ∘ eltype, +, array_indices_pure) return DimWindowArray(A, newdims, array_indices_pure, dim_orig) end @@ -200,8 +242,10 @@ struct DimWindowArray{A,D,I,DO} indices::I dim_orig::DO end -Base.size(a::DimWindowArray) = length.(a.indices) -Base.getindex(a::DimWindowArray, i::Int...) = a.data[map(getindex,a.indices,i)...] +Base.size(a::DimWindowArray) = length.(a.dims) +index_group(a,i) = a[i] +index_group(a::GroupIndices,i) = findall(isequal(i),a.indices) +Base.getindex(a::DimWindowArray, i::Int...) = a.data.data[map(index_group,a.indices,i)...] DD.dims(a::DimWindowArray) = a.dims to_windowarray(d::DimWindowArray) = d to_windowarray(d) = windows(d) @@ -211,13 +255,14 @@ function Base.show(io::IO, dw::DimWindowArray) end -struct XOutput{D<:Tuple{Vararg{DD.Dimension}},T} +struct XOutput{D<:Tuple{Vararg{DD.Dimension}},R,T} outaxes::D + destroyaxes::R outtype::T properties end -function XOutput(outaxes::DD.Dimension...; outtype=1, properties=Dict()) - XOutput(outaxes, outtype,properties) +function XOutput(outaxes...; outtype=1,properties=Dict(),destroyaxes=()) + XOutput(outaxes, destroyaxes, outtype,properties) end _step(x::AbstractArray{<:Number}) = length(x) > 1 ? (last(x)-first(x))/(length(x)-1) : zero(eltype(x)) @@ -259,13 +304,62 @@ dataeltype(y::DimWindowArray) = eltype(y.data.data) tupelize(x) = (x,) tupelize(x::Tuple) = x +""" + _groupby_xmap(f,winars...;output,inplace) + +Function to handle groupby operations in `xmap`. It assumes that the only input array +is a DimWindowArray where one of the dimensions is a GroupIndices dimension. +""" +function _groupby_xmap(f,ars...;output,inplace) + + @assert length(ars) == 1 + g = only(ars) + inds = findall(i->isa(i,Xmap.GroupIndices), g.indices) + #For now we allow only a single group array + @assert length(inds) == 1 + igroup = only(inds) + + preproc, groupconv = (identity, identity) + _f = isa(f,XFunction) ? f.f : f + newf = DAE.disk_onlinestat(_f,preproc,groupconv) + + outputs = XOutput(g.dims[igroup],destroyaxes=DD.otherdims(g.dim_orig,g.dims)) + + groupar = YAXArray(g.indices[igroup].dims_destroyed, g.indices[igroup].indices) + xmap(newf, g.data, groupar, output=outputs) +end + + +""" + xmap(f, ar...; output = nothing, inplace = nothing) + +Maps a function `f` over an array of `ar` of type `YAXArray` or `DimWindowArray`. +`xmap` requires the specification of a type for the output of `f`, with a default type which is 1 indicating +that the data type should be equal to the element type of the first input array. `output` must be a list of `XOutput` objects, where each contains a tuple of axes under which the results are stored and the type of the values stored. If `inplace` is `true`, then the original values are replaced in a place. `xmap` returns one or more objects of type `YAXArray` or `DimWindowArray` containing a view over the data passed to `f` by `overlaying` the outputs over the original data arrays. If reduction functions are specified, then the `xmap` outputs replace the original original data array with the reduced values. During the execution of `xmap`, the everything except `f` itself is compiled just once. Specifying `f` as an object of type `XFunction` waits until the actual function is called before compiling it. + +xmap will return a lazy representation of the resulting array. function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; output=XOutput(), inplace=default_inplace(f), function_args=(), function_kwargs=(;)) +* `output::Vector{XOutput}`: specifies the output arrays. Each XOutput object contains a tuple of axes (or symbols) to store the result and the element type of the output arrays. +* `inplace::Bool`: if `true` the function `f` operates in-place so that pre-allocated output buffers will be passed to the function as + + + +**Examples.** + + # Simple mapping + x = xmap(+, a, b) + + +""" +function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; args=(), kwargs=(;), output=nothing, inplace=nothing, function_args=(), function_kwargs=(;)) + output === nothing && (output = default_output(f)) + inplace === nothing && (inplace = default_inplace(f)) alldims = mapreduce(approxunion!,ars,init=[]) do ar DD.dims(ar) end @@ -277,6 +371,14 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; throw(ArgumentError("Duplicated dimensions with different values")) end + winars = map(to_windowarray,ars) + #Check for any input that contains an array groupby + is_groupby = any(winars) do a + any(Base.Fix2(isa,GroupIndices),a.indices) + end + + is_groupby && return _groupby_xmap(f,winars...;output,inplace) + #Create outspecs output = tupelize(output) @@ -286,12 +388,25 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; allinandoutdims = (unique(DD.basedims((alldims..., alloutdims...)))...,) - outaxinfo = map(output) do o outaxes = o.outaxes + destroydims = o.destroyaxes addaxes = DD.otherdims(alldims, DD.basedims(outaxes)) outwindows = map(i->[Base.OneTo(length(i))],outaxes) - extrawindows = Base.OneTo.(length.(addaxes)) + extrawindows = map(addaxes) do outax + if isnothing(DD.dims(destroydims, outax)) + Base.OneTo(length(outax)) + else + fill(1, length(outax)) + end + end + addaxes = map(addaxes) do outax + if isnothing(DD.dims(destroydims, outax)) + outax + else + DD.reducedims(outax, DD.Dim) + end + end alloutaxes = (outaxes..., addaxes...) dimsmap = DD.dimnum(allinandoutdims, alloutaxes) alloutaxes, tupelize(dimsmap), (outwindows..., extrawindows...) @@ -309,14 +424,15 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; end push!(outtypes, outtype) - sout = map(length,ax) + sout = map(win->maximum(maximum,win),w) DAE.create_outwindows(sout;dimsmap=dm,windows = w) end - daefunction = DAE.create_userfunction(f, (outtypes...,), - is_mutating=inplace, - allow_threads=false, - args=function_args, - kwargs=function_kwargs) + daefunction = if f isa DAE.UserOp + f + else + DAE.create_userfunction(f, (outtypes...,); is_mutating=inplace, allow_threads=false, + args=function_args, kwargs=function_kwargs) + end #Create DiskArrayEngine Input arrays input_arrays = map(ars) do ar a = to_windowarray(ar) @@ -461,6 +577,9 @@ XFunction(f::XFunction;kwargs...) = f default_inplace(f::XFunction) = f.inplace default_inplace(f) = true +default_output(f) = XOutput() +default_output(f::XFunction) = f.outputs + function Base.broadcasted(f::XFunction,args...) xmap(f,args...,output = f.outputs, inplace = f.inplace) @@ -474,6 +593,26 @@ function gmwop_from_conn(conn,nodes) DAE.GMDWop(inputs, outspecs, op) end +function compute(yax::YAXArray,args...;kwargs...) + if isa(yax.data,DAE.GMWOPResult) + r = if any(a->isa(a.a,DAE.GMWOPResult), yax.data.op.inars) + g = DAE.MwopGraph() + i = DAE.to_graph!(g, yax.data) + DAE.remove_aliases!(g) + DAE.fuse_graph!(g) + newop = DAE.gmwop_from_reducedgraph(g); + DAE.results_as_diskarrays(newop)[i] + else + yax.data + end + computed = DAE.compute(r,args...;kwargs...) + DD.rebuild(yax,computed) + else + @warn "Yaxarray does not wrap a computation, nothing to do" + yax + end +end + """ compute_to_zarr(ods, path; max_cache=5e8, overwrite=false) diff --git a/src/DatasetAPI/Datasets.jl b/src/DatasetAPI/Datasets.jl index 8ad7112ef..5cfece194 100644 --- a/src/DatasetAPI/Datasets.jl +++ b/src/DatasetAPI/Datasets.jl @@ -3,10 +3,10 @@ module Datasets import ..Cubes: Cubes, YAXArray, concatenatecubes, CleanMe, subsetcube, copy_diskarray, setchunks, caxes, readcubedata, cubesize, formatbytes using ...YAXArrays: YAXArrays, YAXDefaults, findAxis using DataStructures: OrderedDict, counter -using Dates: Day, Hour, Minute, Second, Month, Year, Date, DateTime, TimeType, AbstractDateTime, Period +using Dates: Dates, Day, Hour, Minute, Second, Month, Year, Date, DateTime, TimeType, AbstractDateTime, Period using Statistics: mean using IntervalSets: Interval, (..) -using CFTime: timedecode, timeencode, DateTimeNoLeap, DateTime360Day, DateTimeAllLeap +using CFTime: timedecode, timeencode, DateTimeNoLeap, DateTime360Day, DateTimeAllLeap, CFTime using YAXArrayBase using YAXArrayBase: iscontdimval, add_var using DiskArrayTools: CFDiskArray, diskstack @@ -249,7 +249,7 @@ function Base.getindex(x::Dataset; var = nothing, kwargs...) Dataset(; properties=x.properties, map(ds -> ds => subsetifdimexists(cc[ds]; kwargs...), collect(keys(cc)))...) end end -function collectdims(g) +function collectdims(g; force_datetime=true) dlist = Set{Tuple{String,Int,Int}}() varnames = get_varnames(g) foreach(varnames) do k @@ -267,13 +267,19 @@ function collectdims(g) end end end - outd = Dict(d[1] => (ax = toaxis(d[1], g, d[2], d[3]), offs = d[2]) for d in dlist) + outd = Dict(d[1] => (ax=toaxis(d[1], g, d[2], d[3]; force_datetime), offs=d[2]) for d in dlist) length(outd) == length(dlist) || throw(ArgumentError("All Arrays must have the same offset")) outd end -function toaxis(dimname, g, offs, len) +function round_datetime(dt) + origin = CFTime._origin_period(dt) + ms = Dates.Millisecond(round(Int, (dt.instant + origin + CFTime.DATETIME_OFFSET).duration)) + return DateTime(CFTime.UTInstant{Dates.Millisecond}(ms)) +end + +function toaxis(dimname, g, offs, len; force_datetime=true) axname = Symbol(dimname) if !haskey(g, dimname) return DD.rebuild(DD.name2dim(axname), 1:len) @@ -283,8 +289,17 @@ function toaxis(dimname, g, offs, len) if match(r"^(days)|(hours)|(seconds)|(months) since",lowercase(get(aratts,"units",""))) !== nothing tsteps = try timedecode(ar[:], aratts["units"], lowercase(get(aratts, "calendar", "standard"))) - catch - ar[:] + catch e + if e isa InexactError + dec = timedecode(ar[:], aratts["units"], lowercase(get(aratts, "calendar", "standard")), prefer_datetime=false) + if force_datetime + round_datetime.(dec) + else + dec + end + else + ar[:] + end end DD.rebuild(DD.name2dim(axname), tsteps[offs+1:end]) elseif haskey(aratts, "_ARRAYVALUES") @@ -318,7 +333,7 @@ function testrange(x) end function testrange(x::AbstractArray{<:Integer}) - length(x) == 1 && return x + length(x) <= 1 && return x steps = diff(x) if all(isequal(steps[1]), steps) && !iszero(steps[1]) return range(first(x), step = steps[1], length = length(x)) @@ -428,6 +443,7 @@ The default driver will search for available drivers and tries to detect the use - `skip_keys` are passed as symbols, i.e., `skip_keys = (:a, :b)` - `driver=:all`, common options are `:netcdf` or `:zarr`. +- `force_datetime=false` force conversion when CFTime fails with an InexactError even if milliseconds must be rounded Example: @@ -435,12 +451,12 @@ Example: ds = open_dataset(f, driver=:zarr, skip_keys = (:c,)) ```` """ -function open_dataset(g; skip_keys=(), driver = :all) +function open_dataset(g; skip_keys=(), driver=:all, force_datetime=false) str_skipkeys = string.(skip_keys) dsopen = YAXArrayBase.to_dataset(g, driver = driver) YAXArrayBase.open_dataset_handle(dsopen) do g isempty(get_varnames(g)) && throw(ArgumentError("Group does not contain datasets.")) - dimlist = collectdims(g) + dimlist = collectdims(g; force_datetime) dnames = string.(keys(dimlist)) varlist = filter(get_varnames(g)) do vn upname = uppercase(vn) diff --git a/src/YAXArrays.jl b/src/YAXArrays.jl index a957e30c1..f5e8d1cbe 100644 --- a/src/YAXArrays.jl +++ b/src/YAXArrays.jl @@ -39,6 +39,7 @@ include("Cubes/Cubes.jl") include("DatasetAPI/Datasets.jl") include("DAT/DAT.jl") include("DAT/xmap.jl") +include("DAT/counter.jl") using Reexport: @reexport using YAXArrayBase: getattributes