Commit 38f9bf0

Parquet Support (#1029)
* Store the minio_url from description xml
* Add minio dependency
* Add call for downloading file from minio bucket
* Allow objects to be located in directories
* Add parquet equivalent of _get_dataset_arff
* Store parquet alongside arff, if available
* Deal with unknown buckets, fix path expectation
* Update test to reflect parquet file is downloaded
* Download parquet file through lazy loading, i.e. if the dataset was initially retrieved with download_data=False, make sure to download the dataset on the first get_data call
* Load data from parquet if available
* Update (doc) strings
* Cast to signify url is str
* Make cache file path generation extension agnostic. Fixes a bug where the parquet files would simply be overwritten. Also, only save the local file paths to members if the files actually exist.
* Remove return argument
* Add clear test messages, update minio urls
* Debugging on CI with print
* Add pyarrow dependency for loading parquet
* Remove print
1 parent 4ff66ed commit 38f9bf0
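In user-facing terms: when a dataset's description XML contains an oml:minio_url, the parquet copy of the data is downloaded alongside the ARFF file and is preferred when loading. A minimal sketch of the resulting workflow, assuming a dataset that exposes such a URL (dataset id 61 is used purely as an illustration):

import openml

# Downloads the description, the ARFF file and, if a MinIO URL is advertised, the parquet file.
dataset = openml.datasets.get_dataset(61)
print(dataset.data_file)     # path to the cached ARFF file
print(dataset.parquet_file)  # path to the cached parquet file, or None if unavailable

# get_data parses whichever cached file is preferred (parquet when present) into a DataFrame.
X, y, categorical, attribute_names = dataset.get_data(dataset_format="dataframe")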

File tree

6 files changed: +247 -18 lines changed

openml/_api_calls.py

Lines changed: 44 additions & 1 deletion
@@ -3,10 +3,14 @@
 import time
 import hashlib
 import logging
+import pathlib
 import requests
+import urllib.parse
 import xml
 import xmltodict
-from typing import Dict, Optional
+from typing import Dict, Optional, Union
+
+import minio
 
 from . import config
 from .exceptions import (
@@ -68,6 +72,45 @@ def _perform_api_call(call, request_method, data=None, file_elements=None):
     return response.text
 
 
+def _download_minio_file(
+    source: str, destination: Union[str, pathlib.Path], exists_ok: bool = True,
+) -> None:
+    """ Download file ``source`` from a MinIO Bucket and store it at ``destination``.
+
+    Parameters
+    ----------
+    source : Union[str, pathlib.Path]
+        URL to a file in a MinIO bucket.
+    destination : str
+        Path to store the file to, if a directory is provided the original filename is used.
+    exists_ok : bool, optional (default=True)
+        If False, raise FileExists if a file already exists in ``destination``.
+
+    """
+    destination = pathlib.Path(destination)
+    parsed_url = urllib.parse.urlparse(source)
+
+    # expect path format: /BUCKET/path/to/file.ext
+    bucket, object_name = parsed_url.path[1:].split("/", maxsplit=1)
+    if destination.is_dir():
+        destination = pathlib.Path(destination, object_name)
+    if destination.is_file() and not exists_ok:
+        raise FileExistsError(f"File already exists in {destination}.")
+
+    client = minio.Minio(endpoint=parsed_url.netloc, secure=False)
+
+    try:
+        client.fget_object(
+            bucket_name=bucket, object_name=object_name, file_path=str(destination),
+        )
+    except minio.error.S3Error as e:
+        if e.message.startswith("Object does not exist"):
+            raise FileNotFoundError(f"Object at '{source}' does not exist.") from e
+        # e.g. permission error, or a bucket does not exist (which is also interpreted as a
+        # permission error on minio level).
+        raise FileNotFoundError("Bucket does not exist or is private.") from e
+
+
 def _download_text_file(
     source: str,
     output_path: Optional[str] = None,
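A short usage sketch of the new helper. The MinIO URL below is hypothetical; real values come from the oml:minio_url field of a dataset description, and the destination may be either a file path or an existing directory:

import openml._api_calls

# Hypothetical MinIO URL; in practice it is read from the dataset description XML.
url = "http://minio.example.org/dataset20/dataset_20.pq"

openml._api_calls._download_minio_file(
    source=url,
    destination="/tmp/openml/dataset_20.pq",  # an existing directory would keep the object name
    exists_ok=True,                           # overwriting a previously downloaded file is allowed
)
# Raises FileNotFoundError if the object is missing or the bucket does not exist / is private.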

openml/datasets/dataset.py

Lines changed: 44 additions & 14 deletions
@@ -96,6 +96,10 @@ class OpenMLDataset(OpenMLBase):
         which maps a quality name to a quality value.
     dataset: string, optional
         Serialized arff dataset string.
+    minio_url: string, optional
+        URL to the MinIO bucket with dataset files
+    parquet_file: string, optional
+        Path to the local parquet file.
     """
 
     def __init__(
@@ -128,6 +132,8 @@ def __init__(
         features_file: Optional[str] = None,
         qualities_file: Optional[str] = None,
         dataset=None,
+        minio_url: Optional[str] = None,
+        parquet_file: Optional[str] = None,
     ):
         def find_invalid_characters(string, pattern):
             invalid_chars = set()
@@ -202,7 +208,9 @@ def find_invalid_characters(string, pattern):
         self.update_comment = update_comment
         self.md5_checksum = md5_checksum
         self.data_file = data_file
+        self.parquet_file = parquet_file
         self._dataset = dataset
+        self._minio_url = minio_url
 
         if features_file is not None:
             self.features = _read_features(
@@ -291,9 +299,11 @@ def __eq__(self, other):
     def _download_data(self) -> None:
         """ Download ARFF data file to standard cache directory. Set `self.data_file`. """
         # import required here to avoid circular import.
-        from .functions import _get_dataset_arff
+        from .functions import _get_dataset_arff, _get_dataset_parquet
 
         self.data_file = _get_dataset_arff(self)
+        if self._minio_url is not None:
+            self.parquet_file = _get_dataset_parquet(self)
 
     def _get_arff(self, format: str) -> Dict:
         """Read ARFF file and return decoded arff.
@@ -454,22 +464,38 @@ def _parse_data_from_arff(
         return X, categorical, attribute_names
 
     def _compressed_cache_file_paths(self, data_file: str) -> Tuple[str, str, str]:
-        data_pickle_file = data_file.replace(".arff", ".pkl.py3")
-        data_feather_file = data_file.replace(".arff", ".feather")
-        feather_attribute_file = data_file.replace(".arff", ".feather.attributes.pkl.py3")
+        ext = f".{data_file.split('.')[-1]}"
+        data_pickle_file = data_file.replace(ext, ".pkl.py3")
+        data_feather_file = data_file.replace(ext, ".feather")
+        feather_attribute_file = data_file.replace(ext, ".feather.attributes.pkl.py3")
         return data_pickle_file, data_feather_file, feather_attribute_file
 
-    def _cache_compressed_file_from_arff(
-        self, arff_file: str
+    def _cache_compressed_file_from_file(
+        self, data_file: str
     ) -> Tuple[Union[pd.DataFrame, scipy.sparse.csr_matrix], List[bool], List[str]]:
-        """ Store data from the arff file in compressed format. Sets cache_format to 'pickle' if data is sparse. """  # noqa: 501
+        """ Store data from the local file in compressed format.
+
+        If a local parquet file is present it will be used instead of the arff file.
+        Sets cache_format to 'pickle' if data is sparse.
+        """
         (
             data_pickle_file,
             data_feather_file,
             feather_attribute_file,
-        ) = self._compressed_cache_file_paths(arff_file)
+        ) = self._compressed_cache_file_paths(data_file)
 
-        data, categorical, attribute_names = self._parse_data_from_arff(arff_file)
+        if data_file.endswith(".arff"):
+            data, categorical, attribute_names = self._parse_data_from_arff(data_file)
+        elif data_file.endswith(".pq"):
+            try:
+                data = pd.read_parquet(data_file)
+            except Exception as e:
+                raise Exception(f"File: {data_file}") from e
+
+            categorical = [data[c].dtype.name == "category" for c in data.columns]
+            attribute_names = list(data.columns)
+        else:
+            raise ValueError(f"Unknown file type for file '{data_file}'.")
 
         # Feather format does not work for sparse datasets, so we use pickle for sparse datasets
         if scipy.sparse.issparse(data):
@@ -480,12 +506,16 @@ def _cache_compressed_file_from_arff(
             data.to_feather(data_feather_file)
             with open(feather_attribute_file, "wb") as fh:
                 pickle.dump((categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
+            self.data_feather_file = data_feather_file
+            self.feather_attribute_file = feather_attribute_file
         else:
             with open(data_pickle_file, "wb") as fh:
                 pickle.dump((data, categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
+            self.data_pickle_file = data_pickle_file
 
         data_file = data_pickle_file if self.cache_format == "pickle" else data_feather_file
         logger.debug(f"Saved dataset {int(self.dataset_id or -1)}: {self.name} to file {data_file}")
+
         return data, categorical, attribute_names
 
     def _load_data(self):
@@ -496,10 +526,9 @@ def _load_data(self):
         if need_to_create_pickle or need_to_create_feather:
             if self.data_file is None:
                 self._download_data()
-            res = self._compressed_cache_file_paths(self.data_file)
-            self.data_pickle_file, self.data_feather_file, self.feather_attribute_file = res
-            # Since our recently stored data is exists in memory, there is no need to load from disk
-            return self._cache_compressed_file_from_arff(self.data_file)
+
+            file_to_load = self.data_file if self.parquet_file is None else self.parquet_file
+            return self._cache_compressed_file_from_file(file_to_load)
 
         # helper variable to help identify where errors occur
         fpath = self.data_feather_file if self.cache_format == "feather" else self.data_pickle_file
@@ -543,7 +572,8 @@ def _load_data(self):
             data_up_to_date = isinstance(data, pd.DataFrame) or scipy.sparse.issparse(data)
             if self.cache_format == "pickle" and not data_up_to_date:
                 logger.info("Updating outdated pickle file.")
-                return self._cache_compressed_file_from_arff(self.data_file)
+                file_to_load = self.data_file if self.parquet_file is None else self.parquet_file
+                return self._cache_compressed_file_from_file(file_to_load)
         return data, categorical, attribute_names
 
     @staticmethod
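With the lazy-loading change, a dataset fetched with download_data=False downloads its files only on the first get_data() call, and the cached parquet file (when a MinIO URL is known) is the one that gets parsed and compressed. A rough sketch, again assuming a dataset that exposes a MinIO URL:

import openml

# Only metadata is fetched here; no ARFF or parquet file is downloaded yet.
dataset = openml.datasets.get_dataset(61, download_data=False)

# The first call triggers _download_data(): the ARFF file is fetched and, if a MinIO URL
# is present, the parquet file as well; _load_data() then prefers the parquet file.
X, y, categorical, attribute_names = dataset.get_data(dataset_format="dataframe")
print(dataset.parquet_file)  # set once the parquet file has been downloaded, otherwise None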

openml/datasets/functions.py

Lines changed: 60 additions & 2 deletions
@@ -3,7 +3,7 @@
 import io
 import logging
 import os
-from typing import List, Dict, Union, Optional
+from typing import List, Dict, Union, Optional, cast
 
 import numpy as np
 import arff
@@ -424,6 +424,10 @@ def get_dataset(
             raise
 
         arff_file = _get_dataset_arff(description) if download_data else None
+        if "oml:minio_url" in description and download_data:
+            parquet_file = _get_dataset_parquet(description)
+        else:
+            parquet_file = None
         remove_dataset_cache = False
     except OpenMLServerException as e:
         # if there was an exception,
@@ -437,7 +441,7 @@
         _remove_cache_dir_for_id(DATASETS_CACHE_DIR_NAME, did_cache_dir)
 
     dataset = _create_dataset_from_description(
-        description, features_file, qualities_file, arff_file, cache_format
+        description, features_file, qualities_file, arff_file, parquet_file, cache_format
    )
    return dataset
 
@@ -908,6 +912,55 @@ def _get_dataset_description(did_cache_dir, dataset_id):
    return description
 
 
+def _get_dataset_parquet(
+    description: Union[Dict, OpenMLDataset], cache_directory: str = None
+) -> Optional[str]:
+    """ Return the path to the local parquet file of the dataset. If is not cached, it is downloaded.
+
+    Checks if the file is in the cache, if yes, return the path to the file.
+    If not, downloads the file and caches it, then returns the file path.
+    The cache directory is generated based on dataset information, but can also be specified.
+
+    This function is NOT thread/multiprocessing safe.
+    Unlike the ARFF equivalent, checksums are not available/used (for now).
+
+    Parameters
+    ----------
+    description : dictionary or OpenMLDataset
+        Either a dataset description as dict or OpenMLDataset.
+
+    cache_directory: str, optional (default=None)
+        Folder to store the parquet file in.
+        If None, use the default cache directory for the dataset.
+
+    Returns
+    -------
+    output_filename : string, optional
+        Location of the Parquet file if successfully downloaded, None otherwise.
+    """
+    if isinstance(description, dict):
+        url = description.get("oml:minio_url")
+        did = description.get("oml:id")
+    elif isinstance(description, OpenMLDataset):
+        url = description._minio_url
+        did = description.dataset_id
+    else:
+        raise TypeError("`description` should be either OpenMLDataset or Dict.")
+
+    if cache_directory is None:
+        cache_directory = _create_cache_directory_for_id(DATASETS_CACHE_DIR_NAME, did)
+    output_file_path = os.path.join(cache_directory, "dataset.pq")
+
+    if not os.path.isfile(output_file_path):
+        try:
+            openml._api_calls._download_minio_file(
+                source=cast(str, url), destination=output_file_path
+            )
+        except FileNotFoundError:
+            return None
+    return output_file_path
+
+
 def _get_dataset_arff(description: Union[Dict, OpenMLDataset], cache_directory: str = None) -> str:
     """ Return the path to the local arff file of the dataset. If is not cached, it is downloaded.
 
@@ -1031,6 +1084,7 @@ def _create_dataset_from_description(
     features_file: str,
     qualities_file: str,
     arff_file: str = None,
+    parquet_file: str = None,
     cache_format: str = "pickle",
 ) -> OpenMLDataset:
     """Create a dataset object from a description dict.
@@ -1045,6 +1099,8 @@
         Path of the dataset qualities as xml file.
     arff_file : string, optional
        Path of dataset ARFF file.
+    parquet_file : string, optional
+        Path of dataset Parquet file.
     cache_format: string, optional
        Caching option for datasets (feather/pickle)
@@ -1081,6 +1137,8 @@
         cache_format=cache_format,
         features_file=features_file,
        qualities_file=qualities_file,
+        minio_url=description.get("oml:minio_url"),
+        parquet_file=parquet_file,
    )
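The parquet file is cached as dataset.pq in the per-dataset cache directory, and a failed download (missing object, private bucket) degrades to returning None instead of raising. A sketch of exercising the internal helper directly; the cache layout shown matches the default configuration and the dataset id is illustrative:

import os
import openml
from openml.datasets.functions import _get_dataset_parquet

# Assumes the dataset's description carries an oml:minio_url; without one, get_dataset
# skips the parquet download and this helper has nothing to fetch.
dataset = openml.datasets.get_dataset(61, download_data=False)
path = _get_dataset_parquet(dataset)  # downloads <cache>/datasets/61/dataset.pq, or returns None
print(path)

cache_dir = os.path.join(openml.config.get_cache_directory(), "datasets", "61")
print(os.listdir(cache_dir))  # e.g. description.xml, features.xml, dataset.pq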

setup.py

Lines changed: 2 additions & 1 deletion
@@ -53,6 +53,8 @@
         "pandas>=1.0.0",
         "scipy>=0.13.3",
         "numpy>=1.6.2",
+        "minio",
+        "pyarrow",
     ],
     extras_require={
         "test": [
@@ -65,7 +67,6 @@
             "nbformat",
             "oslo.concurrency",
             "flaky",
-            "pyarrow",
             "pre-commit",
             "pytest-cov",
             "mypy",
69.3 KB binary file not shown.
