Skip to content

Commit 85ab141

Browse files
authored (author name not captured in extraction)
fix: parallelize dataset add for performance (#3338)
1 parent fe91a9c commit 85ab141

File tree

4 files changed

+51
-17
lines changed

4 files changed

+51
-17
lines changed

renku/core/dataset/dataset.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -802,9 +802,12 @@ def add_datadir_files_to_dataset(dataset: Dataset) -> None:
802802
# NOTE: Add existing files to dataset
803803
dataset_files: List[DatasetFile] = []
804804
files: List[Path] = []
805-
for file in get_files(datadir):
805+
existing_files: List[Union[Path, str]] = list(get_files(datadir))
806+
checksums = project_context.repository.get_object_hashes(existing_files)
807+
808+
for file in cast(List[Path], existing_files):
806809
files.append(file)
807-
dataset_files.append(DatasetFile.from_path(path=file, source=file))
810+
dataset_files.append(DatasetFile.from_path(path=file, source=file, checksum=checksums.get(file)))
808811

809812
if not dataset_files:
810813
return
@@ -907,11 +910,14 @@ def move_files(dataset_gateway: IDatasetGateway, files: Dict[Path, Path], to_dat
907910
progress_name = "Updating dataset metadata"
908911
communication.start_progress(progress_name, total=len(files))
909912
try:
913+
checksums = project_context.repository.get_object_hashes(
914+
[file.relative_to(project_context.path) for file in files.values()]
915+
)
910916
for src, dst in files.items():
911917
src = src.relative_to(project_context.path)
912918
dst = dst.relative_to(project_context.path)
913919
# NOTE: Files are moved at this point, so, we can use dst
914-
new_dataset_file = DatasetFile.from_path(dst)
920+
new_dataset_file = DatasetFile.from_path(dst, checksum=checksums.get(dst))
915921

916922
for dataset in datasets:
917923
removed = dataset.unlink_file(src, missing_ok=True)
@@ -1007,9 +1013,11 @@ def update_dataset_local_files(
10071013

10081014
def _update_datasets_files_metadata(updated_files: List[DynamicProxy], deleted_files: List[DynamicProxy], delete: bool):
10091015
modified_datasets = {}
1010-
1016+
checksums = project_context.repository.get_object_hashes([file.entity.path for file in updated_files])
10111017
for file in updated_files:
1012-
new_file = DatasetFile.from_path(path=file.entity.path, based_on=file.based_on, source=file.source)
1018+
new_file = DatasetFile.from_path(
1019+
path=file.entity.path, based_on=file.based_on, source=file.source, checksum=checksums.get(file.entity.path)
1020+
)
10131021
modified_datasets[file.dataset.name] = file.dataset
10141022
file.dataset.add_or_update_files(new_file)
10151023

renku/core/dataset/dataset_add.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,9 @@ def get_upload_uri(dataset: Dataset, entity_path: Union[Path, str]) -> str:
299299
def move_files_to_dataset(dataset: Dataset, files: List[DatasetAddMetadata]):
300300
"""Copy/Move files into a dataset's directory."""
301301

302-
def move_file(file: DatasetAddMetadata, storage: Optional[IStorage]):
302+
def move_file(file: DatasetAddMetadata, storage: Optional[IStorage]) -> bool:
303303
if not file.has_action:
304-
return
304+
return False
305305

306306
if file.action in (
307307
DatasetAddAction.COPY,
@@ -350,9 +350,6 @@ def move_file(file: DatasetAddMetadata, storage: Optional[IStorage]):
350350
else:
351351
raise
352352

353-
if track_in_lfs and not dataset.storage:
354-
track_paths_in_storage(file.destination)
355-
356353
# NOTE: We always copy the files to the dataset's data dir. If dataset has a storage backend, we also upload the
357354
# file to the remote storage.
358355
if storage:
@@ -367,14 +364,22 @@ def move_file(file: DatasetAddMetadata, storage: Optional[IStorage]):
367364

368365
file.based_on = RemoteEntity(url=file_uri, path=file.entity_path, checksum=md5_hash)
369366

367+
return track_in_lfs
368+
370369
dataset_storage = None
371370
if dataset.storage:
372371
provider = ProviderFactory.get_storage_provider(uri=dataset.storage)
373372
dataset_storage = provider.get_storage()
374373

374+
lfs_files = []
375+
375376
for dataset_file in files:
376377
# TODO: Parallelize copy/download/upload
377-
move_file(file=dataset_file, storage=dataset_storage)
378+
if move_file(file=dataset_file, storage=dataset_storage):
379+
lfs_files.append(dataset_file.destination)
380+
381+
if lfs_files and not dataset.storage:
382+
track_paths_in_storage(*lfs_files)
378383

379384

380385
def add_files_to_repository(dataset: Dataset, files: List[DatasetAddMetadata]):
@@ -401,8 +406,16 @@ def add_files_to_repository(dataset: Dataset, files: List[DatasetAddMetadata]):
401406
def update_dataset_metadata(dataset: Dataset, files: List[DatasetAddMetadata], clear_files_before: bool):
402407
"""Add newly-added files to the dataset's metadata."""
403408
dataset_files = []
409+
repo_paths: List[Union[Path, str]] = [
410+
file.entity_path for file in files if (project_context.path / file.entity_path).exists()
411+
]
412+
413+
checksums = project_context.repository.get_object_hashes(repo_paths)
414+
404415
for file in files:
405-
dataset_file = DatasetFile.from_path(path=file.entity_path, source=file.url, based_on=file.based_on)
416+
dataset_file = DatasetFile.from_path(
417+
path=file.entity_path, source=file.url, based_on=file.based_on, checksum=checksums.get(file.entity_path)
418+
)
406419
dataset_files.append(dataset_file)
407420

408421
if clear_files_before:

renku/core/util/git.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,11 @@ def is_path_safe(path: Union[Path, str]) -> bool:
365365

366366

367367
def get_entity_from_revision(
368-
repository: "Repository", path: Union[Path, str], revision: Optional[str] = None, bypass_cache: bool = False
368+
repository: "Repository",
369+
path: Union[Path, str],
370+
revision: Optional[str] = None,
371+
bypass_cache: bool = False,
372+
checksum: Optional[str] = None,
369373
) -> "Entity":
370374
"""Return an Entity instance from given path and revision.
371375
@@ -374,7 +378,7 @@ def get_entity_from_revision(
374378
path(Union[Path, str]): The path of the entity.
375379
revision(str, optional): The revision to check at (Default value = None).
376380
bypass_cache(bool): Whether to ignore cached entries and get information from disk (Default value = False).
377-
381+
checksum(str, optional): Pre-calculated checksum for performance reasons, will be calculated if not set.
378382
Returns:
379383
Entity: The Entity for the given path and revision.
380384
@@ -407,7 +411,8 @@ def get_directory_members(absolute_path: Path) -> List[Entity]:
407411
return cached_entry
408412

409413
# NOTE: For untracked directory the hash is None; make sure to stage them first before calling this function.
410-
checksum = repository.get_object_hash(revision=revision, path=path)
414+
if not checksum:
415+
checksum = repository.get_object_hash(revision=revision, path=path)
411416
# NOTE: If object was not found at a revision it's either removed or exists in a different revision; keep the
412417
# entity and use revision as checksum
413418
if isinstance(revision, str) and revision == "HEAD":

renku/domain_model/dataset.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,13 @@ def __init__(
259259
self.source: Optional[str] = str(source)
260260

261261
@classmethod
262-
def from_path(cls, path: Union[str, Path], source=None, based_on: Optional[RemoteEntity] = None) -> "DatasetFile":
262+
def from_path(
263+
cls,
264+
path: Union[str, Path],
265+
source=None,
266+
based_on: Optional[RemoteEntity] = None,
267+
checksum: Optional[str] = None,
268+
) -> "DatasetFile":
263269
"""Return an instance from a path."""
264270
from renku.domain_model.entity import NON_EXISTING_ENTITY_CHECKSUM, Entity
265271

@@ -269,7 +275,9 @@ def from_path(cls, path: Union[str, Path], source=None, based_on: Optional[Remot
269275
id = Entity.generate_id(checksum=checksum, path=path)
270276
entity = Entity(id=id, checksum=checksum, path=path)
271277
else:
272-
entity = get_entity_from_revision(repository=project_context.repository, path=path, bypass_cache=True)
278+
entity = get_entity_from_revision(
279+
repository=project_context.repository, path=path, bypass_cache=True, checksum=checksum
280+
)
273281

274282
is_external = is_external_file(path=path, project_path=project_context.path)
275283
return cls(entity=entity, is_external=is_external, source=source, based_on=based_on)

0 commit comments

Comments (0)