
Commit 44f44b7

Felix Cremer committed:
Add lazydiskaggregate to stack data
1 parent 4f7f07f commit 44f44b7

2 files changed (+53, -8 lines)


src/auxil.jl

Lines changed: 47 additions & 5 deletions
@@ -6,6 +6,42 @@ using DimensionalData: DimensionalData as DD, X, Y
 using GeoFormatTypes
 #using PyramidScheme: PyramidScheme as PS
 using Rasters: Raster
+import DiskArrays: readblock!, IrregularChunks, AbstractDiskArray
+using StatsBase: rle
+using Statistics: mean
+
+#DiskArrays.readblock!(a::AbstractArray,aout,i::AbstractUnitRange...) = copyto!(aout,view(a,i...))
+
+struct LazyAggDiskArray{T,F,A} <: AbstractDiskArray{T,3}
+    f::F
+    arrays::A
+    inds::IrregularChunks
+    s::Tuple{Int,Int,Int}
+end
+function LazyAggDiskArray(f,arrays,groups)
+    allequal(size,arrays) || error("All Arrays must have same size")
+    allequal(eltype,arrays) || error("All Arrays must have same element type")
+    inds = IrregularChunks(;chunksizes=last(rle(groups)))
+    s = (size(first(arrays))...,length(inds))
+    T = Base.promote_op(f,Vector{eltype(first(arrays))})
+    LazyAggDiskArray{T,typeof(f),typeof(arrays)}(f,arrays,inds,s)
+end
+Base.size(a::LazyAggDiskArray) = a.s
+DiskArrays.haschunks(a::LazyAggDiskArray) = DiskArrays.haschunks(first(a.arrays))
+function DiskArrays.readblock!(a::LazyAggDiskArray,aout,i::UnitRange{Int}...)
+    i1,i2,itime = i
+    max_n_array = maximum(it->length(a.inds[it]),itime)
+    buf = zeros(eltype(first(a.arrays)),length(i1),length(i2),max_n_array)
+    for (j, it) in enumerate(itime)
+        arrays_now = a.arrays[a.inds[it]]
+        for ia in 1:length(arrays_now)
+            DiskArrays.readblock!(arrays_now[ia],view(buf,:,:,ia),i1,i2)
+        end
+        vbuf = view(buf,:,:,1:length(arrays_now))
+        map!(a.f,view(aout,:,:,j),eachslice(vbuf,dims=(1,2)))
+    end
+end
+

 struct BufferGDALBand{T} <: AG.DiskArrays.AbstractDiskArray{T,2}
     filename::String
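
As a side note on the grouping mechanism added above: the LazyAggDiskArray constructor run-length-encodes the group labels (rle) and feeds the run lengths into IrregularChunks, so indexing inds with an output time index yields the range of source arrays that belong to that group (this is what a.arrays[a.inds[it]] relies on in readblock!). A minimal, self-contained sketch with invented group labels:

    using StatsBase: rle
    using DiskArrays: IrregularChunks

    # hypothetical group labels: five acquisitions that fall into two stack groups
    groups = [1, 1, 1, 2, 2]
    vals, lens = rle(groups)                   # ([1, 2], [3, 2])
    inds = IrregularChunks(; chunksizes=lens)  # chunk 1 spans 1:3, chunk 2 spans 4:5

    inds[1]   # 1:3 -> source arrays aggregated into the first output time step
    inds[2]   # 4:5 -> source arrays aggregated into the second output time step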
@@ -112,7 +148,7 @@ function DiskArrays.readblock!(b::GDALBand, aout, r::AbstractUnitRange...)
 end
 =#

-function gdalcube(filenames::AbstractVector{<:AbstractString}, stackgroups=true)
+function gdalcube(filenames::AbstractVector{<:AbstractString}, stackgroups=:dae)
     dates = getdate.(filenames)
     @show length(dates)
     # Sort the dates and files by DateTime
@@ -122,7 +158,7 @@ function gdalcube(filenames::AbstractVector{<:AbstractString}, stackgroups=true)

     #@show sdates
     # Put the dates which are 200 seconds apart into groups
-    if stackgroups
+    if stackgroups in [:dae, :lazyagg]
         groupinds = grouptimes(sdates, 200000)
         onefile = first(sfiles)
         gd = backendlist[:gdal]
@@ -140,14 +176,20 @@ function gdalcube(filenames::AbstractVector{<:AbstractString}, stackgroups=true)
         end

         cubelist = CFDiskArray.(group_gdbs, (gdbattrs,))
-        gcube = diskstack(cubelist)
-        aggdata = DAE.aggregate_diskarray(gcube, mean ∘ skipmissing, (3=> stackindices(sdates),); strategy=:direct)
+        stackinds = stackindices(sdates)
+        aggdata = if stackgroups == :dae
+            gcube = diskstack(cubelist)
+            aggdata = DAE.aggregate_diskarray(gcube, mean ∘ skipmissing, (3=> stackinds,); strategy=:direct)
+        else
+            println("Construct lazy diskarray")
+            LazyAggDiskArray(mean ∘ skipmissing, cubelist, stackinds)
+        end
         # data = DiskArrays.ConcatDiskArray(reshape(groupcubes, (1,1,length(groupcubes))))
         dates_grouped = [sdates[group[begin]] for group in groupinds]

         taxis = DD.Ti(dates_grouped)
         gcube = Cube(sfiles[1])
-        return YAXArray((DD.dims(gcube)[1:2]..., taxis), aggdata, gcube.properties,)
+        return YAXArray((DD.dims(gcube)[1:2]..., taxis), aggdata, gcube.properties,)
     else
         #datasets = AG.readraster.(sfiles)
         taxis = DD.Ti(sdates)
src/main.jl

Lines changed: 6 additions & 3 deletions
@@ -1,6 +1,7 @@
 using ArgParse
 using YAXArrays: YAXDefaults

+
 const argparsesettings = ArgParseSettings()

 ArgParse.parse_item(::Type{Date}, x::AbstractString) = Date(x)
@@ -75,6 +76,7 @@ function main(;
     orbit="D",
     threshold=3.0,
     folders=["V01R01", "V0M2R4", "V1M0R1", "V1M1R1", "V1M1R2"]
+    stack=:dae
     )

     in(orbit, ["A", "D"]) || error("Orbit needs to be either A or D")
@@ -100,13 +102,14 @@

     filenamelist = [glob("$(sub)/*$(continent)*20M/$(tilefolder)/*$(polarisation)_$(orbit)*.tif", indir) for sub in folders]
     allfilenames = collect(Iterators.flatten(filenamelist))
-    @show allfilenames

     relorbits = unique([split(basename(x), "_")[5][2:end] for x in allfilenames])
     @show relorbits
     for relorbit in relorbits
-        filenames = allfilenames[findall(contains("$(relorbit)_E"), allfilenames)]
-        @time cube = gdalcube(filenames)
+        for y in years
+
+            filenames = allfilenames[findall(contains("$(relorbit)_E"), allfilenames)]
+            @time cube = gdalcube(filenames, stack)

         path = joinpath(YAXDefaults.workdir[], "$(tilefolder)_rqatrend_$(polarisation)_$(orbit)$(relorbit)_thresh_$(threshold)_year_$(y)")
         @show path

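For orientation, the output path built in the loop above follows a fixed naming scheme per tile, polarisation, orbit, relative orbit, threshold and year. A small sketch with hypothetical values standing in for the loop variables (YAXDefaults.workdir is a Ref that defaults to the current directory unless configured otherwise):

    using YAXArrays: YAXDefaults

    # hypothetical stand-ins for the variables used in main
    tilefolder, polarisation, orbit, relorbit = "E048N012T3", "VH", "D", "022"
    threshold, y = 3.0, 2020

    path = joinpath(YAXDefaults.workdir[],
        "$(tilefolder)_rqatrend_$(polarisation)_$(orbit)$(relorbit)_thresh_$(threshold)_year_$(y)")
    # e.g. "./E048N012T3_rqatrend_VH_D022_thresh_3.0_year_2020"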