Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 49 additions & 34 deletions nemo/utils/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utility functions for handling data operations, including datastore access and caching."""

import os
import pathlib
import shutil
Expand Down Expand Up @@ -77,10 +79,9 @@ def is_datastore_cache_shared() -> bool:

if cache_shared == 0:
return False
elif cache_shared == 1:
if cache_shared == 1:
return True
else:
raise ValueError(f'Unexpected value of env {constants.NEMO_ENV_DATA_STORE_CACHE_SHARED}')
raise ValueError(f'Unexpected value of env {constants.NEMO_ENV_DATA_STORE_CACHE_SHARED}')


def ais_cache_base() -> str:
Expand Down Expand Up @@ -150,11 +151,10 @@ def ais_binary() -> str:
if os.path.isfile(default_path):
logging.info('ais available at the default path: %s', default_path, mode=LogMode.ONCE)
return default_path
else:
logging.warning(
f'AIS binary not found with `which ais` and at the default path {default_path}.', mode=LogMode.ONCE
)
return None
logging.warning(
f'AIS binary not found with `which ais` and at the default path {default_path}.', mode=LogMode.ONCE
)
return None


def datastore_path_to_local_path(store_path: str) -> str:
Expand Down Expand Up @@ -185,7 +185,8 @@ def open_datastore_object_with_binary(path: str, num_retries: int = 5):

Args:
path: path to an object
num_retries: number of retries if the get command fails with ais binary, as AIS Python SDK has its own retry mechanism
num_retries: number of retries if the get command fails with ais binary,
as AIS Python SDK has its own retry mechanism

Returns:
File-like object that supports read()
Expand All @@ -200,39 +201,54 @@ def open_datastore_object_with_binary(path: str, num_retries: int = 5):

if not binary:
raise RuntimeError(
f"AIS binary is not found, cannot resolve {path}. Please either install it or install Lhotse with `pip install lhotse`.\n"
"Lhotse's native open_best supports AIS Python SDK, which is the recommended way to operate with the data from AIStore.\n"
"See AIS binary installation instructions at https://github.com/NVIDIA/aistore?tab=readme-ov-file#install-from-release-binaries.\n"
f"AIS binary is not found, cannot resolve {path}. "
"Please either install it or install Lhotse with `pip install lhotse`.\n"
"Lhotse's native open_best supports AIS Python SDK, "
"which is the recommended way to operate with the data from AIStore.\n"
"See AIS binary installation instructions at "
"https://github.com/NVIDIA/aistore?tab=readme-ov-file#install-from-release-binaries.\n"
)

cmd = f'{binary} get {path} -'
cmd = [binary, 'get', path, '-']

done = False

for _ in range(num_retries):
proc = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=False # bytes mode
)
stream = proc.stdout
if stream.peek(1):
done = True
break
with subprocess.Popen(
cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=False # bytes mode
) as proc:
stream = proc.stdout
if stream.peek(1):
done = True
return stream

if not done:
error = proc.stderr.read().decode("utf-8", errors="ignore").strip()
with subprocess.Popen(
cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=False
) as proc:
error = proc.stderr.read().decode("utf-8", errors="ignore").strip()
raise ValueError(
f"{path} couldn't be opened with AIS binary after {num_retries} attempts because of the following exception: {error}"
f"{path} couldn't be opened with AIS binary "
f"after {num_retries} attempts because of the following exception: {error}"
)

return stream
return None


def open_best(path: str, mode: str = "rb"):
"""Open a file using the best available method (Lhotse, datastore binary, or standard open).

Args:
path: path to the file or datastore object
mode: file opening mode (default: "rb")

Returns:
File-like object
"""
if LHOTSE_AVAILABLE:
return lhotse_open_best(path, mode=mode)
if is_datastore_path(path):
return open_datastore_object_with_binary(path)
return open(path, mode=mode)
return open(path, mode=mode, encoding='utf-8' if 'b' not in mode else None)


def get_datastore_object(path: str, force: bool = False, num_retries: int = 5) -> str:
Expand All @@ -243,7 +259,8 @@ def get_datastore_object(path: str, force: bool = False, num_retries: int = 5) -
Args:
path: path to an object
force: force download, even if a local file exists
num_retries: number of retries if the get command fails with ais binary, as AIS Python SDK has its own retry mechanism
num_retries: number of retries if the get command fails with ais binary,
as AIS Python SDK has its own retry mechanism

Returns:
Local path of the object.
Expand All @@ -264,9 +281,8 @@ def get_datastore_object(path: str, force: bool = False, num_retries: int = 5) -

return local_path

else:
# Assume the file is local
return path
# Assume the file is local
return path


class DataStoreObject:
Expand Down Expand Up @@ -342,7 +358,7 @@ def datastore_object_get(store_object: DataStoreObject) -> bool:
return store_object.get() is not None


def wds_url_opener(
def wds_url_opener( # pylint: disable=unused-argument
data: Iterable[Dict[str, Any]],
handler: Callable[[Exception], bool],
**kw: Dict[str, Any],
Expand All @@ -355,7 +371,7 @@ def wds_url_opener(
Args:
data: Iterator over dict(url=...).
handler: Exception handler.
**kw: Keyword arguments for gopen.gopen.
**kw: Keyword arguments for gopen.gopen (unused, kept for API compatibility).

Yields:
A stream of url+stream pairs.
Expand All @@ -368,8 +384,7 @@ def wds_url_opener(
stream = open_best(url, mode="rb")
sample.update(stream=stream)
yield sample
except Exception as exn:
except Exception as exn: # pylint: disable=broad-exception-caught
if handler(exn):
continue
else:
break
break
Loading