Skip to content

Commit 28e0223

Browse files
Felix Cremer and Felix Cremer authored
Tidy up loading and add classify (#121)
* Replace everything by missing if gdalfile throws error and rechunk the data This will allow to read Sentinel-1 data even though one single time step is not available. * Remove timestats.jl include * Add classifier for RQA trend With this we can save the RQA trend data in UInt8 data and can apply the threshold on that. * Tidy up merge * Add AWSS3 dependency * Use Path consistently in main * Fix tests * Remove AWSS3 This is actually not needed, because we are trying to keep the Path type and don't have to recast it. * Add test for skipmissingmean * Remove duplicate path definition in main * Improve docstrings in rqatrend * Fix main test with actual data * Remove old unused loading path * Remove aggregate_diskarray way as well --------- Co-authored-by: Felix Cremer <felix.cremer@bgc-jena.mpg.de>
1 parent d0863a2 commit 28e0223

File tree

6 files changed

+127
-97
lines changed

6 files changed

+127
-97
lines changed

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name = "RQADeforestation"
22
uuid = "3f1d9318-18cc-4ffd-9937-04025ce33b74"
3-
authors = ["Felix Cremer <felix.cremer@bgc-jena.mpg.de>, Daniel Loos <dloos@bgc-jena.mpg.de> and contributors"]
43
version = "1.0.0-DEV"
4+
authors = ["Felix Cremer <felix.cremer@bgc-jena.mpg.de>, Daniel Loos <dloos@bgc-jena.mpg.de> and contributors"]
55

66
[deps]
77
ArchGDAL = "c9ce4bd3-c3d5-55b8-8973-c0e20141b8c3"

src/auxil.jl

Lines changed: 29 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,16 @@ function DiskArrays.readblock!(a::LazyAggDiskArray, aout, i::UnitRange{Int}...)
3434
for (j, it) in enumerate(itime)
3535
arrays_now = a.arrays[a.inds[it]]
3636
for ia in eachindex(arrays_now)
37-
DiskArrays.readblock!(arrays_now[ia], view(buf, :, :, ia), i1, i2)
37+
try
38+
DiskArrays.readblock!(arrays_now[ia], view(buf, :, :, ia), i1, i2)
39+
catch e
40+
if hasproperty(e, :captured) && e.captured.ex isa ArchGDAL.GDAL.GDALError
41+
@warn e.captured.ex.msg
42+
buf[:,:,ia] .= missing
43+
else
44+
rethrow(e)
45+
end
46+
end
3847
end
3948
vbuf = view(buf, :, :, 1:length(arrays_now))
4049
map!(a.f, view(aout, :, :, j), eachslice(vbuf, dims=(1, 2)))
@@ -135,27 +144,7 @@ function stackindices(times, timediff=200000)
135144
return groups
136145
end
137146

138-
#=
139-
function DiskArrays.readblock!(b::GDALBand, aout, r::AbstractUnitRange...)
140-
if !isa(aout,Matrix)
141-
aout2 = similar(aout)
142-
AG.read(b.filename) do ds
143-
AG.getband(ds, b.band) do bh
144-
DiskArrays.readblock!(bh, aout2, r...)
145-
end
146-
end
147-
aout .= aout2
148-
else
149-
AG.read(b.filename) do ds
150-
AG.getband(ds, b.band) do bh
151-
DiskArrays.readblock!(bh, aout, r...)
152-
end
153-
end
154-
end
155-
end
156-
=#
157-
158-
function gdalcube(filenames::AbstractVector{<:AbstractString}, stackgroups=:dae)
147+
function gdalcube(filenames::AbstractVector{<:AbstractString}, stackgroups=:lazyagg)
159148
dates = getdate.(filenames)
160149
@show length(dates)
161150
# Sort the dates and files by DateTime
@@ -165,7 +154,6 @@ function gdalcube(filenames::AbstractVector{<:AbstractString}, stackgroups=:dae)
165154

166155
#@show sdates
167156
# Put the dates which are 200 seconds apart into groups
168-
if stackgroups in [:dae, :lazyagg]
169157
groupinds = grouptimes(sdates, 200000)
170158
onefile = first(sfiles)
171159
gd = backendlist[:gdal]
@@ -184,60 +172,32 @@ function gdalcube(filenames::AbstractVector{<:AbstractString}, stackgroups=:dae)
184172

185173
cubelist = CFDiskArray.(group_gdbs, (gdbattrs,))
186174
stackinds = stackindices(sdates)
187-
aggdata = if stackgroups == :dae
188-
gcube = diskstack(cubelist)
189-
aggdata = DAE.aggregate_diskarray(gcube, mean ∘ skipmissing, (3 => stackinds,); strategy=:direct)
190-
else
191-
println("Construct lazy diskarray")
192-
LazyAggDiskArray(mean ∘ skipmissing, cubelist, stackinds)
193-
end
175+
println("Construct lazy diskarray")
176+
aggdata = LazyAggDiskArray(skipmissingmean, cubelist, stackinds)
194177
# data = DiskArrays.ConcatDiskArray(reshape(groupcubes, (1,1,length(groupcubes))))
195178
dates_grouped = [sdates[group[begin]] for group in groupinds]
196179

197180
taxis = DD.Ti(dates_grouped)
198181
gcube = Cube(sfiles[1])
199182
return YAXArray((DD.dims(gcube)[1:2]..., taxis), aggdata, gcube.properties,)
200-
else
201-
#datasets = AG.readraster.(sfiles)
202-
taxis = DD.Ti(sdates)
203-
204-
onefile = first(sfiles)
205-
gd = backendlist[:gdal]
206-
yax1 = gd(onefile)
207-
onecube = Cube(onefile)
208-
#@show onecube.axes
209-
gdb = get_var_handle(yax1, "Gray")
183+
end
210184

211-
#@assert gdb isa GDALBand
212-
all_gdbs = map(sfiles) do f
213-
BufferGDALBand{eltype(gdb)}(f, gdb.band, gdb.size, gdb.attrs, gdb.cs, Dict{Int,AG.IRasterBand}())
214-
end
215-
stacked_gdbs = diskstack(all_gdbs)
216-
attrs = copy(gdb.attrs)
217-
#attrs["add_offset"] = Float16(attrs["add_offset"])
218-
if haskey(attrs, "scale_factor")
219-
attrs["scale_factor"] = Float16(attrs["scale_factor"])
220-
end
221-
all_cfs = CFDiskArray(stacked_gdbs, attrs)
222-
return YAXArray((onecube.axes..., taxis), all_cfs, onecube.properties)
185+
function skipmissingmean(x)
186+
isempty(x) && return missing
187+
s,n = reduce(x,init=(zero(eltype(x)),0)) do (s,n), ix
188+
ismissing(ix) ? (s,n) : (s+ix,n+1)
223189
end
224-
#datasetgroups = [datasets[group] for group in groupinds]
225-
#We have to save the vrts because the usage of nested vrts is not working as a rasterdataset
226-
#temp = tempdir()
227-
#outpaths = [joinpath(temp, splitext(basename(sfiles[group][1]))[1] * ".vrt") for group in groupinds]
228-
#vrt_grouped = AG.unsafe_gdalbuildvrt.(datasetgroups)
229-
#AG.write.(vrt_grouped, outpaths)
230-
#vrt_grouped = AG.read.(outpaths)
231-
#vrt_vv = AG.unsafe_gdalbuildvrt(vrt_grouped, ["-separate"])
232-
#rvrt_vv = AG.RasterDataset(vrt_vv)
233-
#yaxras = YAXArray.(sfiles)
234-
#cube = concatenatecubes(yaxras, taxis)
235-
#bandnames = AG.GDAL.gdalgetfilelist(vrt_vv.ptr)
236-
237-
238-
239-
# Set the timesteps from the bandnames as time axis
240-
#dates_grouped = [sdates[group[begin]] for group in groupinds]
190+
n==0 ? missing : s/n
191+
end
192+
193+
# I don't dare to make this type piracy.
194+
#Base.∘(::typeof(mean), ::typeof(skipmissing)) = skipmissingmean
195+
196+
@testitem "skipmissingmean" begin
197+
@test ismissing(RQADeforestation.skipmissingmean(Float32[]))
198+
@test ismissing(RQADeforestation.skipmissingmean(Union{Float32, Missing}[missing, missing]))
199+
@test RQADeforestation.skipmissingmean([1,0,missing]) == 0.5
200+
@test RQADeforestation.skipmissingmean([1,2,3]) == 2.0
241201
end
242202

243203

src/main.jl

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using ArgParse
22
using YAXArrays: YAXDefaults
3+
using FilePathsBase: exists, Path
34

45

56
const argparsesettings = ArgParseSettings()
@@ -66,7 +67,7 @@ end
6667

6768

6869
function main(;
69-
tiles::Vector{String},
70+
tiles,
7071
continent::String,
7172
indir::String,
7273
outdir="out.zarr",
@@ -76,8 +77,10 @@ function main(;
7677
orbit="D",
7778
threshold=3.0,
7879
folders=["V1M0R1", "V1M1R1", "V1M1R2"],
79-
stack=:dae
80+
stack=:lazyagg,
81+
delete_intermediate=false
8082
)
83+
outdir = Path(outdir)
8184
in(orbit, ["A", "D"]) || error("Orbit needs to be either A or D")
8285
if isdir(indir) && isempty(indir)
8386
error("Input directory $indir must not be empty")
@@ -93,23 +96,24 @@ function main(;
9396
@warn "Selected time series does not include a multiple of whole years. This might introduce seasonal bias."
9497
end
9598

96-
YAXDefaults.workdir[] = outdir
9799

98100
corruptedfiles = "corrupted_tiles.txt"
99101
# TODO save the corrupt files to a txt for investigation
100102
for tilefolder in tiles
101103
filenamelist = [glob("$(sub)/*$(continent)*20M/$(tilefolder)/*$(polarisation)_$(orbit)*.tif", indir) for sub in folders]
102104
allfilenames = collect(Iterators.flatten(filenamelist))
103-
104105
relorbits = unique([split(basename(x), "_")[5][2:end] for x in allfilenames])
105106
@show relorbits
106107

107108
for relorbit in relorbits
108-
filenames = allfilenames[findall(contains("$(relorbit)_E"), allfilenames)]
109-
@time cube = gdalcube(filenames, stack)
110-
111-
path = joinpath(YAXDefaults.workdir[], "$(tilefolder)_rqatrend_$(polarisation)_$(orbit)$(relorbit)_thresh_$(threshold)")
109+
path = joinpath(outdir, "$(tilefolder)_rqatrend_$(polarisation)_$(orbit)$(relorbit)_thresh_$(threshold)_$(start_date)_$(end_date)")
110+
#s3path = "s3://"*joinpath(outstore.bucket, path)
112111
@show path
112+
exists(path * ".done") && continue
113+
exists(path * "_zerotimesteps.done") && continue
114+
filenames = allfilenames[findall(contains("$(relorbit)_E"), allfilenames)]
115+
@time "cube construction" cube = gdalcube(filenames, stack)
116+
113117
ispath(path * ".done") && continue
114118
ispath(path * "_zerotimesteps.done") && continue
115119

@@ -121,14 +125,41 @@ function main(;
121125
continue
122126
end
123127
try
124-
outpath = path * ".zarr"
125-
@time rqatrend(tcube; thresh=threshold, outpath=outpath, overwrite=true)
126-
Zarr.consolidate_metadata(outpath)
128+
orbitoutpath = path * ".zarr/"
129+
# This is only necessary because overwrite=true doesn't work on S3 based Zarr files in YAXArrays
130+
# See https://github.com/JuliaDataCubes/YAXArrays.jl/issues/511
131+
if exists(orbitoutpath)
132+
println("Deleting path $orbitoutpath")
133+
rm(orbitoutpath, recursive=true)
134+
end
135+
@show orbitoutpath
136+
# We save locally and then save a rechunked version in the cloud,
137+
# because the chunking is suboptimal which we get from the automatic setting.
138+
tmppath = tempname() * ".zarr"
139+
@time "rqatrend" rqatrend(tcube; thresh=threshold, outpath=tmppath, overwrite=true)
140+
c = Cube(tmppath)
141+
@time "save to S3" savecube(setchunks(c, (15000,15000)), string(orbitoutpath))
142+
rm(tmppath, recursive=true)
143+
@show delete_intermediate
144+
if delete_intermediate == false
145+
#PyramidScheme.buildpyramids(orbitoutpath)
146+
Zarr.consolidate_metadata(string(orbitoutpath))
147+
end
127148
catch e
128-
149+
println("inside catch")
129150
if hasproperty(e, :captured) && e.captured.ex isa ArchGDAL.GDAL.GDALError
130-
println("Found GDALError:")
151+
msg = e.captured.ex.msg
152+
corruptfile = split(msg, " ")[1][1:end-1]
153+
corrupt_parts = split(corruptfile, "_")
154+
foldername = corrupt_parts[end-1]
155+
continentfolder = corrupt_parts[end-2]
156+
corruptpath = joinpath(indir, foldername, "EQUI7_$continentfolder", tilefolder, corruptfile)
157+
println("Corrupted input file")
158+
println(corruptpath)
159+
println(joinpath(indir, ))
131160
println(e.captured.ex.msg)
161+
println(corruptedfiles, "Found GDALError:")
162+
println(corruptedfiles, e.captured.ex.msg)
132163
continue
133164
else
134165
rethrow(e)

src/rqatrend.jl

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ using Distances
66
"""rqatrend(cube;thresh=2, path=tempname() * ".zarr")
77
88
Compute the RQA trend metric for the datacube `cube` with the epsilon threshold `thresh`.
9+
`lowerbound` and `upperbound` are forwarded to the classification of the RQA Trend result.
910
"""
10-
function rqatrend(cube; thresh=2, outpath=tempname() * ".zarr", overwrite=false, kwargs...)
11-
@show outpath
12-
mapCube(rqatrend, cube, thresh; indims=InDims("Time"), outdims=OutDims(; outtype=Float32, path=outpath, overwrite, kwargs...))
11+
function rqatrend(cube; thresh=2, lowerbound=-5, upperbound=-0.5, outpath=tempname() * ".zarr", overwrite=false, kwargs...)
12+
mapCube(rqatrend, cube, thresh, lowerbound, upperbound; indims=InDims("Time"), outdims=OutDims(; outtype=UInt8, path=outpath, fill_value=255, overwrite, kwargs...))
1313
end
1414

1515
@testitem "rqatrend cube" begin
@@ -32,27 +32,51 @@ end
3232
mock_trend = rqatrend(mock_cube; thresh=0.5)
3333
@test mock_trend.axes == (mock_cube.X, mock_cube.Y)
3434
diff = abs(mean(mock_trend))
35-
@test diff < 0.5
35+
@test diff < 254
3636
end
3737

3838
"""rqatrend(path::AbstractString; thresh=2, outpath=tempname()*".zarr")
3939
4040
Compute the RQA trend metric for the data that is available on `path`.
41+
See the `rqatrend` for a YAXArray for the description of the parameters.
4142
"""
42-
rqatrend(path::AbstractString; thresh=2, outpath=tempname() * ".zarr", overwrite=false, kwargs...) =
43-
rqatrend(Cube(path); thresh, outpath, overwrite, kwargs...)
43+
rqatrend(path::AbstractString; thresh=2, lowerbound=-5., upperbound=-0.5, outpath=tempname() * ".zarr", overwrite=false, kwargs...) =
44+
rqatrend(Cube(path); thresh, lowerbound, upperbound, outpath, overwrite, kwargs...)
4445

4546

4647
"""
4748
rqatrend(xout, xin, thresh)
4849
4950
Compute the RQA trend metric for the non-missing time steps of xin, and save it to xout.
50-
`thresh` specifies the epsilon threshold of the Recurrence Plot computation
51+
`thresh` specifies the epsilon threshold of the Recurrence Plot computation.
52+
`lowerbound` and `upperbound` are the bounds of the classification into UInt8.
53+
The result of rqatrend are UInt8 values between 0 (no change) to 254 (definitive change) with 255 as sentinel value for missing data.
5154
"""
52-
function rqatrend(pix_trend, pix, thresh=2)
53-
pix_trend .= rqatrend_impl(pix; thresh)
55+
function rqatrend(pix_trend, pix, thresh=2, lowerbound=-5., upperbound=-0.5)
56+
pix_trend .= classify_rqatrend(rqatrend_impl(pix; thresh); lowerbound, upperbound)
5457
end
5558

59+
"""
60+
classify_rqatrend(trend; lowerbound=Float32(-5.0), upperbound=Float32(-0.5)))
61+
Classify the rqatrend and put it into 254 bins so that they can fit into a UInt8 encoding.
62+
This is a compromise between data storage and accuracy of the change detection.
63+
The value range is 0 (no change) to 254 (definitive change) with 255 kept free as a Sentinel value for missing data.
64+
"""
65+
function classify_rqatrend(trend; lowerbound=Float32(-5.0), upperbound=Float32(-0.5))
66+
isnan(trend) && return UInt8(255)
67+
ctrend = clamp(trend, lowerbound, upperbound)
68+
rlength = upperbound - lowerbound
69+
return round(UInt8, 254-((ctrend - lowerbound) / rlength) * 254)
70+
end
71+
72+
@testitem "classify_rqatrend" begin
73+
import AllocCheck
74+
@test RQADeforestation.classify_rqatrend(-4.999) === UInt8(254)
75+
@test RQADeforestation.classify_rqatrend(1) === UInt8(0)
76+
@test RQADeforestation.classify_rqatrend(-0.52) === UInt8(1)
77+
@test RQADeforestation.classify_rqatrend(-6) === UInt8(254)
78+
@test isempty( AllocCheck.check_allocs(RQADeforestation.classify_rqatrend, (Float32,)))
79+
end
5680

5781
function rqatrend_impl(data; thresh=2, border=10, theiler=1, metric=CheckedEuclidean())
5882
# simplified implementation of https://stats.stackexchange.com/a/370175 and https://github.com/joshday/OnlineStats.jl/blob/b89a99679b13e3047ff9c93a03c303c357931832/src/stats/linreg.jl

test/runtests.jl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,16 @@ doctest(RQADeforestation)
1010

1111
using TestItemRunner
1212
@run_package_tests
13+
14+
15+
@testitem "rqa step function" begin
16+
using RQADeforestation
17+
using Distributions: Normal as DNormal
18+
x = range(0,100,length=1000)
19+
ts2 = zero(x)
20+
ts2[1:div(end,2)] .= rand.(DNormal(3,1))
21+
ts2[div(end,2):end] .= rand.(DNormal(0,1))
22+
pixtrend = UInt8[255]
23+
RQADeforestation.rqatrend(pixtrend, ts2)
24+
@test pixtrend[] < 20
25+
end

test/testdata.jl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22
@testitem "testdata main" begin
33
import Pkg: Artifacts.@artifact_str
44
using LazyArtifacts
5+
using FilePathsBase
56
testdatapath = artifact"rqatestdata/RQADeforestationTestData-2.0"
67

78
testdir = tempname()
89
rm(testdir, recursive=true, force=true)
910
mkpath(testdir)
10-
outdir = "$testdir/out.zarr"
11+
outdir = "$testdir/out"
1112
indir = "$testdir/in"
1213
cp(testdatapath, indir)
1314

@@ -20,13 +21,14 @@
2021
indir=indir,
2122
start_date=Date("2021-01-01"),
2223
end_date=Date("2022-01-01"),
23-
outdir=outdir
24+
outdir=Path(outdir),
25+
stack=:lazyagg,
2426
)
25-
a = open_dataset(outdir * "/E051N018T3_rqatrend_VH_D022_thresh_3.0.zarr").layer
27+
a = open_dataset(joinpath(outdir, "E051N018T3_rqatrend_VH_D022_thresh_3.0_2021-01-01_2022-01-01.zarr")).layer
2628

2729
@test size(a) == (50, 74)
28-
@test minimum(a) < 0
29-
@test maximum(a) > 0
30+
@test minimum(a) == 0
31+
@test maximum(a) > 200
3032
end
3133

3234
@testitem "testdata julia_main" begin

0 commit comments

Comments (0)