1+ from collections .abc import Generator
12import shutil
23import time
34
67from tempfile import TemporaryDirectory
78
89import numpy as np
9- import xarray as xr
10+ import openeo
1011import rasterio
12+ import xarray as xr
13+ import pyproj
14+ import shapely
15+ from shapely import wkt
16+ from enhancement_tools .time_measurement import Timer
1117
12- import openeo
1318from openeo .udf import execute_local_udf
1419
15- from efast import s2_processing_openeo , s2_processing
20+ from efast import s2_processing , s2_processing_openeo , s3_processing
1621
1722TEST_DATA_ROOT = Path (__file__ ).parent .parent / "test_data"
1823TEST_DATA_S2 = TEST_DATA_ROOT / "S2"
1924
25+ TEST_DATA_S3 = TEST_DATA_ROOT / "S3"
26+
2027TEST_DATA_S2_NETCDF = (
2128 TEST_DATA_S2
2229 / "netcdf"
2633TEST_DATE = "20220618"
2734TEST_DATE_DASH = "2022-06-18"
2835
36+ SKIP_LOCAL = True
37+ SMALL_AREA = True
38+
39+
2940@contextmanager
30- def create_temp_dir_and_copy_files (source , sub = "." , pattern : str | None = "*" ):
41+ def create_temp_dir_and_copy_files (
42+ source , sub = "." , pattern : str | None = "*"
43+ ) -> Generator [TemporaryDirectory ]:
3144 import sys
3245
3346 tmp = TemporaryDirectory ()
@@ -41,6 +54,9 @@ def create_temp_dir_and_copy_files(source, sub=".", pattern: str | None = "*"):
4154 tmp .cleanup ()
4255
4356
57+ ############################## S2 Processing ##############################
58+
59+
4460def test_distance_to_cloud ():
4561 with create_temp_dir_and_copy_files (TEST_DATA_S2 , sub = "raw/" , pattern = None ) as tmp :
4662 tmp_path = Path (tmp .name )
@@ -49,7 +65,9 @@ def test_distance_to_cloud():
4965
5066 # reference
5167
52- s2_processing .extract_mask_s2_bands (TEST_DATA_S2 / "raw" , tif_path , resolution = 20 )
68+ s2_processing .extract_mask_s2_bands (
69+ TEST_DATA_S2 / "raw" , tif_path , resolution = 20
70+ )
5371 s2_processing .distance_to_clouds (tif_path , ratio = 15 , tolerance_percentage = 0.05 )
5472 try :
5573 reference_path = next (tif_path .glob (f"*{ TEST_DATE } *DIST_CLOUD.tif" ))
@@ -71,21 +89,29 @@ def test_distance_to_cloud():
7189 conn = s2_processing_openeo .connect ()
7290 conn .authenticate_oidc ()
7391
74- test_area = s2_processing_openeo .TestArea (bbox = bounds , s2_bands = ["SCL" ], temporal_extent = (TEST_DATE_DASH , TEST_DATE_DASH ))
92+ test_area = s2_processing_openeo .TestArea (
93+ bbox = bounds ,
94+ s2_bands = ["SCL" ],
95+ temporal_extent = (TEST_DATE_DASH , TEST_DATE_DASH ),
96+ )
7597 cube = test_area .get_s2_cube (conn )
7698
77- dtc_input = s2_processing_openeo .calculate_large_grid_cloud_mask (cube , tolerance_percentage = 0.05 , grid_side_length = 300 )
78- dtc = s2_processing_openeo .distance_to_clouds (dtc_input , tolerance_percentage = 0.05 , ratio = 30 )
99+ dtc_input = s2_processing_openeo .calculate_large_grid_cloud_mask (
100+ cube , tolerance_percentage = 0.05 , grid_side_length = 300
101+ )
102+ dtc = s2_processing_openeo .distance_to_clouds (
103+ dtc_input , tolerance_percentage = 0.05 , ratio = 30
104+ )
79105 download_path = tmp_path / "test_distance_to_cloud.tif"
80106
81107 print ("openEO execution" )
82108
83109 # intermediate results for debugging
84110 BASE_DIR = Path ("openeo_results" )
85- #BASE_DIR.mkdir(exist_ok=True)
86- #cloud_mask.download(BASE_DIR / "cloud_mask.tif")
87- #cloud_mask_resampled.download(BASE_DIR / "cloud_mask_resampled.tif")
88- #dtc_input.download(BASE_DIR / "dtc_input.tif")
111+ # BASE_DIR.mkdir(exist_ok=True)
112+ # cloud_mask.download(BASE_DIR / "cloud_mask.tif")
113+ # cloud_mask_resampled.download(BASE_DIR / "cloud_mask_resampled.tif")
114+ # dtc_input.download(BASE_DIR / "dtc_input.tif")
89115
90116 before = time .perf_counter ()
91117 dtc .download (download_path )
@@ -117,17 +143,21 @@ def test_distance_to_cloud_synthetic_cube():
117143 cube_resampled = xr .DataArray (cube_resampled , dims = ["x" , "y" ])
118144
119145 udf = openeo .UDF .from_file ("efast/distance_transform_udf.py" )
120- dtc_local_udf = execute_local_udf (udf , cube_resampled ).get_datacube_list ()[0 ].get_array ()
146+ dtc_local_udf = (
147+ execute_local_udf (udf , cube_resampled ).get_datacube_list ()[0 ].get_array ()
148+ )
121149
122- dtc_reference = s2_processing .distance_to_clouds_in_memory (cube .to_numpy (), ratio = ratio , tolerance_percentage = tolerance )
150+ dtc_reference = s2_processing .distance_to_clouds_in_memory (
151+ cube .to_numpy (), ratio = ratio , tolerance_percentage = tolerance
152+ )
123153
124154 assert np .all (dtc_reference == dtc_local_udf )
125155
126156
127157def spatial_extent_from_bounds (crs , bounding_box ):
128158 directions = ["west" , "south" , "east" , "north" ]
129159
130- extent = {d :b for (d ,b ) in zip (directions , bounding_box )}
160+ extent = {d : b for (d , b ) in zip (directions , bounding_box )}
131161 extent ["crs" ] = extract_epsg_code_from_rasterio_crs (crs )
132162 return extent
133163
@@ -136,3 +166,120 @@ def extract_epsg_code_from_rasterio_crs(crs: rasterio.CRS) -> int:
136166 code_str = crs ["init" ].split (":" )[1 ]
137167 return int (code_str )
138168
169+
170+ ############################## S3 Processing ##############################
171+
172+ # TODO this should not live in test code
173+ CLOUD_FLAG_CLOUD = 0b1
174+ CLOUD_FLAG_CLOUD_AMBIGUOUS = 0b10
175+ CLOUD_FLAG_CLOUD_MARGIN = 0b100
176+ CLOUD_FLAGS_COMBINED = (
177+ CLOUD_FLAG_CLOUD | CLOUD_FLAG_CLOUD_AMBIGUOUS | CLOUD_FLAG_CLOUD_MARGIN
178+ )
179+ OLC_FLAG_LAND = 0b1 << 12
180+
181+
182+ def test_data_acquisition_s3 ():
183+ with create_temp_dir_and_copy_files (
184+ TEST_DATA_S3 , sub = "raw/" , pattern = f"raw/*SY_2_SYN____{ TEST_DATE } *"
185+ ) as tmp :
186+ inner_data_acquisition_s3 (tmp )
187+
188+
189+ def inner_data_acquisition_s3 (tmpdir ):
190+ tmp = Path (tmpdir .name )
191+ # TODO for interactive testing
192+ download_dir = Path ("temp_s3" )
193+ download_dir .mkdir (exist_ok = True )
194+
195+ bounds = {
196+ "west" : 399960.0 ,
197+ "south" : 1590240.0 ,
198+ "east" : 509760.0 ,
199+ "north" : 1700040.0 ,
200+ "crs" : 32628 ,
201+ }
202+
203+ if SMALL_AREA :
204+ dist = 900
205+ bounds ["east" ] = bounds ["west" ] + dist
206+ bounds ["north" ] = bounds ["south" ] + dist
207+
208+ # reference
209+
210+ s3_binning_dir = tmp / "binning"
211+ s3_download_dir = tmp / "raw"
212+ s3_binning_dir .mkdir ()
213+ footprint = transform_bounds_to_wkt (
214+ bounds
215+ ) # TODO probably needs to be converted to wkt
216+
217+ if not SKIP_LOCAL :
218+ s3_processing .binning_s3 (
219+ s3_download_dir ,
220+ s3_binning_dir ,
221+ footprint = footprint ,
222+ s3_bands = ["SDR_Oa04" , "SDR_Oa06" , "SDR_Oa08" , "SDR_Oa17" ],
223+ instrument = "SYNERGY" ,
224+ max_zenith_angle = 30 ,
225+ crs = "EPSG:32628" ,
226+ aggregator = "mean" ,
227+ snap_gpt_path = "gpt" ,
228+ snap_memory = "8G" ,
229+ snap_parallelization = 1 , # TODO more than 1?
230+ )
231+
232+ shutil .copytree (s3_binning_dir , download_dir , dirs_exist_ok = True )
233+
234+ # openeo
235+
236+ conn = s2_processing_openeo .connect ()
237+ conn .authenticate_oidc ()
238+
239+ bands = [
240+ # "Syn_Oa04_reflectance",
241+ # "Syn_Oa06_reflectance",
242+ # "Syn_Oa08_reflectance",
243+ "Syn_Oa17_reflectance" ,
244+ "CLOUD_flags" ,
245+ "OLC_flags" ,
246+ ]
247+ test_area = s2_processing_openeo .TestArea (
248+ bbox = bounds , s3_bands = bands , temporal_extent = (TEST_DATE_DASH , TEST_DATE_DASH )
249+ )
250+ cube = test_area .get_s3_cube (conn )
251+ cloud_flags = cube .band ("CLOUD_flags" )
252+ olc_flags = cube .band ("OLC_flags" )
253+
254+ cloud_mask = (cloud_flags & CLOUD_FLAGS_COMBINED ) != 0
255+ land_mask = (olc_flags & OLC_FLAG_LAND ) != 0
256+ mask = ((~ cloud_mask ) & land_mask ) == 0
257+
258+ cube_masked = cube .mask (mask )
259+ mask = mask .add_dimension (name = "bands" , label = "cloud_noland_mask" , type = "bands" )
260+ merged = cube_masked .merge_cubes (mask )
261+
262+ with Timer (dsc = "OpenEO execution and download" ) as timer :
263+ # cube.download(download_dir / "cube_unmasked.nc")
264+ merged .download (download_dir / "cube_masked.nc" )
265+
266+ print (f"execution and download in { timer .elapsed :.2f} seconds" )
267+
268+ assert True , "Test ran through without issues"
269+ return True
270+
271+
272+ def transform_bounds_to_wkt (bounds : dict ):
273+ required = ["crs" , "west" , "east" , "south" , "north" ]
274+ for att in required :
275+ if att not in bounds :
276+ raise ValueError (f"'bounds' needs to contain all of { required } " )
277+ transformer = pyproj .Transformer .from_crs (
278+ f"EPSG:{ bounds ['crs' ]} " , "EPSG:4326" , always_xy = True
279+ )
280+ minx_wgs84 , miny_wgs84 = transformer .transform (bounds ["west" ], bounds ["south" ])
281+ maxx_wgs84 , maxy_wgs84 = transformer .transform (bounds ["east" ], bounds ["north" ])
282+ bbox = shapely .geometry .box (
283+ minx = minx_wgs84 , miny = miny_wgs84 , maxx = maxx_wgs84 , maxy = maxy_wgs84
284+ )
285+ return wkt .dumps (bbox )
0 commit comments