diff --git a/scripts/simulate/run_download_input.py b/scripts/simulate/run_download_input.py
index 3e5aca560..8f6277a38 100644
--- a/scripts/simulate/run_download_input.py
+++ b/scripts/simulate/run_download_input.py
@@ -2,14 +2,8 @@
 
 if __name__ == '__main__':
-    context_name = 'BAG_D244_F3_C1416_small'  # The context you want to download
-    dry_run = True  # Set False to download files
+    context_name = 'D244_F3_C1416_3'  # The context you want to download
     include_zs = True  # Set False to only download files necessary for SingleBest (skip predict proba files)
 
-    if dry_run:
-        print(f'NOTE: Files will not be downloaded as `dry_run=True`.\n'
-              f'This will log what files will be downloaded instead.\n'
-              f'Set `dry_run=False` to download the files.')
-
     context = get_context(context_name)
-    context.download(include_zs=include_zs, dry_run=dry_run)
+    context.download(include_zs=include_zs, use_s3=False)
 
diff --git a/setup.py b/setup.py
index 46efdea6b..27916480a 100644
--- a/setup.py
+++ b/setup.py
@@ -4,6 +4,7 @@
     'autogluon.core[all]',
     'pytest',
     'typing-extensions>=4.11,<5',  # used for `Self` type hint
+    'huggingface-hub',
 ]
 
 setup(
diff --git a/tabrepo/contexts/context.py b/tabrepo/contexts/context.py
index 56863ecb8..d70412124 100644
--- a/tabrepo/contexts/context.py
+++ b/tabrepo/contexts/context.py
@@ -22,9 +22,80 @@
 from ..predictions.tabular_predictions import TabularModelPredictions
 from ..repository.evaluation_repository import EvaluationRepository
 from ..utils import catchtime
+from ..utils.huggingfacehub_utils import download_from_huggingface
 from ..utils.download import download_files
 
 
+def download_from_s3(name: str, include_zs: bool, exists: str, dry_run: bool, s3_download_map, benchmark_paths, verbose: bool):
+    print(f'Downloading files for {name} context... '
+          f'(include_zs={include_zs}, exists="{exists}", dry_run={dry_run})')
+    if dry_run:
+        print(f'\tNOTE: `dry_run=True`! Files will not be downloaded.')
+    assert exists in ["raise", "ignore", "overwrite"]
+    assert s3_download_map is not None, \
+        f'self.s3_download_map is None: download functionality is disabled'
+    file_paths_expected = benchmark_paths.get_file_paths(include_zs=include_zs)
+
+    file_paths_to_download = [f for f in file_paths_expected if f in s3_download_map]
+    if len(file_paths_to_download) == 0:
+        print(f'WARNING: Matching file paths to download is 0! '
+              f'`self.s3_download_map` probably has incorrect keys.')
+    file_paths_already_exist = [f for f in file_paths_to_download if benchmark_paths.exists(f)]
+    file_paths_missing = [f for f in file_paths_to_download if not benchmark_paths.exists(f)]
+
+    if exists == 'raise':
+        if file_paths_already_exist:
+            raise AssertionError(f'`exists="{exists}"`, '
+                                 f'and found {len(file_paths_already_exist)} files that already exist locally!\n'
+                                 f'\tExisting Files: {file_paths_already_exist}\n'
+                                 f'\tMissing Files: {file_paths_missing}\n'
+                                 f'Either manually inspect and delete existing files, '
+                                 f'set `exists="ignore"` to keep your local files and only download missing files, '
+                                 f'or set `exists="overwrite"` to overwrite your existing local files.')
+    elif exists == 'ignore':
+        file_paths_to_download = file_paths_missing
+    elif exists == 'overwrite':
+        file_paths_to_download = file_paths_to_download
+    else:
+        raise ValueError(f'Invalid value for exists (`exists="{exists}"`). '
+                         f'Valid values: {["raise", "ignore", "overwrite"]}')
+
+    s3_to_local_tuple_list = [(val, key) for key, val in s3_download_map.items()
+                              if key in file_paths_to_download]
+
+    log_extra = ''
+
+    num_exist = len(file_paths_already_exist)
+    if exists == 'overwrite':
+        if num_exist > 0:
+            log_extra += f'\tWill overwrite {num_exist} files that exist locally:\n' \
+                         f'\t\t{file_paths_already_exist}'
+    else:
+        log_extra = f''
+    if exists == 'ignore':
+        log_extra += f'\tWill skip {num_exist} files that exist locally:\n' \
+                     f'\t\t{file_paths_already_exist}'
+    if file_paths_missing:
+        if log_extra:
+            log_extra += '\n'
+        log_extra += f'Will download {len(file_paths_missing)} files that are missing locally:\n' \
+                     f'\t\t{file_paths_missing}'
+
+    if log_extra:
+        print(log_extra)
+    print(f'\tDownloading {len(s3_to_local_tuple_list)} files from s3 to local...')
+    for s3_path, local_path in s3_to_local_tuple_list:
+        print(f'\t\t"{s3_path}" -> "{local_path}"')
+    s3_required_list = [(s3_path, local_path) for s3_path, local_path in s3_to_local_tuple_list if
+                        s3_path[:2] == "s3"]
+    urllib_required_list = [(s3_path, local_path) for s3_path, local_path in s3_to_local_tuple_list if
+                            s3_path[:2] != "s3"]
+    if urllib_required_list:
+        download_files(remote_to_local_tuple_list=urllib_required_list, dry_run=dry_run, verbose=verbose)
+    if s3_required_list:
+        download_s3_files(s3_to_local_tuple_list=s3_required_list, dry_run=dry_run, verbose=verbose)
+
+
 @dataclass
 class BenchmarkPaths:
     configs: str
@@ -260,7 +331,9 @@ def download(self,
     def download(self,
                  include_zs: bool = True,
                  exists: str = 'raise',
                  verbose: bool = True,
-                 dry_run: bool = False):
+                 dry_run: bool = False,
+                 use_s3: bool = True,
+                 ):
         """
         Downloads all BenchmarkContext required files from s3 to local disk.
@@ -275,78 +348,27 @@ def download(self,
         Guarantees alignment between local and remote files (at the time of download)
         :param dry_run: If True, will not download files, but instead log what would have been downloaded.
         """
-        print(f'Downloading files for {self.name} context... '
-              f'(include_zs={include_zs}, exists="{exists}", dry_run={dry_run})')
-        if dry_run:
-            print(f'\tNOTE: `dry_run=True`! Files will not be downloaded.')
-        assert exists in ["raise", "ignore", "overwrite"]
-        assert self.s3_download_map is not None, \
-            f'self.s3_download_map is None: download functionality is disabled'
-        file_paths_expected = self.benchmark_paths.get_file_paths(include_zs=include_zs)
-
-        file_paths_to_download = [f for f in file_paths_expected if f in self.s3_download_map]
-        if len(file_paths_to_download) == 0:
-            print(f'WARNING: Matching file paths to download is 0! '
-                  f'`self.s3_download_map` probably has incorrect keys.')
-        file_paths_already_exist = [f for f in file_paths_to_download if self.benchmark_paths.exists(f)]
-        file_paths_missing = [f for f in file_paths_to_download if not self.benchmark_paths.exists(f)]
-
-        if exists == 'raise':
-            if file_paths_already_exist:
-                raise AssertionError(f'`exists="{exists}"`, '
-                                     f'and found {len(file_paths_already_exist)} files that already exist locally!\n'
-                                     f'\tExisting Files: {file_paths_already_exist}\n'
-                                     f'\tMissing Files: {file_paths_missing}\n'
-                                     f'Either manually inspect and delete existing files, '
-                                     f'set `exists="ignore"` to keep your local files and only download missing files, '
-                                     f'or set `exists="overwrite"` to overwrite your existing local files.')
-        elif exists == 'ignore':
-            file_paths_to_download = file_paths_missing
-        elif exists == 'overwrite':
-            file_paths_to_download = file_paths_to_download
+        if use_s3:
+            download_from_s3(
+                name=self.name, include_zs=include_zs, exists=exists, dry_run=dry_run,
+                s3_download_map=self.s3_download_map, benchmark_paths=self.benchmark_paths, verbose=verbose
+            )
         else:
-            raise ValueError(f'Invalid value for exists (`exists="{exists}"`). '
-                             f'Valid values: {["raise", "ignore", "overwrite"]}')
-
-        s3_to_local_tuple_list = [(val, key) for key, val in self.s3_download_map.items()
-                                  if key in file_paths_to_download]
-
-        log_extra = ''
-
-        num_exist = len(file_paths_already_exist)
-        if exists == 'overwrite':
-            if num_exist > 0:
-                log_extra += f'\tWill overwrite {num_exist} files that exist locally:\n' \
-                             f'\t\t{file_paths_already_exist}'
-        else:
-            log_extra = f''
-        if exists == 'ignore':
-            log_extra += f'\tWill skip {num_exist} files that exist locally:\n' \
-                         f'\t\t{file_paths_already_exist}'
-        if file_paths_missing:
-            if log_extra:
-                log_extra += '\n'
-            log_extra += f'Will download {len(file_paths_missing)} files that are missing locally:\n' \
-                         f'\t\t{file_paths_missing}'
-
-        if log_extra:
-            print(log_extra)
-        print(f'\tDownloading {len(s3_to_local_tuple_list)} files from s3 to local...')
-        for s3_path, local_path in s3_to_local_tuple_list:
-            print(f'\t\t"{s3_path}" -> "{local_path}"')
-        s3_required_list = [(s3_path, local_path) for s3_path, local_path in s3_to_local_tuple_list if s3_path[:2] == "s3"]
-        urllib_required_list = [(s3_path, local_path) for s3_path, local_path in s3_to_local_tuple_list if s3_path[:2] != "s3"]
-        if urllib_required_list:
-            download_files(remote_to_local_tuple_list=urllib_required_list, dry_run=dry_run, verbose=verbose)
-        if s3_required_list:
-            download_s3_files(s3_to_local_tuple_list=s3_required_list, dry_run=dry_run, verbose=verbose)
+            if verbose:
+                print(f'Downloading files for {self.name} context... '
+                      f'(include_zs={include_zs}, exists="{exists}")')
+            # NOTE(review): this always downloads the default HuggingFace version and
+            # ignores `exists`/`dry_run`; confirm whether a version should be forwarded.
+            download_from_huggingface(
+                datasets=self.benchmark_paths.datasets,
+            )
 
     def load(self,
              folds: List[int] = None,
              load_predictions: bool = True,
              download_files: bool = True,
             prediction_format: str = "memmap",
-             exists: str = 'ignore') -> Tuple[ZeroshotSimulatorContext, TabularModelPredictions, GroundTruth]:
+             exists: str = 'ignore',
+             use_s3: bool = True,
+             ) -> Tuple[ZeroshotSimulatorContext, TabularModelPredictions, GroundTruth]:
         """
         :param folds: If None, uses self.folds as default. If specified, must be a subset of `self.folds`.
             This will filter the results to only the specified folds.
@@ -397,7 +419,7 @@ def load(self,
                 missing_files_str = [f'\n\t"{m}"' for m in missing_files]
                 raise FileNotFoundError(f'Missing {len(missing_files)} required files: \n[{",".join(missing_files_str)}\n]')
             print(f'Downloading input files from s3...')
-            self.download(include_zs=load_predictions, exists=exists)
+            self.download(include_zs=load_predictions, exists=exists, use_s3=use_s3)
         self.benchmark_paths.assert_exists_all(check_zs=load_predictions)
 
         configs_hyperparameters = self.load_configs_hyperparameters()
@@ -419,6 +441,7 @@ def load_repo(
             download_files: bool = True,
             prediction_format: str = "memmap",
             exists: str = 'ignore',
+            use_s3: bool = True,
     ) -> EvaluationRepository:
         zsc, zeroshot_pred_proba, zeroshot_gt = self.load(
             folds=folds,
@@ -426,7 +449,8 @@ def load_repo(
             load_predictions=load_predictions,
             download_files=download_files,
             prediction_format=prediction_format,
            exists=exists,
+            use_s3=use_s3,
         )
         repo = EvaluationRepository(
             zeroshot_context=zsc,
diff --git a/tabrepo/repository/evaluation_repository.py b/tabrepo/repository/evaluation_repository.py
index 2e8d68156..6ca8aed99 100644
--- a/tabrepo/repository/evaluation_repository.py
+++ b/tabrepo/repository/evaluation_repository.py
@@ -327,7 +327,7 @@ def _convert_sim_artifacts(cls,
     return simulation_artifacts_full
 
 
-def load_repository(version: str, *, load_predictions: bool = True, cache: bool | str = False, prediction_format: str = "memmap") -> EvaluationRepository:
+def load_repository(version: str, *, load_predictions: bool = True, cache: bool | str = False, prediction_format: str = "memmap", use_s3: bool = True) -> EvaluationRepository:
     """
     Load the specified EvaluationRepository. Will automatically download all required inputs if they do not already exist on local disk.
 
@@ -347,7 +348,9 @@ def load_repository(version: str, *, load_predictions: bool = True, cache: bool
         Options: ["memmap", "memopt", "mem"]
         Determines the way the predictions are represented in the repo.
         It is recommended to keep the value as "memmap" for optimal performance.
+    use_s3: bool, default = True
+        Whether to use S3 to download tabrepo files, if False uses HuggingFace instead.
 
     Returns
     -------
     EvaluationRepository object for the given context.
@@ -358,7 +361,7 @@ def load_repository(version: str, *, load_predictions: bool = True, cache: bool
         if isinstance(cache, str) and cache == "overwrite":
             kwargs["ignore_cache"] = True
             kwargs["exists"] = "overwrite"
-        repo = get_subcontext(version).load(load_predictions=load_predictions, prediction_format=prediction_format, **kwargs)
+        repo = get_subcontext(version).load(load_predictions=load_predictions, prediction_format=prediction_format, use_s3=use_s3, **kwargs)
     else:
-        repo = get_subcontext(version).load_from_parent(load_predictions=load_predictions, prediction_format=prediction_format)
+        repo = get_subcontext(version).load_from_parent(load_predictions=load_predictions, prediction_format=prediction_format, use_s3=use_s3)
     return repo
diff --git a/tabrepo/utils/huggingfacehub_utils.py b/tabrepo/utils/huggingfacehub_utils.py
new file mode 100644
index 000000000..ce187eb8f
--- /dev/null
+++ b/tabrepo/utils/huggingfacehub_utils.py
@@ -0,0 +1,141 @@
+import os
+from pathlib import Path
+from huggingface_hub import HfApi
+from tqdm import tqdm
+
+from tabrepo.utils.result_utils import results_path
+
+
+def upload_hugging_face(
+        version: str,
+        repo_id: str,
+        local_dir: Path | None = None,
+        override_existing_files: bool = True,
+        continue_in_case_of_error: bool = True
+):
+    """
+    Uploads tabrepo data to Hugging Face repository.
+    You should set your env variable HF_TOKEN and ask write access to tabrepo before using the script.
+
+    Args:
+        version (str): The version of the data to be uploaded, the folder data/results/{version}/ should
+         be present and should contain baselines.parquet, configs.parquet and model_predictions/ folder
+        repo_id (str): The ID of the Hugging Face repository.
+        local_dir (Path): path to load datasets, use tabrepo default if not specified
+        override_existing_files (bool): Whether to re-upload files if they are already found in HuggingFace.
+        continue_in_case_of_error (bool): Whether to keep uploading remaining folders if one upload fails.
+    Returns:
+        None
+    """
+    commit_message = "Upload tabrepo new version"
+    if local_dir is None:
+        local_dir = str(results_path())
+    else:
+        local_dir = str(local_dir)
+    # Root of the local artifacts for this version (data/results/{version}/).
+    root = Path(local_dir) / version
+
+    for filename in ["baselines.parquet", "configs.parquet", "model_predictions"]:
+        assert (root / filename).exists(), f"Expected to find {filename} but could not be found in {root / filename}."
+
+    api = HfApi()
+    for filename in ["baselines.parquet", "configs.parquet"]:
+        path_in_repo = str(Path(version) / filename)
+        if api.file_exists(repo_id=repo_id, filename=path_in_repo, token=os.getenv("HF_TOKEN"), repo_type="dataset") and not override_existing_files:
+            print(f"Skipping {path_in_repo} which already exists in the repo.")
+            continue
+
+        api.upload_file(
+            path_or_fileobj=root / filename,
+            path_in_repo=path_in_repo,
+            repo_id=repo_id,
+            repo_type="dataset",
+            commit_message=commit_message,
+            token=os.getenv("HF_TOKEN"),
+        )
+    files = list(sorted(Path(root / "model_predictions").glob("*")))
+    for dataset_path in tqdm(files):
+        print(dataset_path)
+        try:
+            path_in_repo = str(Path(version) / "model_predictions" / dataset_path.name)
+            # ideally, we would just check if the folder exists but it is not possible AFAIK, we could alternatively
+            # upload per file but it would create a lot of different commits.
+            if api.file_exists(repo_id=repo_id, filename=str(Path(path_in_repo) / "0" / "metadata.json"), token=os.getenv("HF_TOKEN"),
+                               repo_type="dataset") and not override_existing_files:
+                print(f"Skipping {path_in_repo} which already exists in the repo.")
+                continue
+            api.upload_folder(
+                folder_path=dataset_path,
+                path_in_repo=path_in_repo,
+                repo_id=repo_id,
+                repo_type="dataset",
+                ignore_patterns="*DS_Store",
+                commit_message=f"Upload tabrepo new version {dataset_path.name}",
+                token=os.getenv("HF_TOKEN"),
+            )
+        except Exception as e:
+            if continue_in_case_of_error:
+                print(str(e))
+            else:
+                raise e
+
+
+def download_from_huggingface(
+        version: str = "2023_11_14",
+        force_download: bool = False,
+        local_files_only: bool = False,
+        datasets: list[str] | None = None,
+        local_dir: str | Path | None = None,
+):
+    """
+    :param version: name of a tabrepo version such as `2023_11_14`
+    :param local_files_only: whether to use local files with no internet check on the Hub
+    :param force_download: forces files to be downloaded
+    :param datasets: list of datasets to download, if not specified all datasets will be downloaded
+    :param local_dir: where to download local files, if not specified all files will be downloaded under
+    {tabrepo_root}/data/results
+    :return:
+    """
+    # https://huggingface.co/datasets/Tabrepo/tabrepo/tree/main/2023_11_14/model_predictions
+    api = HfApi()
+    if local_dir is None:
+        local_dir = str(results_path())
+    else:
+        local_dir = str(local_dir)
+    print(f"Going to download tabrepo files to {local_dir}.")
+    if datasets is None:
+        # `None` makes snapshot_download fetch the full snapshot, no filtering needed.
+        allow_patterns = None
+    else:
+        allow_patterns = [f"*{version}*{d}*" for d in datasets]
+        # Metadata files are always required when filtering to specific datasets.
+        allow_patterns += [
+            "*baselines.parquet",
+            "*configs.parquet",
+            "*task_metadata.csv",
+        ]
+
+    print(f"Allowed patterns: {allow_patterns}")
+    api.snapshot_download(
+        repo_id="Tabrepo/tabrepo",
+        repo_type="dataset",
+        allow_patterns=allow_patterns,
+        local_dir=local_dir,
+        force_download=force_download,
+        local_files_only=local_files_only,
+    )
+
+
+if __name__ == '__main__':
+    # upload_hugging_face(
+    #     version="2023_11_14",
+    #     repo_id="tabrepo/tabrepo",
+    #     override_existing_files=False,
+    # )
+    datasets = [
+        'Australian',
+    ]
+    download_from_huggingface(
+        datasets=datasets,
+        version="2023_11_14",
+    )
diff --git a/tabrepo/utils/result_utils.py b/tabrepo/utils/result_utils.py
index d2db9578f..4b4c03138 100644
--- a/tabrepo/utils/result_utils.py
+++ b/tabrepo/utils/result_utils.py
@@ -1,6 +1,15 @@
+from pathlib import Path
+
+import tabrepo
 from autogluon.common.loaders import load_pd
 from autogluon.common.savers import save_pd
 
 
+def results_path():
+    res = Path(tabrepo.__path__[0]).parent / "data/results/"
+    res.mkdir(parents=True, exist_ok=True)
+    return res
+
+
 def shrink_result_file_size(path_load, path_save):
     result_df = load_pd.load(path_load)