From 4ebc846b26e268da0c4189d7d47bddcc71d3043d Mon Sep 17 00:00:00 2001 From: Felix Cremer Date: Thu, 10 Apr 2025 22:59:47 +0200 Subject: [PATCH 1/2] Add clustering code --- Project.toml | 4 +++ src/RQADeforestation.jl | 1 + src/cluster.jl | 70 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+) create mode 100644 src/cluster.jl diff --git a/Project.toml b/Project.toml index d2c02e0..90a8b0d 100644 --- a/Project.toml +++ b/Project.toml @@ -7,6 +7,7 @@ version = "1.0.0-DEV" ArchGDAL = "c9ce4bd3-c3d5-55b8-8973-c0e20141b8c3" ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63" ConstructionBase = "187b0558-2788-49d3-abe0-74a17ed4e7c9" +DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" DimensionalData = "0703355e-b756-11e9-17c0-8b28908087d0" DiskArrayEngine = "2d4b2e14-ccd6-4284-b8b0-2378ace7c126" @@ -20,6 +21,7 @@ FillArrays = "1a297f60-69ca-5386-bcde-b61e274b549b" GDAL = "add2ef01-049f-52c4-9ee2-e494f65e021a" GeoFormatTypes = "68eda718-8dee-11e9-39e7-89f7f65f511f" Glob = "c27321d9-0574-5035-807b-f59d2c89b15c" +ImageMorphology = "787d08f9-d448-5407-9aad-5290dd7ab264" KML = "1284bf3a-1e3d-4f4e-a7a9-b9d235a28f35" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36" @@ -43,6 +45,7 @@ Zarr = "0a941bbe-ad1d-11e8-39d9-ab76183a1d99" ArchGDAL = "0.10" ArgParse = "1" ConstructionBase = "1" +DataStructures = "0.18.22" DimensionalData = "0.29" DiskArrayEngine = "0.1, 0.2" DiskArrayTools = "0.1" @@ -54,6 +57,7 @@ FillArrays = "1" GDAL = "1" GeoFormatTypes = "0.4" Glob = "1" +ImageMorphology = "0.4.5" KML = "0.2" LinearAlgebra = "1.10" LoggingExtras = "1" diff --git a/src/RQADeforestation.jl b/src/RQADeforestation.jl index c409457..652c059 100644 --- a/src/RQADeforestation.jl +++ b/src/RQADeforestation.jl @@ -15,6 +15,7 @@ include("metrics.jl") include("auxil.jl") include("rqatrend.jl") include("analysis.jl") # TODO what is still needed from analysis now that rqatrend is in its own file? +include("cluster.jl") include("timestats.jl") include("main.jl") diff --git a/src/cluster.jl b/src/cluster.jl new file mode 100644 index 0000000..25b8b3a --- /dev/null +++ b/src/cluster.jl @@ -0,0 +1,70 @@ +using ImageMorphology, Zarr, YAXArrays +import DimensionalData as DD +import DiskArrayEngine as DAE +using DataStructures: counter +using Statistics +using FillArrays + +function meanvote(orbits, significance_thresh=-1.28) + s,n = 0.0,0 + for i in eachindex(orbits) + if orbits[i] != 0 + s += orbits[i] + n += 1 + end + end + m = s/n + m < significance_thresh ? 1 : 0 +end + +function filtersmallcomps!(xout,xin,comborbits,connsize;dims=:,threaded=false) + x = similar(Array{Float64},(axes(xin,1),axes(xin,2),Base.OneTo(1))) + for j in axes(x,2), i in axes(x,1) + x[i,j,1] = comborbits(view(xin,i,j,:)) + end + lc = label_components(x) + c = counter(lc) + for ix in eachindex(xout) + v = lc[ix] + if v==0 || c[v] < connsize + xout[ix] = 0 + else + xout[ix] = 1 + end + end +end + +function postprocess(a,target_array::YAXArray,orbitcombine=meanvote;minsize=30,max_cache=5e8) + nx,ny,nz = size(a) + windowsx = DAE.MovingWindow(1 - minsize,1,2*minsize + 1,nx,(1,nx)) + windowsy = DAE.MovingWindow(1 - minsize,1,2*minsize + 1,ny,(1,ny)) + windowsz = [1:nz] + inars = DAE.InputArray.((a.data,),windows=(windowsx,windowsy,windowsz)); + outchunks = (target_array.chunks.chunks...,DAE.RegularChunks(1,0,1)) + outars = DAE.create_outwindows((nx,ny,1),chunks=outchunks); + uf = DAE.create_userfunction(filtersmallcomps!,UInt8,is_blockfunction=true,is_mutating=true,args=(orbitcombine,minsize)) + op = DAE.GMDWop(inars,(outars,),uf) + plan = DAE.optimize_loopranges(op,max_cache) + runner=DAE.LocalRunner(op,plan,(reshape(target_array.data,(nx,ny,1)),)) + run(runner) + target_array +end + +# Open input data +#= +p = "/Net/Groups/BGI/work_5/scratch/fgans/germany_2020/" +outpath = "./output.zarr/" +orbits = readdir(p) +orbitname = map(o->split(basename(o),'_')[4],orbits) +d = DD.format(Dim{:orbit}(orbitname)) +files = DD.DimArray(orbits,d) +ds = open_mfdataset(files) +#Prepare output dataset to write into, this might also be a view into an existing dataset +nx,ny = size(ds.layer) +outds_skeleton = Dataset(;defo=YAXArray((ds.X,ds.Y),Fill(UInt8(0),nx,ny),chunks=DAE.GridChunks((nx,ny),(256,256)))) +dsout = savedataset(outds_skeleton,path=outpath,skeleton=true,overwrite=true) + +#Call the postprocess function +postprocess(ds.layer,dsout.defo) + +=# \ No newline at end of file From 455d81ebb6d05273a147ccc535d9daab9776cff2 Mon Sep 17 00:00:00 2001 From: Felix Cremer Date: Mon, 18 Aug 2025 14:33:22 +0200 Subject: [PATCH 2/2] Add postprocessing and save data to the minio server --- Project.toml | 7 +++ src/RQADeforestation.jl | 2 + src/cluster.jl | 40 +++++++++++++++-- src/main.jl | 98 ++++++++++++++++++++++++++++++++--------- src/rqatrend.jl | 3 +- 5 files changed, 125 insertions(+), 25 deletions(-) diff --git a/Project.toml b/Project.toml index 90a8b0d..70e6921 100644 --- a/Project.toml +++ b/Project.toml @@ -4,6 +4,7 @@ authors = ["Felix Cremer , Daniel Loos occursin(tile,string(x)), allfiles) + @show orbitfiles + orbits = filter(x->occursin(".zarr", string(x)), orbitfiles) + @show orbits + orbitname = map(o->split(basename(o),'_')[4],orbits) + d = DD.format(Dim{:orbit}(orbitname)) + files = DD.DimArray(orbits,d) + ds = open_mfdataset(string.(files)) + nx, ny = size(ds.layer) + outds_skeleton = Dataset(;defo=YAXArray((ds.X,ds.Y),Fill(UInt8(0),nx,ny),chunks=DAE.GridChunks((nx,ny),(256,256)))) + dsout = savedataset(outds_skeleton,path=outpath,skeleton=true,overwrite=true) + forest = Cube(forpath) + #masked = map(*, ds.layer, setchunks(forest,DiskArrays.eachchunk(ds.layer.chunks))) + @time postprocess(ds.layer,dsout.defo, forest) + @time PyramidScheme.buildpyramids(outpath) +end + # Open input data #= p = "/Net/Groups/BGI/work_5/scratch/fgans/germany_2020/" diff --git a/src/main.jl b/src/main.jl index 3d535f6..c09626c 100644 --- a/src/main.jl +++ b/src/main.jl @@ -1,7 +1,16 @@ using ArgParse using YAXArrays: YAXDefaults +using ArchGDAL: ArchGDAL +using PyramidScheme +using AWSS3: global_aws_config, S3Path +using FilePathsBase: exists +#using YAXArrays, Zarr +using Minio: MinioConfig + +global_aws_config(MinioConfig("http://s3.fairsendd.eodchosting.eu",region="us-east-1")) + const argparsesettings = ArgParseSettings() ArgParse.parse_item(::Type{Date}, x::AbstractString) = Date(x) @@ -69,72 +78,121 @@ function main(; tiles::Vector{String}, continent::String, indir::String, - outdir="out.zarr", + outstore=Zarr.S3Store("europe-forest-change"), + outdir="results", + tempfolder = S3Path(outstore.bucket, "intermediates/"), start_date::Date, end_date::Date, polarisation="VH", orbit="D", threshold=3.0, folders=["V1M0R1", "V1M1R1", "V1M1R2"], - stack=:dae + stack=:dae, + postprocess=true, + forestdir="data/forest20m_new", + delete_intermediate=false ) +#global_aws_config(MinioConfig("http://s3.fairsendd.eodchosting.eu",region="us-east-1",username="ufew8gJku5hRY7VD6jbEjRi8VnvDfeEv",password="dqZdzWCLB7a9gTshL29AnQWGqL3krwnS")) in(orbit, ["A", "D"]) || error("Orbit needs to be either A or D") if isdir(indir) && isempty(indir) error("Input directory $indir must not be empty") end - if isdir(outdir) + + if isdir(tempfolder) @warn "Resume from existing output directory" else - mkdir(outdir) + mkdir(tempfolder, recursive=true) @info "Write output to $outdir" end - if monthday(start_date) != monthday(end_date) @warn "Selected time series does not include a multiple of whole years. This might introduce seasonal bias." end - YAXDefaults.workdir[] = outdir + YAXDefaults.workdir[] = tempfolder + @show typeof(tempfolder) - corruptedfiles = "corrupted_tiles.txt" + corruptedfiles = open("corrupted_tiles.txt", "w") # TODO save the corrupt files to a txt for investigation for tilefolder in tiles - filenamelist = [glob("$(sub)/*$(continent)*20M/$(tilefolder)/*$(polarisation)_$(orbit)*.tif", indir) for sub in folders] + @show tilefolder + outpath = joinpath(outdir, "postprocess_$tilefolder.zarr/") + @show outpath + if outpath in Zarr.subdirs(outstore, outdir) + println("Skip already processed tile $tilefolder") + continue + end + sub = first(folders) + #@show glob("$(sub)/*$(continent)*20M/$(tilefolder)*/*$(polarisation)_$(orbit)*.tif", indir) + filenamelist = [glob("$(sub)/*$(continent)*20M/$(tilefolder)*/*$(polarisation)_$(orbit)*.tif", indir) for sub in folders] allfilenames = collect(Iterators.flatten(filenamelist)) - + #@show allfilenames relorbits = unique([split(basename(x), "_")[5][2:end] for x in allfilenames]) @show relorbits for relorbit in relorbits + path = S3Path(joinpath(YAXDefaults.workdir[], "$(tilefolder)_rqatrend_$(polarisation)_$(orbit)$(relorbit)_thresh_$(threshold)")) + #s3path = "s3://"*joinpath(outstore.bucket, path) + @show path + exists(path * ".done") && continue + exists(path * "_zerotimesteps.done") && continue filenames = allfilenames[findall(contains("$(relorbit)_E"), allfilenames)] @time cube = gdalcube(filenames, stack) + - path = joinpath(YAXDefaults.workdir[], "$(tilefolder)_rqatrend_$(polarisation)_$(orbit)$(relorbit)_thresh_$(threshold)") - @show path - ispath(path * ".done") && continue - ispath(path * "_zerotimesteps.done") && continue tcube = cube[Time=start_date .. end_date] @show size(cube) @show size(tcube) if size(tcube, 3) == 0 - touch(path * "_zerotimesteps.done") + touch(S3Path(path * "_zerotimesteps.done")) continue end try - outpath = path * ".zarr" - @time rqatrend(tcube; thresh=threshold, outpath=outpath, overwrite=true) - Zarr.consolidate_metadata(outpath) + orbitoutpath = string(path * ".zarr/") + # This is only necessary because overwrite=true doesn't work on S3 based Zarr files in YAXArrays + # See https://github.com/JuliaDataCubes/YAXArrays.jl/issues/511 + if exists(S3Path(orbitoutpath)) + println("Deleting path $orbitoutpath") + rm(S3Path(orbitoutpath), recursive=true) + end + @show orbitoutpath + # This seems to ignore the overwrite keyword when the outpath point to S3. + @time rqatrend(tcube; thresh=threshold, outpath=orbitoutpath, overwrite=true) + if delete_intermediate == false + PyramidScheme.buildpyramids(orbitoutpath) + Zarr.consolidate_metadata(orbitoutpath) + end catch e if hasproperty(e, :captured) && e.captured.ex isa ArchGDAL.GDAL.GDALError - println("Found GDALError:") - println(e.captured.ex.msg) + println(corruptedfiles, "Found GDALError:") + println(corruptedfiles, e.captured.ex.msg) continue else rethrow(e) end end - touch(path * ".done") + donepath = path * ".done" + @show donepath + touch(S3Path(path * ".done")) + end + if postprocess + @show outpath + DD.STRICT_BROADCAST_CHECKS[] = false + + RQADeforestation.postprocess(tilefolder, tempfolder, outpath, forestdir) + Zarr.consolidate_metadata(outpath) + DD.STRICT_BROADCAST_CHECKS[] = true + #base = basename(outpath) + #@show base + #command = `aws --endpoint-url http://s3.fairsendd.eodchosting.eu s3 cp --recursive $outpath s3://europe-forest-change/$base` + + #run(command) end + if delete_intermediate + rm(tempfolder, force=true, recursive=true) + end + + end end \ No newline at end of file diff --git a/src/rqatrend.jl b/src/rqatrend.jl index cc388cf..90fce08 100644 --- a/src/rqatrend.jl +++ b/src/rqatrend.jl @@ -39,7 +39,8 @@ end Compute the RQA trend metric for the data that is available on `path`. """ -rqatrend(path::AbstractString; thresh=2, outpath=tempname() * ".zarr", overwrite=false, kwargs...) = rqatrend(Cube(path); thresh, outpath, overwrite, kwargs...) +rqatrend(path::AbstractString; thresh=2, outpath=tempname() * ".zarr", overwrite=false, kwargs...) = + rqatrend(Cube(path); thresh, outpath, overwrite, kwargs...) """