Skip to content

Commit f66e269

Browse files
felixcremerFelix Cremer
andauthored
Add clustering code and add postprocessing (#111)
* Add clustering code * Add postprocessing and save data to S3 store --------- Co-authored-by: Felix Cremer <[email protected]>
1 parent 042d19e commit f66e269

File tree

5 files changed

+197
-21
lines changed

5 files changed

+197
-21
lines changed

Project.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ authors = ["Felix Cremer <[email protected]>, Daniel Loos <dloos@bgc-
44
version = "1.0.0-DEV"
55

66
[deps]
7+
AWSS3 = "1c724243-ef5b-51ab-93f4-b0a88ac62a95"
78
ArchGDAL = "c9ce4bd3-c3d5-55b8-8973-c0e20141b8c3"
89
ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63"
910
ConstructionBase = "187b0558-2788-49d3-abe0-74a17ed4e7c9"
11+
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
1012
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
1113
DimensionalData = "0703355e-b756-11e9-17c0-8b28908087d0"
1214
DiskArrayEngine = "2d4b2e14-ccd6-4284-b8b0-2378ace7c126"
@@ -16,18 +18,22 @@ Distances = "b4f34e82-e78d-54a5-968a-f98e89d6e8f7"
1618
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
1719
Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
1820
Extents = "411431e0-e8b7-467b-b5e0-f676ba4f2910"
21+
FilePathsBase = "48062228-2e41-5def-b9a4-89aafe57970f"
1922
FillArrays = "1a297f60-69ca-5386-bcde-b61e274b549b"
2023
GDAL = "add2ef01-049f-52c4-9ee2-e494f65e021a"
2124
GeoFormatTypes = "68eda718-8dee-11e9-39e7-89f7f65f511f"
2225
Glob = "c27321d9-0574-5035-807b-f59d2c89b15c"
26+
ImageMorphology = "787d08f9-d448-5407-9aad-5290dd7ab264"
2327
KML = "1284bf3a-1e3d-4f4e-a7a9-b9d235a28f35"
2428
LazyArtifacts = "4af54fe1-eca0-43a8-85a7-787d91b784e3"
2529
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
2630
LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36"
31+
Minio = "4281f0d9-7ae0-406e-9172-b7277c1efa20"
2732
Missings = "e1d29d7a-bbdc-5cf2-9ac0-f12de2c33e28"
2833
NetCDF = "30363a11-5582-574a-97bb-aa9a979735b9"
2934
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
3035
Proj = "c94c279d-25a6-4763-9509-64d165bea63e"
36+
PyramidScheme = "ec211b67-1c2c-4319-878f-eaee078ee145"
3137
Rasters = "a3a2b9e3-a471-40c9-b274-f788e487c689"
3238
RecurrenceAnalysis = "639c3291-70d9-5ea2-8c5b-839eba1ee399"
3339
StaticArrays = "90137ffa-7385-5640-81b9-e52037218182"
@@ -41,12 +47,14 @@ YAXArrays = "c21b50f5-aa40-41ea-b809-c0f5e47bfa5c"
4147
Zarr = "0a941bbe-ad1d-11e8-39d9-ab76183a1d99"
4248

4349
[compat]
50+
AWSS3 = "0.10.4"
4451
AllocCheck = "0.2.2"
4552
Aqua = "0.8.13"
4653
ArchGDAL = "0.10"
4754
ArgParse = "1"
4855
BenchmarkTools = "1.6.0"
4956
ConstructionBase = "1"
57+
DataStructures = "0.18.22"
5058
Dates = "1.10"
5159
DimensionalData = "0.29"
5260
DiskArrayEngine = "0.1, 0.2"
@@ -57,15 +65,18 @@ Distributed = "1.10"
5765
Distributions = "0.25"
5866
Documenter = "1.12.0"
5967
Extents = "0.1"
68+
FilePathsBase = "0.9.24"
6069
FillArrays = "1"
6170
GDAL = "1"
6271
GeoFormatTypes = "0.4"
6372
Glob = "1"
73+
ImageMorphology = "0.4.5"
6474
KML = "0.2"
6575
LazyArtifacts = "1.11.0"
6676
Libdl = "1.10"
6777
LinearAlgebra = "1.10"
6878
LoggingExtras = "1"
79+
Minio = "0.2.2"
6980
Missings = "1"
7081
NetCDF = "0.12"
7182
Pkg = "1.10"

src/RQADeforestation.jl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@ using Zarr
88
using Distributed: myid
99
using NetCDF
1010
using TestItems
11+
using PyramidScheme: PyramidScheme
12+
using DimensionalData: DimensionalData as DD
1113

1214
export gdalcube, rqatrend
1315

1416
include("metrics.jl")
1517
include("auxil.jl")
1618
include("rqatrend.jl")
1719
include("analysis.jl") # TODO what is still needed from analysis now that rqatrend is in its own file?
20+
include("cluster.jl")
21+
include("timestats.jl")
1822
include("main.jl")
1923

2024
end

src/cluster.jl

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
using ImageMorphology, Zarr, YAXArrays
2+
import DimensionalData as DD
3+
import DiskArrayEngine as DAE
4+
using DataStructures: counter
5+
using Statistics
6+
using FillArrays
7+
#maskfolder = "data/forestcompressed"
8+
9+
function meanvote(orbits, significance_thresh=-1.28)
10+
s,n = 0.0,0
11+
for i in eachindex(orbits)
12+
if orbits[i] != 0 && !isnan(orbits[i])
13+
s += orbits[i]
14+
n += 1
15+
end
16+
end
17+
m = s/n
18+
m < significance_thresh ? 1 : 0
19+
end
20+
21+
function filtersmallcomps!(xout,xin_unfiltered,forestmask,comborbits,connsize;dims=:,threaded=false)
22+
xin = broadcast(xin_unfiltered,forestmask) do x,m
23+
ismissing(m) ? zero(x) : x*m
24+
end
25+
x = similar(Array{Float64},(axes(xin,1),axes(xin,2),Base.OneTo(1)))
26+
for j in axes(x,2), i in axes(x,1)
27+
x[i,j,1] = comborbits(view(xin,i,j,:))
28+
end
29+
lc = label_components(x)
30+
c = counter(lc)
31+
for ix in eachindex(xout)
32+
v = lc[ix]
33+
if v==0 || c[v] < connsize
34+
xout[ix] = 0
35+
else
36+
xout[ix] = 1
37+
end
38+
end
39+
end
40+
41+
function postprocess(a,target_array::YAXArray,forestmask::YAXArray, orbitcombine=meanvote;minsize=30,max_cache=5e8)
42+
nx,ny,nz = size(a)
43+
windowsx = DAE.MovingWindow(1 - minsize,1,2*minsize + 1,nx,(1,nx))
44+
windowsy = DAE.MovingWindow(1 - minsize,1,2*minsize + 1,ny,(1,ny))
45+
windowsz = [1:nz]
46+
inar1 = DAE.InputArray(a.data,windows=(windowsx,windowsy,windowsz));
47+
inar2 = DAE.InputArray(forestmask.data, windows=(windowsx,windowsy));
48+
inars = (inar1,inar2)
49+
outchunks = (target_array.chunks.chunks...,DAE.RegularChunks(1,0,1))
50+
outars = DAE.create_outwindows((nx,ny,1),chunks=outchunks);
51+
uf = DAE.create_userfunction(filtersmallcomps!,UInt8,is_blockfunction=true,is_mutating=true,args=(orbitcombine,minsize))
52+
op = DAE.GMDWop(inars,(outars,),uf)
53+
plan = DAE.optimize_loopranges(op,max_cache)
54+
runner=DAE.LocalRunner(op,plan,(reshape(target_array.data,(nx,ny,1)),))
55+
run(runner)
56+
target_array
57+
end
58+
59+
function postprocess(tile::AbstractString, indir, outpath, maskfolder)
60+
if isdir(outpath)
61+
return
62+
end
63+
64+
forpath = only(glob("*$tile*", maskfolder))
65+
@show forpath
66+
allfiles = readdir(indir, join=true)
67+
@show allfiles
68+
orbitfiles = filter(x->occursin(tile,string(x)), allfiles)
69+
@show orbitfiles
70+
orbits = filter(x->occursin(".zarr", string(x)), orbitfiles)
71+
@show orbits
72+
orbitname = map(o->split(basename(o),'_')[4],orbits)
73+
d = DD.format(Dim{:orbit}(orbitname))
74+
files = DD.DimArray(orbits,d)
75+
ds = open_mfdataset(string.(files))
76+
nx, ny = size(ds.layer)
77+
outds_skeleton = Dataset(;defo=YAXArray((ds.X,ds.Y),Fill(UInt8(0),nx,ny),chunks=DAE.GridChunks((nx,ny),(256,256))))
78+
dsout = savedataset(outds_skeleton,path=outpath,skeleton=true,overwrite=true)
79+
forest = Cube(forpath)
80+
#masked = map(*, ds.layer, setchunks(forest,DiskArrays.eachchunk(ds.layer.chunks)))
81+
@time postprocess(ds.layer,dsout.defo, forest)
82+
@time PyramidScheme.buildpyramids(outpath)
83+
end
84+
85+
# Open input data
86+
#=
87+
p = "/Net/Groups/BGI/work_5/scratch/fgans/germany_2020/"
88+
outpath = "./output.zarr/"
89+
orbits = readdir(p)
90+
orbitname = map(o->split(basename(o),'_')[4],orbits)
91+
d = DD.format(Dim{:orbit}(orbitname))
92+
files = DD.DimArray(orbits,d)
93+
ds = open_mfdataset(files)
94+
#Prepare output dataset to write into, this might also be a view into an existing dataset
95+
nx,ny = size(ds.layer)
96+
outds_skeleton = Dataset(;defo=YAXArray((ds.X,ds.Y),Fill(UInt8(0),nx,ny),chunks=DAE.GridChunks((nx,ny),(256,256))))
97+
dsout = savedataset(outds_skeleton,path=outpath,skeleton=true,overwrite=true)
98+
99+
#Call the postprocess function
100+
postprocess(ds.layer,dsout.defo)
101+
102+
=#

src/main.jl

Lines changed: 78 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
11
using ArgParse
22
using YAXArrays: YAXDefaults
3+
using ArchGDAL: ArchGDAL
4+
using PyramidScheme
5+
using AWSS3: global_aws_config, S3Path
6+
using FilePathsBase: exists
37

48

9+
#using YAXArrays, Zarr
10+
using Minio: MinioConfig
11+
12+
global_aws_config(MinioConfig("http://s3.fairsendd.eodchosting.eu",region="us-east-1"))
13+
514
const argparsesettings = ArgParseSettings()
615

716
ArgParse.parse_item(::Type{Date}, x::AbstractString) = Date(x)
@@ -69,72 +78,121 @@ function main(;
6978
tiles::Vector{String},
7079
continent::String,
7180
indir::String,
72-
outdir="out.zarr",
81+
outstore=Zarr.S3Store("europe-forest-change"),
82+
outdir="results",
83+
tempfolder = S3Path(outstore.bucket, "intermediates/"),
7384
start_date::Date,
7485
end_date::Date,
7586
polarisation="VH",
7687
orbit="D",
7788
threshold=3.0,
7889
folders=["V1M0R1", "V1M1R1", "V1M1R2"],
79-
stack=:dae
90+
stack=:dae,
91+
postprocess=true,
92+
forestdir="data/forest20m_new",
93+
delete_intermediate=false
8094
)
95+
#global_aws_config(MinioConfig("http://s3.fairsendd.eodchosting.eu",region="us-east-1",username="ufew8gJku5hRY7VD6jbEjRi8VnvDfeEv",password="dqZdzWCLB7a9gTshL29AnQWGqL3krwnS"))
8196
in(orbit, ["A", "D"]) || error("Orbit needs to be either A or D")
8297
if isdir(indir) && isempty(indir)
8398
error("Input directory $indir must not be empty")
8499
end
85-
if isdir(outdir)
100+
101+
if isdir(tempfolder)
86102
@warn "Resume from existing output directory"
87103
else
88-
mkdir(outdir)
104+
mkdir(tempfolder, recursive=true)
89105
@info "Write output to $outdir"
90106
end
91-
92107
if monthday(start_date) != monthday(end_date)
93108
@warn "Selected time series does not include a multiple of whole years. This might introduce seasonal bias."
94109
end
95110

96-
YAXDefaults.workdir[] = outdir
111+
YAXDefaults.workdir[] = tempfolder
112+
@show typeof(tempfolder)
97113

98-
corruptedfiles = "corrupted_tiles.txt"
114+
corruptedfiles = open("corrupted_tiles.txt", "w")
99115
# TODO save the corrupt files to a txt for investigation
100116
for tilefolder in tiles
101-
filenamelist = [glob("$(sub)/*$(continent)*20M/$(tilefolder)/*$(polarisation)_$(orbit)*.tif", indir) for sub in folders]
117+
@show tilefolder
118+
outpath = joinpath(outdir, "postprocess_$tilefolder.zarr/")
119+
@show outpath
120+
if outpath in Zarr.subdirs(outstore, outdir)
121+
println("Skip already processed tile $tilefolder")
122+
continue
123+
end
124+
sub = first(folders)
125+
#@show glob("$(sub)/*$(continent)*20M/$(tilefolder)*/*$(polarisation)_$(orbit)*.tif", indir)
126+
filenamelist = [glob("$(sub)/*$(continent)*20M/$(tilefolder)*/*$(polarisation)_$(orbit)*.tif", indir) for sub in folders]
102127
allfilenames = collect(Iterators.flatten(filenamelist))
103-
128+
#@show allfilenames
104129
relorbits = unique([split(basename(x), "_")[5][2:end] for x in allfilenames])
105130
@show relorbits
106131

107132
for relorbit in relorbits
133+
path = S3Path(joinpath(YAXDefaults.workdir[], "$(tilefolder)_rqatrend_$(polarisation)_$(orbit)$(relorbit)_thresh_$(threshold)"))
134+
#s3path = "s3://"*joinpath(outstore.bucket, path)
135+
@show path
136+
exists(path * ".done") && continue
137+
exists(path * "_zerotimesteps.done") && continue
108138
filenames = allfilenames[findall(contains("$(relorbit)_E"), allfilenames)]
109139
@time cube = gdalcube(filenames, stack)
140+
110141

111-
path = joinpath(YAXDefaults.workdir[], "$(tilefolder)_rqatrend_$(polarisation)_$(orbit)$(relorbit)_thresh_$(threshold)")
112-
@show path
113-
ispath(path * ".done") && continue
114-
ispath(path * "_zerotimesteps.done") && continue
115142

116143
tcube = cube[Time=start_date .. end_date]
117144
@show size(cube)
118145
@show size(tcube)
119146
if size(tcube, 3) == 0
120-
touch(path * "_zerotimesteps.done")
147+
touch(S3Path(path * "_zerotimesteps.done"))
121148
continue
122149
end
123150
try
124-
outpath = path * ".zarr"
125-
@time rqatrend(tcube; thresh=threshold, outpath=outpath, overwrite=true)
126-
Zarr.consolidate_metadata(outpath)
151+
orbitoutpath = string(path * ".zarr/")
152+
# This is only necessary because overwrite=true doesn't work on S3 based Zarr files in YAXArrays
153+
# See https://github.com/JuliaDataCubes/YAXArrays.jl/issues/511
154+
if exists(S3Path(orbitoutpath))
155+
println("Deleting path $orbitoutpath")
156+
rm(S3Path(orbitoutpath), recursive=true)
157+
end
158+
@show orbitoutpath
159+
# This seems to ignore the overwrite keyword when the outpath point to S3.
160+
@time rqatrend(tcube; thresh=threshold, outpath=orbitoutpath, overwrite=true)
161+
if delete_intermediate == false
162+
PyramidScheme.buildpyramids(orbitoutpath)
163+
Zarr.consolidate_metadata(orbitoutpath)
164+
end
127165
catch e
128166

129167
if hasproperty(e, :captured) && e.captured.ex isa ArchGDAL.GDAL.GDALError
130-
println("Found GDALError:")
131-
println(e.captured.ex.msg)
168+
println(corruptedfiles, "Found GDALError:")
169+
println(corruptedfiles, e.captured.ex.msg)
132170
continue
133171
else
134172
rethrow(e)
135173
end
136174
end
137-
touch(path * ".done")
175+
donepath = path * ".done"
176+
@show donepath
177+
touch(S3Path(path * ".done"))
178+
end
179+
if postprocess
180+
@show outpath
181+
DD.STRICT_BROADCAST_CHECKS[] = false
182+
183+
RQADeforestation.postprocess(tilefolder, tempfolder, outpath, forestdir)
184+
Zarr.consolidate_metadata(outpath)
185+
DD.STRICT_BROADCAST_CHECKS[] = true
186+
#base = basename(outpath)
187+
#@show base
188+
#command = `aws --endpoint-url http://s3.fairsendd.eodchosting.eu s3 cp --recursive $outpath s3://europe-forest-change/$base`
189+
190+
#run(command)
138191
end
192+
if delete_intermediate
193+
rm(tempfolder, force=true, recursive=true)
194+
end
195+
196+
139197
end
140198
end

src/rqatrend.jl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ end
3838
3939
Compute the RQA trend metric for the data that is available on `path`.
4040
"""
41-
rqatrend(path::AbstractString; thresh=2, outpath=tempname() * ".zarr", overwrite=false, kwargs...) = rqatrend(Cube(path); thresh, outpath, overwrite, kwargs...)
41+
rqatrend(path::AbstractString; thresh=2, outpath=tempname() * ".zarr", overwrite=false, kwargs...) =
42+
rqatrend(Cube(path); thresh, outpath, overwrite, kwargs...)
4243

4344

4445
"""

0 commit comments

Comments
 (0)