diff --git a/nemo/utils/data_utils.py b/nemo/utils/data_utils.py index 5fd1f4978002..56c27b556347 100644 --- a/nemo/utils/data_utils.py +++ b/nemo/utils/data_utils.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""Utility functions for handling data operations, including datastore access and caching.""" + import os import pathlib import shutil @@ -77,10 +79,9 @@ def is_datastore_cache_shared() -> bool: if cache_shared == 0: return False - elif cache_shared == 1: + if cache_shared == 1: return True - else: - raise ValueError(f'Unexpected value of env {constants.NEMO_ENV_DATA_STORE_CACHE_SHARED}') + raise ValueError(f'Unexpected value of env {constants.NEMO_ENV_DATA_STORE_CACHE_SHARED}') def ais_cache_base() -> str: @@ -150,11 +151,10 @@ def ais_binary() -> str: if os.path.isfile(default_path): logging.info('ais available at the default path: %s', default_path, mode=LogMode.ONCE) return default_path - else: - logging.warning( - f'AIS binary not found with `which ais` and at the default path {default_path}.', mode=LogMode.ONCE - ) - return None + logging.warning( + f'AIS binary not found with `which ais` and at the default path {default_path}.', mode=LogMode.ONCE + ) + return None def datastore_path_to_local_path(store_path: str) -> str: @@ -185,7 +185,8 @@ def open_datastore_object_with_binary(path: str, num_retries: int = 5): Args: path: path to an object - num_retries: number of retries if the get command fails with ais binary, as AIS Python SDK has its own retry mechanism + num_retries: number of retries if the get command fails with ais binary, + as AIS Python SDK has its own retry mechanism Returns: File-like object that supports read() @@ -200,39 +201,54 @@ def open_datastore_object_with_binary(path: str, num_retries: int = 5): if not binary: raise RuntimeError( - f"AIS binary is not found, cannot resolve {path}. Please either install it or install Lhotse with `pip install lhotse`.\n" - "Lhotse's native open_best supports AIS Python SDK, which is the recommended way to operate with the data from AIStore.\n" - "See AIS binary installation instructions at https://github.com/NVIDIA/aistore?tab=readme-ov-file#install-from-release-binaries.\n" + f"AIS binary is not found, cannot resolve {path}. " + "Please either install it or install Lhotse with `pip install lhotse`.\n" + "Lhotse's native open_best supports AIS Python SDK, " + "which is the recommended way to operate with the data from AIStore.\n" + "See AIS binary installation instructions at " + "https://github.com/NVIDIA/aistore?tab=readme-ov-file#install-from-release-binaries.\n" ) - cmd = f'{binary} get {path} -' + cmd = [binary, 'get', path, '-'] done = False for _ in range(num_retries): - proc = subprocess.Popen( - cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=False # bytes mode - ) - stream = proc.stdout - if stream.peek(1): - done = True - break + with subprocess.Popen( + cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=False # bytes mode + ) as proc: + stream = proc.stdout + if stream.peek(1): + done = True + return stream if not done: - error = proc.stderr.read().decode("utf-8", errors="ignore").strip() + with subprocess.Popen( + cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=False + ) as proc: + error = proc.stderr.read().decode("utf-8", errors="ignore").strip() raise ValueError( - f"{path} couldn't be opened with AIS binary after {num_retries} attempts because of the following exception: {error}" + f"{path} couldn't be opened with AIS binary " + f"after {num_retries} attempts because of the following exception: {error}" ) - - return stream + return None def open_best(path: str, mode: str = "rb"): + """Open a file using the best available method (Lhotse, datastore binary, or standard open). + + Args: + path: path to the file or datastore object + mode: file opening mode (default: "rb") + + Returns: + File-like object + """ if LHOTSE_AVAILABLE: return lhotse_open_best(path, mode=mode) if is_datastore_path(path): return open_datastore_object_with_binary(path) - return open(path, mode=mode) + return open(path, mode=mode, encoding='utf-8' if 'b' not in mode else None) def get_datastore_object(path: str, force: bool = False, num_retries: int = 5) -> str: @@ -243,7 +259,8 @@ def get_datastore_object(path: str, force: bool = False, num_retries: int = 5) - Args: path: path to an object force: force download, even if a local file exists - num_retries: number of retries if the get command fails with ais binary, as AIS Python SDK has its own retry mechanism + num_retries: number of retries if the get command fails with ais binary, + as AIS Python SDK has its own retry mechanism Returns: Local path of the object. @@ -264,9 +281,8 @@ def get_datastore_object(path: str, force: bool = False, num_retries: int = 5) - return local_path - else: - # Assume the file is local - return path + # Assume the file is local + return path class DataStoreObject: @@ -342,7 +358,7 @@ def datastore_object_get(store_object: DataStoreObject) -> bool: return store_object.get() is not None -def wds_url_opener( +def wds_url_opener( # pylint: disable=unused-argument data: Iterable[Dict[str, Any]], handler: Callable[[Exception], bool], **kw: Dict[str, Any], @@ -355,7 +371,7 @@ def wds_url_opener( Args: data: Iterator over dict(url=...). handler: Exception handler. - **kw: Keyword arguments for gopen.gopen. + **kw: Keyword arguments for gopen.gopen (unused, kept for API compatibility). Yields: A stream of url+stream pairs. @@ -368,8 +384,7 @@ def wds_url_opener( stream = open_best(url, mode="rb") sample.update(stream=stream) yield sample - except Exception as exn: + except Exception as exn: # pylint: disable=broad-exception-caught if handler(exn): continue - else: - break + break