1 | 1 | """This file contains utility functions for processing data located on an S3 storage. |
2 | 2 | The upload of data to the storage system should be performed with 'rclone'. |
3 | 3 | """ |
+import json
import os
+import warnings
+from shutil import which
+from subprocess import run
from typing import Optional, Tuple

import s3fs
@@ -186,3 +190,96 @@ def create_s3_target( |
    else:
        s3_filesystem = s3fs.S3FileSystem(anon=anon, client_kwargs=client_kwargs, asynchronous=asynchronous)
    return s3_filesystem
+
+
+def _sync_rclone(local_dir, target):
+    # The rclone alias could also be exposed as a parameter.
+    rclone_alias = "cochlea-lightsheet"
+    print("Sync", local_dir, "to", target)
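+    # For reference, the call below corresponds to this CLI invocation (with placeholder paths):
+    #   rclone --progress copyto <local_dir> cochlea-lightsheet:<target>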
+    run(["rclone", "--progress", "copyto", local_dir, f"{rclone_alias}:{target}"], check=True)
+
+
+def sync_dataset(
+    mobie_root: str,
+    dataset_name: str,
+    bucket_name: Optional[str] = None,
+    url: Optional[str] = None,
+    anon: bool = False,
+    credential_file: Optional[str] = None,
+    force_segmentation_update: bool = False,
+) -> None:
| 211 | + """Sync a MoBIE dataset on the s3 bucket using rclone. |
| 212 | +
|
| 213 | + Args: |
| 214 | + mobie_root: The directory with the local mobie project. |
| 215 | + dataset_name: The mobie dataset to sync. |
| 216 | + bucket_name: The name of the dataset's bucket on s3. |
| 217 | + url: Service endpoint for S3 bucket |
| 218 | + anon: Option for anon argument of S3FileSystem |
| 219 | + credential_file: File path to credentials |
| 220 | + force_segmentation_update: Whether to force segmentation updates. |
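+
+    Example (with a hypothetical project path and dataset name):
+        sync_dataset("/path/to/mobie-project", "my-dataset")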
| 221 | + """ |
| 222 | + from mobie.metadata import add_remote_project_metadata |
| 223 | + |
+    # Make sure that rclone is available.
+    if which("rclone") is None:
+        raise RuntimeError("rclone is required for synchronization. Try loading it via 'module load rclone'.")
+
+    # Make sure the dataset is part of the local project.
+    with open(os.path.join(mobie_root, "project.json")) as f:
+        project_metadata = json.load(f)
+    datasets = project_metadata["datasets"]
+    assert dataset_name in datasets, f"Dataset {dataset_name} is not part of the local project."
+
+    # Get the S3 filesystem and the bucket name.
+    s3 = create_s3_target(url, anon, credential_file)
+    if bucket_name is None:
+        bucket_name = BUCKET_NAME
+    if url is None:
+        url = SERVICE_ENDPOINT
+
+    # Add the required remote metadata to the project. Suppress warnings about missing local data.
+    with warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=UserWarning)
+        add_remote_project_metadata(mobie_root, bucket_name, url)
+
+    # Get the project metadata from the S3 bucket.
+    project_metadata_path = os.path.join(bucket_name, "project.json")
+    with s3.open(project_metadata_path, "r") as f:
+        project_metadata = json.load(f)
+
+    # Check if the dataset is already part of the remote project.
+    local_ds_root = os.path.join(mobie_root, dataset_name)
+    remote_ds_root = os.path.join(bucket_name, dataset_name)
+    if dataset_name not in project_metadata["datasets"]:
+        print("The dataset is not yet synced. Will copy it over.")
+        _sync_rclone(os.path.join(mobie_root, "project.json"), project_metadata_path)
+        _sync_rclone(local_ds_root, remote_ds_root)
+        return
+
+    # Otherwise, check which sources are new and add them.
+    with open(os.path.join(mobie_root, dataset_name, "dataset.json")) as f:
+        local_dataset_metadata = json.load(f)
+
+    dataset_metadata_path = os.path.join(bucket_name, dataset_name, "dataset.json")
+    with s3.open(dataset_metadata_path, "r") as f:
+        remote_dataset_metadata = json.load(f)
+
+    for source_name, source_data in local_dataset_metadata["sources"].items():
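+        # Each source entry maps a single source type key (e.g. "image", "segmentation" or "spots")
+        # onto the source metadata, so we unpack the single item here.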
+        source_type, source_data = next(iter(source_data.items()))
+        is_segmentation = source_type == "segmentation"
+        is_spots = source_type == "spots"
+        data_path = source_data["imageData"]["ome.zarr"]["relativePath"]
+        source_not_on_remote = source_name not in remote_dataset_metadata["sources"]
+        # Only update the image data if the source is not yet on the remote, or if updates are forced for segmentations.
+        if source_not_on_remote or (is_segmentation and force_segmentation_update):
+            _sync_rclone(os.path.join(local_ds_root, data_path), os.path.join(remote_ds_root, data_path))
+        # We always sync the tables.
+        if is_segmentation or is_spots:
+            table_path = source_data["tableData"]["tsv"]["relativePath"]
+            _sync_rclone(os.path.join(local_ds_root, table_path), os.path.join(remote_ds_root, table_path))
+
+    # Sync the dataset metadata.
+    _sync_rclone(
+        os.path.join(mobie_root, dataset_name, "dataset.json"), os.path.join(remote_ds_root, "dataset.json")
+    )