11from airflow .providers .amazon .aws .hooks .s3 import S3Hook
22from airflow .decorators import task
33
4- import xarray as xr
5-
64@task (task_id = "determine_latest_zarr" )
75def determine_latest_zarr (bucket : str , prefix : str ):
86 s3hook = S3Hook (aws_conn_id = None ) # Use Boto3 default connection strategy
@@ -26,42 +24,32 @@ def determine_latest_zarr(bucket: str, prefix: str):
2624 size_old += obj .size
2725
2826 # If the sizes are different, create a new latest.zarr
27+ s3hook .log .info (f"size_old={ size_old } , size_new={ size_new } " )
2928 if size_old != size_new and size_new > 500 * 1e3 : # Expecting at least 500KB
3029
31- # open file
32- s3hook .log .info (f"Opening { zarrs [0 ]} " )
33- ds = xr .open_zarr (f"s3://{ bucket } /{ prefix } /{ zarrs [0 ]} " )
34-
35- # re-chunk
36- s3hook .log .info ("Re-chunking" )
37- ds = ds .chunk ({"init_time" : 1 ,
38- "step" : len (ds .step ) // 4 ,
39- "variable" : len (ds .variable ),
40- "latitude" : len (ds .latitude ) // 2 ,
41- "longitude" : len (ds .longitude ) // 2 })
42-
43- # save to latest_temp.zarr
44- s3hook .log .info (f"Saving { prefix } /latest_temp.zarr/" )
45- ds .to_zarr (f"s3://{ bucket } /{ prefix } /latest_temp.zarr/" , mode = "w" )
46-
4730 # delete latest.zarr
4831 s3hook .log .info (f"Deleting { prefix } /latest.zarr/" )
4932 if prefix + "/latest.zarr/" in prefixes :
5033 s3hook .log .debug (f"Deleting { prefix } /latest.zarr/" )
5134 keys_to_delete = s3hook .list_keys (bucket_name = bucket , prefix = prefix + "/latest.zarr/" )
5235 s3hook .delete_objects (bucket = bucket , keys = keys_to_delete )
5336
54- # move latest_temp.zarr to latest.zarr
55- s3hook .log .info (f"Move { prefix } /latest_temp.zarr/ to { prefix } /latest.zarr/" )
56- keys_to_move = s3hook .list_keys (bucket_name = bucket , prefix = prefix + "/latest_temp.zarr/" )
57- for key in keys_to_move :
37+ #
38+
39+ # move latest zarr file to latest.zarr using s3 batch jobs
40+ s3hook .log .info (f"Creating { prefix } /latest.zarr/" )
41+
42+ # Copy the new latest.zarr
43+ s3hook .log .info (f"Copying { zarrs [0 ]} to { prefix } /latest.zarr/" )
44+ source_keys = s3hook .list_keys (bucket_name = bucket , prefix = zarrs [0 ])
45+ for key in source_keys :
5846 s3hook .copy_object (
5947 source_bucket_name = bucket ,
6048 source_bucket_key = key ,
6149 dest_bucket_name = bucket ,
62- dest_bucket_key = prefix + "/latest.zarr/" + key .split (prefix + "/latest_temp.zarr/" )[- 1 ],
50+ dest_bucket_key = prefix + "/latest.zarr/" + key .split (zarrs [ 0 ] )[- 1 ],
6351 )
64- s3hook .delete_objects (bucket = bucket , keys = keys_to_move )
52+ s3hook .delete_objects (bucket = bucket , keys = source_keys )
6553
6654 else :
6755 s3hook .log .info ("No changes to latest.zarr required" )
0 commit comments