From 285aad3f487031c3bc0ed26c672ca9c68ab5493e Mon Sep 17 00:00:00 2001 From: meggart Date: Mon, 3 Nov 2025 14:05:33 +0100 Subject: [PATCH 1/5] merge commit --- src/DAT/broadcast.jl | 8 +- src/DAT/counter.jl | 14 +++ src/DAT/resample.jl | 1 + src/DAT/xmap.jl | 182 +++++++++++++++++++++++++++++++++++-- src/DatasetAPI/Datasets.jl | 2 +- src/YAXArrays.jl | 1 + 6 files changed, 196 insertions(+), 12 deletions(-) create mode 100644 src/DAT/counter.jl diff --git a/src/DAT/broadcast.jl b/src/DAT/broadcast.jl index b7d9b49a9..4ab4e94cf 100644 --- a/src/DAT/broadcast.jl +++ b/src/DAT/broadcast.jl @@ -2,8 +2,14 @@ struct XStyle <: Broadcast.BroadcastStyle end Base.BroadcastStyle(::Broadcast.AbstractArrayStyle, ::XStyle) = XStyle() Base.BroadcastStyle(::XStyle, ::Broadcast.AbstractArrayStyle) = XStyle() Base.BroadcastStyle(::Type{<:YAXArray}) = XStyle() -to_yax(x::Number) = YAXArray((), fill(x)) +Base.BroadcastStyle(::Type{<:DimWindowArray}) = XStyle() +to_yax(x::Number) = YAXArray((),fill(x)) to_yax(x::DD.AbstractDimArray) = x +to_yax(x::DimWindowArray) = x + +Base.broadcastable(d::DimWindowArray) = d +Base.broadcastable(d::YAXArray) = d + function Base.broadcasted(::XStyle, f, args...) return Broadcast.Broadcasted{XStyle}(f, args) end diff --git a/src/DAT/counter.jl b/src/DAT/counter.jl new file mode 100644 index 000000000..8e22fe11e --- /dev/null +++ b/src/DAT/counter.jl @@ -0,0 +1,14 @@ +import OnlineStats +import DiskArrayEngine as DAE + +function counter(yax, expected_values=nothing) + st = if expected_values isa AbstractUnitRange{<:Int} + compoffs = DAE.KeyConvertDicts.AddConst(Val(1-first(expected_values))) + mydicttype = DAE.KeyConvertDicts.KeyDictType(eltype(expected_values),Int,compoffs, inv(compoffs),length(expected_values)) + OnlineStats.CountMap{eltype(yax),mydicttype} + else + OnlineStats.CountMap{eltype(yax),Dict{eltype(yax),Int}} + end + dimargs = ntuple(i->i=>nothing,ndims(yax)) + DAE.aggregate_diskarray(yax.data,st,dimargs) +end \ No newline at end of file diff --git a/src/DAT/resample.jl b/src/DAT/resample.jl index 85915f68c..9499282da 100644 --- a/src/DAT/resample.jl +++ b/src/DAT/resample.jl @@ -11,6 +11,7 @@ function xresample(yax::YAXArray;to=nothing,method=Linear(),outtype=Float32) conv = map(newdims) do d dold = DD.dims(yax.axes,d) dold === nothing && return nothing + approxequal(dold,d) && return nothing idim = DD.dimnum(yax.axes,d) idim=>(valval(dold),valval(d)) end diff --git a/src/DAT/xmap.jl b/src/DAT/xmap.jl index 11baebac6..108482c4d 100644 --- a/src/DAT/xmap.jl +++ b/src/DAT/xmap.jl @@ -2,15 +2,16 @@ module Xmap import DimensionalData: rebuild, dims, DimArrayOrStack, Dimension, basedims, DimTuple, _group_indices, OpaqueArray, otherdims import DimensionalData as DD -using DiskArrays: find_subranges_sorted, chunktype_from_chunksizes, GridChunks +using DiskArrays: find_subranges_sorted, chunktype_from_chunksizes, GridChunks,isdisk using ..YAXArrays import ..Cubes: YAXArray import DiskArrayEngine as DAE import IntervalSets: Interval +import DiskArrayEngine.compute include("resample.jl") -export windows, xmap, Whole, xmap, XOutput, compute_to_zarr, xresample, MovingIntervals, XFunction, ⊘ +export windows, xmap, Whole, xmap, XOutput, compute_to_zarr, xresample, MovingIntervals, XFunction, ⊘, compute struct Whole <: DD.AbstractBins end function DD._group_indices(dim::DD.Dimension, ::Whole; labels=nothing) @@ -54,6 +55,50 @@ time_mean = xmap(mean, w, inplace=false) ⊘(a,b) = ⊘(a,(b,)) windows(A::DimArrayOrStack) = DimWindowArray(A,DD.dims(A),map(d->1:length(d),DD.dims(A)),DD.dims(A)) windows(A::DimArrayOrStack, x) = windows(A, dims(x)) +#Method to group by another DimArray defined by groups. This array might be very +#large, so we will not be able to determine the groups in all cases and might +#introduce an axis of unknown length +struct UnknownValues{T} + eltype::Type{T} +end +struct GroupIndices{I} + indices::I + dims_destroyed + dims_new +end +function windows(A::DimArrayOrStack, groups::DD.AbstractDimArray;groupname=:group, expected_groups=nothing) + groupdims = DD.dims(groups) + arraydims = DD.dims(A) + expanddims = otherdims(groupdims, arraydims) + groupdimvals = if !isnothing(expected_groups) + expected_groups + elseif isdisk(DD.data(groups)) + UnknownValues(eltype(groups)) + else + ug = collect(skipmissing(unique(groups))) + mi,ma = extrema(ug) + if all(isinteger,ug) && length(ug)/ (ma-mi+1) > 0.5 + Int(mi):Int(ma) + else + sort!(ug) + end + end + array_indices = map(DD.dims(A)) do d + DD.rebuild(d,1:length(d)) + end + newindices = GroupIndices(DD.data(groups),groupdims, DD.Dim{groupname}(groupdimvals)) + gd = DD.dims(array_indices,first(groupdims)) + newind = DD.rebuild(gd,newindices) + array_indices_view = DD.setdims(array_indices, newind) + array_indices_pure = map(DD.val, DD.otherdims(array_indices_view,Base.tail(groupdims))) + dim_orig = DD.dims(A) + newdim_firstname = DD.rebuild(first(groupdims),groupdimvals) + newdims = DD.setdims(dim_orig, newdim_firstname) + newdims = DD.otherdims(newdims, Base.tail(groupdims)) + irep = DD.dimnum(newdims, first(groupdims)) + newdims = Base.setindex(newdims, DD.Dim{groupname}(groupdimvals),irep) + return DimWindowArray(A, newdims, array_indices_pure, dim_orig) +end windows(A::DimArrayOrStack, dimfuncs::Dimension...) = windows(A, dimfuncs) function windows( A::DimArrayOrStack, p1::Pair{<:Any,<:Base.Callable}, ps::Pair{<:Any,<:Base.Callable}...; @@ -63,6 +108,7 @@ function windows( end return windows(A, dims) end + function windows(A::DimArrayOrStack, dimfuncs::DimTuple) length(otherdims(dimfuncs, dims(A))) > 0 && DD.Dimensions._extradimserror(otherdims(dimfuncs, dims(A))) @@ -85,10 +131,6 @@ function windows(A::DimArrayOrStack, dimfuncs::DimTuple) array_indices_pure = map(DD.val, array_indices_view) dim_orig = DD.dims(A) newdims = DD.setdims(dim_orig, group_dims) - N = ndims(A) - indt = map(eltype,array_indices_pure) - et = Base.promote_op(getindex,typeof(A),indt...) - #etdim = mapreduce(ndims ∘ eltype, +, array_indices_pure) return DimWindowArray(A, newdims, array_indices_pure, dim_orig) end @@ -200,8 +242,10 @@ struct DimWindowArray{A,D,I,DO} indices::I dim_orig::DO end -Base.size(a::DimWindowArray) = length.(a.indices) -Base.getindex(a::DimWindowArray, i::Int...) = a.data[map(getindex,a.indices,i)...] +Base.size(a::DimWindowArray) = length.(a.dims) +index_group(a,i) = a[i] +index_group(a::GroupIndices,i) = findall(isequal(i),a.indices) +Base.getindex(a::DimWindowArray, i::Int...) = a.data.data[map(index_group,a.indices,i)...] DD.dims(a::DimWindowArray) = a.dims to_windowarray(d::DimWindowArray) = d to_windowarray(d) = windows(d) @@ -211,13 +255,19 @@ function Base.show(io::IO, dw::DimWindowArray) end -struct XOutput{D<:Tuple{Vararg{DD.Dimension}},T} +struct XOutput{D<:Tuple{Vararg{DD.Dimension}},R,T} outaxes::D + destroyaxes::R outtype::T properties end +<<<<<<< HEAD function XOutput(outaxes::DD.Dimension...; outtype=1, properties=Dict()) XOutput(outaxes, outtype,properties) +======= +function XOutput(outaxes...; outtype=1,properties=Dict(),destroyaxes=()) + XOutput(outaxes, destroyaxes, outtype,properties) +>>>>>>> 0c911fba... xmap updates end _step(x::AbstractArray{<:Number}) = length(x) > 1 ? (last(x)-first(x))/(length(x)-1) : zero(eltype(x)) @@ -257,15 +307,69 @@ end dataeltype(y::YAXArray) = eltype(y.data) dataeltype(y::DimWindowArray) = eltype(y.data.data) +<<<<<<< HEAD tupelize(x) = (x,) tupelize(x::Tuple) = x +======= +""" + _groupby_xmap(f,winars...;output,inplace) + +Function to handle groupby operations in `xmap`. It assumes that the only input array +is a DimWindowArray where one of the dimensions is a GroupIndices dimension. +""" +function _groupby_xmap(f,ars...;output,inplace) + + @assert length(ars) == 1 + g = only(ars) + inds = findall(i->isa(i,Xmap.GroupIndices), g.indices) + #For now we allow only a single group array + @assert length(inds) == 1 + igroup = only(inds) + + preproc, groupconv = (identity, identity) + _f = isa(f,XFunction) ? f.f : f + newf = DAE.disk_onlinestat(_f,preproc,groupconv) + + outputs = XOutput(g.dims[igroup],destroyaxes=DD.otherdims(g.dim_orig,g.dims)) + groupar = YAXArray(g.indices[igroup].dims_destroyed, g.indices[igroup].indices) + xmap(newf, g.data, groupar, output=outputs) +end + + +""" + xmap(f, ar...; output = nothing, inplace = nothing) + +Maps a function `f` over an array of `ar` of type `YAXArray` or `DimWindowArray`. + +`xmap` requires the specification of a type for the output of `f`, with a default type which is 1 indicating +that the data type should be equal to the element type of the first input array. `output` must be a list of `XOutput` objects, where each contains a tuple of axes under which the results are stored and the type of the values stored. If `inplace` is `true`, then the original values are replaced in a place. `xmap` returns one or more objects of type `YAXArray` or `DimWindowArray` containing a view over the data passed to `f` by `overlaying` the outputs over the original data arrays. If reduction functions are specified, then the `xmap` outputs replace the original original data array with the reduced values. During the execution of `xmap`, the everything except `f` itself is compiled just once. Specifying `f` as an object of type `XFunction` waits until the actual function is called before compiling it. +>>>>>>> 0c911fba... xmap updates +xmap will return a lazy representation of the resulting array. +<<<<<<< HEAD function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; output=XOutput(), inplace=default_inplace(f), function_args=(), function_kwargs=(;)) +======= +* `output::Vector{XOutput}`: specifies the output arrays. Each XOutput object contains a tuple of axes (or symbols) to store the result and the element type of the output arrays. +* `inplace::Bool`: if `true` the function `f` operates in-place so that pre-allocated output buffers will be passed to the function as + + + +**Examples.** + + # Simple mapping + x = xmap(+, a, b) + + +""" +function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; args=(), kwargs=(;), output = nothing,inplace=nothing) + output === nothing && (output = default_output(f)) + inplace === nothing && (inplace = default_inplace(f)) +>>>>>>> 0c911fba... xmap updates alldims = mapreduce(approxunion!,ars,init=[]) do ar DD.dims(ar) end @@ -277,6 +381,13 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; throw(ArgumentError("Duplicated dimensions with different values")) end + winars = map(to_windowarray,ars) + #Check for any input that contains an array groupby + is_groupby = any(winars) do a + any(Base.Fix2(isa,GroupIndices),a.indices) + end + is_groupby && return _groupby_xmap(f,winars...;output,inplace) + #Create outspecs output = tupelize(output) @@ -288,6 +399,7 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; outaxinfo = map(output) do o +<<<<<<< HEAD outaxes = o.outaxes addaxes = DD.otherdims(alldims, DD.basedims(outaxes)) outwindows = map(i->[Base.OneTo(length(i))],outaxes) @@ -295,6 +407,43 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; alloutaxes = (outaxes..., addaxes...) dimsmap = DD.dimnum(allinandoutdims, alloutaxes) alloutaxes, tupelize(dimsmap), (outwindows..., extrawindows...) +======= + r = map(o.outaxes) do ax + ax_indim = DD.dims(alldims,ax) + if isnothing(ax_indim) + ax, length(o.outaxes)+length(alldims) + else + idim = DD.dimnum(alldims,ax) + ax, idim + end + end + outaxes = map(first,r) + dimsmap = map(last,r) + #Add some logic for destroying dimensions, i.e. in reducing operations + destroydims = o.destroyaxes + addaxes = DD.otherdims(alldims,DD.basedims(outaxes)) + + + dimsmapadd = setdiff(ntuple(identity,length(alldims)),dimsmap) + outwindows = map(outaxes) do outax + [Base.OneTo(length(outax))] + end + extrawindows = map(addaxes) do outax + if isnothing(DD.dims(destroydims,outax)) + Base.OneTo(length(outax)) + else + fill(1, length(outax)) + end + end + addaxes = map(addaxes) do outax + if isnothing(DD.dims(destroydims,outax)) + outax + else + DD.reducedims(outax, DD.Dim) + end + end + (outaxes...,addaxes...), (dimsmap...,dimsmapadd...), (outwindows...,extrawindows...) +>>>>>>> 0c911fba... xmap updates end outaxes = map(first,outaxinfo) dimsmap = map(Base.Fix2(getindex,2),outaxinfo) @@ -309,14 +458,22 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; end push!(outtypes, outtype) - sout = map(length,ax) + sout = map(win->maximum(maximum,win),w) DAE.create_outwindows(sout;dimsmap=dm,windows = w) end +<<<<<<< HEAD daefunction = DAE.create_userfunction(f, (outtypes...,), is_mutating=inplace, allow_threads=false, args=function_args, kwargs=function_kwargs) +======= + daefunction = if f isa DAE.UserOp + f + else + DAE.create_userfunction(f, (outtypes...,); is_mutating=inplace,allow_threads=false,args, kwargs) + end +>>>>>>> 0c911fba... xmap updates #Create DiskArrayEngine Input arrays input_arrays = map(ars) do ar a = to_windowarray(ar) @@ -461,6 +618,9 @@ XFunction(f::XFunction;kwargs...) = f default_inplace(f::XFunction) = f.inplace default_inplace(f) = true +default_output(f) = XOutput() +default_output(f::XFunction) = f.outputs + function Base.broadcasted(f::XFunction,args...) xmap(f,args...,output = f.outputs, inplace = f.inplace) @@ -474,6 +634,8 @@ function gmwop_from_conn(conn,nodes) DAE.GMDWop(inputs, outspecs, op) end +compute(yax::YAXArray,args...;kwargs...) = DD.rebuild(yax,DAE.compute(yax.data,args...;kwargs...)) + """ compute_to_zarr(ods, path; max_cache=5e8, overwrite=false) diff --git a/src/DatasetAPI/Datasets.jl b/src/DatasetAPI/Datasets.jl index 8ad7112ef..e8e20d94b 100644 --- a/src/DatasetAPI/Datasets.jl +++ b/src/DatasetAPI/Datasets.jl @@ -318,7 +318,7 @@ function testrange(x) end function testrange(x::AbstractArray{<:Integer}) - length(x) == 1 && return x + length(x) <= 1 && return x steps = diff(x) if all(isequal(steps[1]), steps) && !iszero(steps[1]) return range(first(x), step = steps[1], length = length(x)) diff --git a/src/YAXArrays.jl b/src/YAXArrays.jl index a957e30c1..f5e8d1cbe 100644 --- a/src/YAXArrays.jl +++ b/src/YAXArrays.jl @@ -39,6 +39,7 @@ include("Cubes/Cubes.jl") include("DatasetAPI/Datasets.jl") include("DAT/DAT.jl") include("DAT/xmap.jl") +include("DAT/counter.jl") using Reexport: @reexport using YAXArrayBase: getattributes From 36b8b77a4658c81a10de8f803c53bf1adb751ae5 Mon Sep 17 00:00:00 2001 From: meggart Date: Thu, 26 Jun 2025 16:59:04 +0200 Subject: [PATCH 2/5] Merge graph also for to_memory computations --- src/DAT/xmap.jl | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/DAT/xmap.jl b/src/DAT/xmap.jl index 108482c4d..a6b7f33be 100644 --- a/src/DAT/xmap.jl +++ b/src/DAT/xmap.jl @@ -634,7 +634,25 @@ function gmwop_from_conn(conn,nodes) DAE.GMDWop(inputs, outspecs, op) end -compute(yax::YAXArray,args...;kwargs...) = DD.rebuild(yax,DAE.compute(yax.data,args...;kwargs...)) +function compute(yax::YAXArray,args...;kwargs...) + if isa(yax.data,DAE.GMWOPResult) + r = if any(a->isa(a.a,DAE.GMWOPResult), yax.data.op.inars) + g = DAE.MwopGraph() + i = DAE.to_graph!(g, yax.data) + DAE.remove_aliases!(g) + DAE.fuse_graph!(g) + newop = DAE.gmwop_from_reducedgraph(g); + DAE.results_as_diskarrays(newop)[i] + else + yax.data + end + computed = DAE.compute(r,args...;kwargs...) + DD.rebuild(yax,computed) + else + @warn "Yaxarray does not wrap a computation, nothing to do" + yax + end +end """ compute_to_zarr(ods, path; max_cache=5e8, overwrite=false) From cc2a03a984d88b6c411b33f72f21a713c6cea53d Mon Sep 17 00:00:00 2001 From: meggart Date: Mon, 3 Nov 2025 15:17:10 +0100 Subject: [PATCH 3/5] finish merge --- src/DAT/xmap.jl | 57 ------------------------------------------------- 1 file changed, 57 deletions(-) diff --git a/src/DAT/xmap.jl b/src/DAT/xmap.jl index a6b7f33be..17f0c79e3 100644 --- a/src/DAT/xmap.jl +++ b/src/DAT/xmap.jl @@ -261,13 +261,8 @@ struct XOutput{D<:Tuple{Vararg{DD.Dimension}},R,T} outtype::T properties end -<<<<<<< HEAD -function XOutput(outaxes::DD.Dimension...; outtype=1, properties=Dict()) - XOutput(outaxes, outtype,properties) -======= function XOutput(outaxes...; outtype=1,properties=Dict(),destroyaxes=()) XOutput(outaxes, destroyaxes, outtype,properties) ->>>>>>> 0c911fba... xmap updates end _step(x::AbstractArray{<:Number}) = length(x) > 1 ? (last(x)-first(x))/(length(x)-1) : zero(eltype(x)) @@ -307,10 +302,8 @@ end dataeltype(y::YAXArray) = eltype(y.data) dataeltype(y::DimWindowArray) = eltype(y.data.data) -<<<<<<< HEAD tupelize(x) = (x,) tupelize(x::Tuple) = x -======= """ _groupby_xmap(f,winars...;output,inplace) @@ -343,17 +336,14 @@ Maps a function `f` over an array of `ar` of type `YAXArray` or `DimWindowArray` `xmap` requires the specification of a type for the output of `f`, with a default type which is 1 indicating that the data type should be equal to the element type of the first input array. `output` must be a list of `XOutput` objects, where each contains a tuple of axes under which the results are stored and the type of the values stored. If `inplace` is `true`, then the original values are replaced in a place. `xmap` returns one or more objects of type `YAXArray` or `DimWindowArray` containing a view over the data passed to `f` by `overlaying` the outputs over the original data arrays. If reduction functions are specified, then the `xmap` outputs replace the original original data array with the reduced values. During the execution of `xmap`, the everything except `f` itself is compiled just once. Specifying `f` as an object of type `XFunction` waits until the actual function is called before compiling it. ->>>>>>> 0c911fba... xmap updates xmap will return a lazy representation of the resulting array. -<<<<<<< HEAD function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; output=XOutput(), inplace=default_inplace(f), function_args=(), function_kwargs=(;)) -======= * `output::Vector{XOutput}`: specifies the output arrays. Each XOutput object contains a tuple of axes (or symbols) to store the result and the element type of the output arrays. * `inplace::Bool`: if `true` the function `f` operates in-place so that pre-allocated output buffers will be passed to the function as @@ -369,7 +359,6 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; args=(), kwargs=(;), output = nothing,inplace=nothing) output === nothing && (output = default_output(f)) inplace === nothing && (inplace = default_inplace(f)) ->>>>>>> 0c911fba... xmap updates alldims = mapreduce(approxunion!,ars,init=[]) do ar DD.dims(ar) end @@ -399,7 +388,6 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; args=() outaxinfo = map(output) do o -<<<<<<< HEAD outaxes = o.outaxes addaxes = DD.otherdims(alldims, DD.basedims(outaxes)) outwindows = map(i->[Base.OneTo(length(i))],outaxes) @@ -407,43 +395,6 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; args=() alloutaxes = (outaxes..., addaxes...) dimsmap = DD.dimnum(allinandoutdims, alloutaxes) alloutaxes, tupelize(dimsmap), (outwindows..., extrawindows...) -======= - r = map(o.outaxes) do ax - ax_indim = DD.dims(alldims,ax) - if isnothing(ax_indim) - ax, length(o.outaxes)+length(alldims) - else - idim = DD.dimnum(alldims,ax) - ax, idim - end - end - outaxes = map(first,r) - dimsmap = map(last,r) - #Add some logic for destroying dimensions, i.e. in reducing operations - destroydims = o.destroyaxes - addaxes = DD.otherdims(alldims,DD.basedims(outaxes)) - - - dimsmapadd = setdiff(ntuple(identity,length(alldims)),dimsmap) - outwindows = map(outaxes) do outax - [Base.OneTo(length(outax))] - end - extrawindows = map(addaxes) do outax - if isnothing(DD.dims(destroydims,outax)) - Base.OneTo(length(outax)) - else - fill(1, length(outax)) - end - end - addaxes = map(addaxes) do outax - if isnothing(DD.dims(destroydims,outax)) - outax - else - DD.reducedims(outax, DD.Dim) - end - end - (outaxes...,addaxes...), (dimsmap...,dimsmapadd...), (outwindows...,extrawindows...) ->>>>>>> 0c911fba... xmap updates end outaxes = map(first,outaxinfo) dimsmap = map(Base.Fix2(getindex,2),outaxinfo) @@ -461,19 +412,11 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; args=() sout = map(win->maximum(maximum,win),w) DAE.create_outwindows(sout;dimsmap=dm,windows = w) end -<<<<<<< HEAD - daefunction = DAE.create_userfunction(f, (outtypes...,), - is_mutating=inplace, - allow_threads=false, - args=function_args, - kwargs=function_kwargs) -======= daefunction = if f isa DAE.UserOp f else DAE.create_userfunction(f, (outtypes...,); is_mutating=inplace,allow_threads=false,args, kwargs) end ->>>>>>> 0c911fba... xmap updates #Create DiskArrayEngine Input arrays input_arrays = map(ars) do ar a = to_windowarray(ar) From b76687f559b4ad894ee4548989b8490b9bda8b11 Mon Sep 17 00:00:00 2001 From: Fabian Gans Date: Fri, 7 Nov 2025 08:39:53 +0100 Subject: [PATCH 4/5] some fixes --- src/DAT/xmap.jl | 24 ++++++++++++++++++++---- src/DatasetAPI/Datasets.jl | 19 +++++++++++++++---- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/src/DAT/xmap.jl b/src/DAT/xmap.jl index 17f0c79e3..7d6a11a7a 100644 --- a/src/DAT/xmap.jl +++ b/src/DAT/xmap.jl @@ -324,6 +324,7 @@ function _groupby_xmap(f,ars...;output,inplace) newf = DAE.disk_onlinestat(_f,preproc,groupconv) outputs = XOutput(g.dims[igroup],destroyaxes=DD.otherdims(g.dim_orig,g.dims)) + groupar = YAXArray(g.indices[igroup].dims_destroyed, g.indices[igroup].indices) xmap(newf, g.data, groupar, output=outputs) end @@ -356,7 +357,7 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; """ -function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; args=(), kwargs=(;), output = nothing,inplace=nothing) +function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; args=(), kwargs=(;), output=nothing, inplace=nothing, function_args=(), function_kwargs=(;)) output === nothing && (output = default_output(f)) inplace === nothing && (inplace = default_inplace(f)) alldims = mapreduce(approxunion!,ars,init=[]) do ar @@ -375,6 +376,7 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; args=() is_groupby = any(winars) do a any(Base.Fix2(isa,GroupIndices),a.indices) end + is_groupby && return _groupby_xmap(f,winars...;output,inplace) #Create outspecs @@ -386,12 +388,25 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; args=() allinandoutdims = (unique(DD.basedims((alldims..., alloutdims...)))...,) - outaxinfo = map(output) do o outaxes = o.outaxes + destroydims = o.destroyaxes addaxes = DD.otherdims(alldims, DD.basedims(outaxes)) outwindows = map(i->[Base.OneTo(length(i))],outaxes) - extrawindows = Base.OneTo.(length.(addaxes)) + extrawindows = map(addaxes) do outax + if isnothing(DD.dims(destroydims, outax)) + Base.OneTo(length(outax)) + else + fill(1, length(outax)) + end + end + addaxes = map(addaxes) do outax + if isnothing(DD.dims(destroydims, outax)) + outax + else + DD.reducedims(outax, DD.Dim) + end + end alloutaxes = (outaxes..., addaxes...) dimsmap = DD.dimnum(allinandoutdims, alloutaxes) alloutaxes, tupelize(dimsmap), (outwindows..., extrawindows...) @@ -415,7 +430,8 @@ function xmap(f, ars::Union{YAXArrays.Cubes.YAXArray,DimWindowArray}...; args=() daefunction = if f isa DAE.UserOp f else - DAE.create_userfunction(f, (outtypes...,); is_mutating=inplace,allow_threads=false,args, kwargs) + DAE.create_userfunction(f, (outtypes...,); is_mutating=inplace, allow_threads=false, + args=function_args, kwargs=function_kwargs) end #Create DiskArrayEngine Input arrays input_arrays = map(ars) do ar diff --git a/src/DatasetAPI/Datasets.jl b/src/DatasetAPI/Datasets.jl index e8e20d94b..ba91ced9a 100644 --- a/src/DatasetAPI/Datasets.jl +++ b/src/DatasetAPI/Datasets.jl @@ -3,10 +3,10 @@ module Datasets import ..Cubes: Cubes, YAXArray, concatenatecubes, CleanMe, subsetcube, copy_diskarray, setchunks, caxes, readcubedata, cubesize, formatbytes using ...YAXArrays: YAXArrays, YAXDefaults, findAxis using DataStructures: OrderedDict, counter -using Dates: Day, Hour, Minute, Second, Month, Year, Date, DateTime, TimeType, AbstractDateTime, Period +using Dates: Dates, Day, Hour, Minute, Second, Month, Year, Date, DateTime, TimeType, AbstractDateTime, Period using Statistics: mean using IntervalSets: Interval, (..) -using CFTime: timedecode, timeencode, DateTimeNoLeap, DateTime360Day, DateTimeAllLeap +using CFTime: timedecode, timeencode, DateTimeNoLeap, DateTime360Day, DateTimeAllLeap, CFTime using YAXArrayBase using YAXArrayBase: iscontdimval, add_var using DiskArrayTools: CFDiskArray, diskstack @@ -273,7 +273,13 @@ function collectdims(g) outd end -function toaxis(dimname, g, offs, len) +function round_datetime(dt) + origin = CFTime._origin_period(dt) + ms = Dates.Millisecond(round(Int, (dt.instant + origin + CFTime.DATETIME_OFFSET).duration)) + return DateTime(CFTime.UTInstant{Dates.Millisecond}(ms)) +end + +function toaxis(dimname, g, offs, len;prefer_datetime=true) axname = Symbol(dimname) if !haskey(g, dimname) return DD.rebuild(DD.name2dim(axname), 1:len) @@ -282,7 +288,12 @@ function toaxis(dimname, g, offs, len) aratts = get_var_attrs(g, dimname) if match(r"^(days)|(hours)|(seconds)|(months) since",lowercase(get(aratts,"units",""))) !== nothing tsteps = try - timedecode(ar[:], aratts["units"], lowercase(get(aratts, "calendar", "standard"))) + dec = timedecode(ar[:], aratts["units"], lowercase(get(aratts, "calendar", "standard")), prefer_datetime=false) + if prefer_datetime + round_datetime.(dec) + else + dec + end catch ar[:] end From 52f2bea1ffda74a310b37add9a0d75db3ef8c319 Mon Sep 17 00:00:00 2001 From: Fabian Gans Date: Mon, 10 Nov 2025 10:31:16 +0100 Subject: [PATCH 5/5] fix datetime issues --- src/DatasetAPI/Datasets.jl | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/DatasetAPI/Datasets.jl b/src/DatasetAPI/Datasets.jl index ba91ced9a..5cfece194 100644 --- a/src/DatasetAPI/Datasets.jl +++ b/src/DatasetAPI/Datasets.jl @@ -249,7 +249,7 @@ function Base.getindex(x::Dataset; var = nothing, kwargs...) Dataset(; properties=x.properties, map(ds -> ds => subsetifdimexists(cc[ds]; kwargs...), collect(keys(cc)))...) end end -function collectdims(g) +function collectdims(g; force_datetime=true) dlist = Set{Tuple{String,Int,Int}}() varnames = get_varnames(g) foreach(varnames) do k @@ -267,7 +267,7 @@ function collectdims(g) end end end - outd = Dict(d[1] => (ax = toaxis(d[1], g, d[2], d[3]), offs = d[2]) for d in dlist) + outd = Dict(d[1] => (ax=toaxis(d[1], g, d[2], d[3]; force_datetime), offs=d[2]) for d in dlist) length(outd) == length(dlist) || throw(ArgumentError("All Arrays must have the same offset")) outd @@ -279,7 +279,7 @@ function round_datetime(dt) return DateTime(CFTime.UTInstant{Dates.Millisecond}(ms)) end -function toaxis(dimname, g, offs, len;prefer_datetime=true) +function toaxis(dimname, g, offs, len; force_datetime=true) axname = Symbol(dimname) if !haskey(g, dimname) return DD.rebuild(DD.name2dim(axname), 1:len) @@ -288,14 +288,18 @@ function toaxis(dimname, g, offs, len;prefer_datetime=true) aratts = get_var_attrs(g, dimname) if match(r"^(days)|(hours)|(seconds)|(months) since",lowercase(get(aratts,"units",""))) !== nothing tsteps = try - dec = timedecode(ar[:], aratts["units"], lowercase(get(aratts, "calendar", "standard")), prefer_datetime=false) - if prefer_datetime - round_datetime.(dec) + timedecode(ar[:], aratts["units"], lowercase(get(aratts, "calendar", "standard"))) + catch e + if e isa InexactError + dec = timedecode(ar[:], aratts["units"], lowercase(get(aratts, "calendar", "standard")), prefer_datetime=false) + if force_datetime + round_datetime.(dec) + else + dec + end else - dec + ar[:] end - catch - ar[:] end DD.rebuild(DD.name2dim(axname), tsteps[offs+1:end]) elseif haskey(aratts, "_ARRAYVALUES") @@ -439,6 +443,7 @@ The default driver will search for available drivers and tries to detect the use - `skip_keys` are passed as symbols, i.e., `skip_keys = (:a, :b)` - `driver=:all`, common options are `:netcdf` or `:zarr`. +- `force_datetime=false` force conversion when CFTime fails with an InexactError even if milliseconds must be rounded Example: @@ -446,12 +451,12 @@ Example: ds = open_dataset(f, driver=:zarr, skip_keys = (:c,)) ```` """ -function open_dataset(g; skip_keys=(), driver = :all) +function open_dataset(g; skip_keys=(), driver=:all, force_datetime=false) str_skipkeys = string.(skip_keys) dsopen = YAXArrayBase.to_dataset(g, driver = driver) YAXArrayBase.open_dataset_handle(dsopen) do g isempty(get_varnames(g)) && throw(ArgumentError("Group does not contain datasets.")) - dimlist = collectdims(g) + dimlist = collectdims(g; force_datetime) dnames = string.(keys(dimlist)) varlist = filter(get_varnames(g)) do vn upname = uppercase(vn)