11"""This file contains utility functions for processing data located on an S3 storage.
22The upload of data to the storage system should be performed with 'rclone'.
33"""
4+ import json
45import os
6+ import warnings
7+ from shutil import which
8+ from subprocess import run
59from typing import Optional , Tuple
610
711import s3fs
@@ -115,14 +119,23 @@ def get_s3_path(
         bucket_name, service_endpoint, credential_file
     )

-    s3_filesystem = create_s3_target(url=service_endpoint, anon=False, credential_file=credential_file)
+    zarr_major_version = int(zarr.__version__.split(".")[0])
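+    # zarr v3 opens the data via FsspecStore, which expects an asynchronous fsspec filesystem,
+    # while zarr v2 uses the synchronous FSStore (see below).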
+    s3_filesystem = create_s3_target(
+        url=service_endpoint, anon=False, credential_file=credential_file, asynchronous=zarr_major_version == 3,
+    )

     zarr_path = f"{bucket_name}/{input_path}"

-    if not s3_filesystem.exists(zarr_path):
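+    # The existence check is only run for zarr v2, where the filesystem is synchronous.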
+    if zarr_major_version == 2 and not s3_filesystem.exists(zarr_path):
         print(f"Error: S3 path {zarr_path} does not exist!")

-    s3_path = zarr.storage.FSStore(zarr_path, fs=s3_filesystem)
+    # The approach for opening a dataset from S3 differs in zarr v2 and zarr v3.
+    if zarr_major_version == 2:
+        s3_path = zarr.storage.FSStore(zarr_path, fs=s3_filesystem)
+    elif zarr_major_version == 3:
+        s3_path = zarr.storage.FsspecStore(fs=s3_filesystem, path=zarr_path)
+    else:
+        raise RuntimeError(f"Unsupported zarr version {zarr_major_version}")

     return s3_path, s3_filesystem

@@ -153,6 +166,7 @@ def create_s3_target(
     url: Optional[str] = None,
     anon: Optional[str] = False,
     credential_file: Optional[str] = None,
+    asynchronous: bool = False,
 ) -> s3fs.core.S3FileSystem:
     """Create file system for S3 bucket based on a service endpoint and an optional credential file.
     If the credential file is not provided, the s3fs.S3FileSystem function checks the environment variables
@@ -162,14 +176,110 @@ def create_s3_target(
         url: Service endpoint for S3 bucket
         anon: Option for anon argument of S3FileSystem
         credential_file: File path to credentials
+        asynchronous: Whether to open the file system in async mode.

     Returns:
         s3_filesystem
     """
     client_kwargs = {"endpoint_url": SERVICE_ENDPOINT if url is None else url}
     if credential_file is not None:
         key, secret = read_s3_credentials(credential_file)
-        s3_filesystem = s3fs.S3FileSystem(key=key, secret=secret, client_kwargs=client_kwargs)
+        s3_filesystem = s3fs.S3FileSystem(
+            key=key, secret=secret, client_kwargs=client_kwargs, asynchronous=asynchronous
+        )
     else:
-        s3_filesystem = s3fs.S3FileSystem(anon=anon, client_kwargs=client_kwargs)
+        s3_filesystem = s3fs.S3FileSystem(anon=anon, client_kwargs=client_kwargs, asynchronous=asynchronous)
     return s3_filesystem
+
+
+def _sync_rclone(local_dir, target):
+    # The rclone alias could also be exposed as a parameter.
+    rclone_alias = "cochlea-lightsheet"
+    print("Sync", local_dir, "to", target)
199+ run (["rclone" , "--progress" , "copyto" , local_dir , f"{ rclone_alias } :{ target } " ])
+
+
+def sync_dataset(
+    mobie_root: str,
+    dataset_name: str,
+    bucket_name: Optional[str] = None,
+    url: Optional[str] = None,
+    anon: Optional[str] = False,
+    credential_file: Optional[str] = None,
+    force_segmentation_update: bool = False,
+) -> None:
+    """Sync a MoBIE dataset to the S3 bucket using rclone.
+
+    Args:
+        mobie_root: The directory with the local MoBIE project.
+        dataset_name: The MoBIE dataset to sync.
+        bucket_name: The name of the dataset's bucket on S3.
+        url: Service endpoint for S3 bucket.
+        anon: Option for anon argument of S3FileSystem.
+        credential_file: File path to credentials.
+        force_segmentation_update: Whether to force segmentation updates.
+    """
+    from mobie.metadata import add_remote_project_metadata
+
+    # Make sure that rclone is loaded.
+    if which("rclone") is None:
+        raise RuntimeError("rclone is required for synchronization. Try loading it via 'module load rclone'.")
+
+    # Make sure the dataset is part of the local project.
+    with open(os.path.join(mobie_root, "project.json")) as f:
+        project_metadata = json.load(f)
+    datasets = project_metadata["datasets"]
+    assert dataset_name in datasets
+
+    # Get the s3 filesystem and the bucket name.
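+    # The filesystem is only used to read the remote metadata; the actual data transfer is done with rclone.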
+    s3 = create_s3_target(url, anon, credential_file)
+    if bucket_name is None:
+        bucket_name = BUCKET_NAME
+    if url is None:
+        url = SERVICE_ENDPOINT
+
+    # Add the required remote metadata to the project. Suppress warnings about missing local data.
+    with warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=UserWarning)
+        add_remote_project_metadata(mobie_root, bucket_name, url)
+
+    # Get the project metadata from the S3 bucket.
+    project_metadata_path = os.path.join(bucket_name, "project.json")
+    with s3.open(project_metadata_path, "r") as f:
+        project_metadata = json.load(f)
+
+    # Check if the dataset is part of the remote project already.
+    local_ds_root = os.path.join(mobie_root, dataset_name)
+    remote_ds_root = os.path.join(bucket_name, dataset_name)
+    if dataset_name not in project_metadata["datasets"]:
+        print("The dataset is not yet synced. Will copy it over.")
+        _sync_rclone(os.path.join(mobie_root, "project.json"), project_metadata_path)
+        _sync_rclone(local_ds_root, remote_ds_root)
+        return
+
+    # Otherwise, check which sources are new and add them.
+    with open(os.path.join(mobie_root, dataset_name, "dataset.json")) as f:
+        local_dataset_metadata = json.load(f)
+
+    dataset_metadata_path = os.path.join(bucket_name, dataset_name, "dataset.json")
+    with s3.open(dataset_metadata_path, "r") as f:
+        remote_dataset_metadata = json.load(f)
+
+    for source_name, source_data in local_dataset_metadata["sources"].items():
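+        # Each source entry in the MoBIE dataset metadata is a single-entry mapping from the
+        # source type ("image", "segmentation", "spots") to the source metadata.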
+        source_type, source_data = next(iter(source_data.items()))
+        is_segmentation = source_type == "segmentation"
+        is_spots = source_type == "spots"
+        data_path = source_data["imageData"]["ome.zarr"]["relativePath"]
+        source_not_on_remote = (source_name not in remote_dataset_metadata["sources"])
+        # Only update the image data if the source is not yet on the remote, or if we force updates for segmentations.
+        if source_not_on_remote or (is_segmentation and force_segmentation_update):
+            _sync_rclone(os.path.join(local_ds_root, data_path), os.path.join(remote_ds_root, data_path))
+        # We always sync the tables.
+        if is_segmentation or is_spots:
+            table_path = source_data["tableData"]["tsv"]["relativePath"]
+            _sync_rclone(os.path.join(local_ds_root, table_path), os.path.join(remote_ds_root, table_path))
+
+    # Sync the dataset metadata.
+    _sync_rclone(
+        os.path.join(mobie_root, dataset_name, "dataset.json"), os.path.join(remote_ds_root, "dataset.json")
+    )