Skip to content

Commit d608ba7

Browse files
Execute with subprocess list (#15165) (#15257)
* Execute with subprocess list * fix pylint issues --------- Signed-off-by: nithinraok <[email protected]> Signed-off-by: Charlie Truong <[email protected]> Co-authored-by: Nithin Rao <[email protected]>
1 parent 5fac9aa commit d608ba7

File tree

1 file changed

+49
-34
lines changed

1 file changed

+49
-34
lines changed

nemo/utils/data_utils.py

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
"""Utility functions for handling data operations, including datastore access and caching."""
16+
1517
import os
1618
import pathlib
1719
import shutil
@@ -77,10 +79,9 @@ def is_datastore_cache_shared() -> bool:
7779

7880
if cache_shared == 0:
7981
return False
80-
elif cache_shared == 1:
82+
if cache_shared == 1:
8183
return True
82-
else:
83-
raise ValueError(f'Unexpected value of env {constants.NEMO_ENV_DATA_STORE_CACHE_SHARED}')
84+
raise ValueError(f'Unexpected value of env {constants.NEMO_ENV_DATA_STORE_CACHE_SHARED}')
8485

8586

8687
def ais_cache_base() -> str:
@@ -150,11 +151,10 @@ def ais_binary() -> str:
150151
if os.path.isfile(default_path):
151152
logging.info('ais available at the default path: %s', default_path, mode=LogMode.ONCE)
152153
return default_path
153-
else:
154-
logging.warning(
155-
f'AIS binary not found with `which ais` and at the default path {default_path}.', mode=LogMode.ONCE
156-
)
157-
return None
154+
logging.warning(
155+
f'AIS binary not found with `which ais` and at the default path {default_path}.', mode=LogMode.ONCE
156+
)
157+
return None
158158

159159

160160
def datastore_path_to_local_path(store_path: str) -> str:
@@ -185,7 +185,8 @@ def open_datastore_object_with_binary(path: str, num_retries: int = 5):
185185
186186
Args:
187187
path: path to an object
188-
num_retries: number of retries if the get command fails with ais binary, as AIS Python SDK has its own retry mechanism
188+
num_retries: number of retries if the get command fails with ais binary,
189+
as AIS Python SDK has its own retry mechanism
189190
190191
Returns:
191192
File-like object that supports read()
@@ -200,39 +201,54 @@ def open_datastore_object_with_binary(path: str, num_retries: int = 5):
200201

201202
if not binary:
202203
raise RuntimeError(
203-
f"AIS binary is not found, cannot resolve {path}. Please either install it or install Lhotse with `pip install lhotse`.\n"
204-
"Lhotse's native open_best supports AIS Python SDK, which is the recommended way to operate with the data from AIStore.\n"
205-
"See AIS binary installation instructions at https://github.com/NVIDIA/aistore?tab=readme-ov-file#install-from-release-binaries.\n"
204+
f"AIS binary is not found, cannot resolve {path}. "
205+
"Please either install it or install Lhotse with `pip install lhotse`.\n"
206+
"Lhotse's native open_best supports AIS Python SDK, "
207+
"which is the recommended way to operate with the data from AIStore.\n"
208+
"See AIS binary installation instructions at "
209+
"https://github.com/NVIDIA/aistore?tab=readme-ov-file#install-from-release-binaries.\n"
206210
)
207211

208-
cmd = f'{binary} get {path} -'
212+
cmd = [binary, 'get', path, '-']
209213

210214
done = False
211215

212216
for _ in range(num_retries):
213-
proc = subprocess.Popen(
214-
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=False # bytes mode
215-
)
216-
stream = proc.stdout
217-
if stream.peek(1):
218-
done = True
219-
break
217+
with subprocess.Popen(
218+
cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=False # bytes mode
219+
) as proc:
220+
stream = proc.stdout
221+
if stream.peek(1):
222+
done = True
223+
return stream
220224

221225
if not done:
222-
error = proc.stderr.read().decode("utf-8", errors="ignore").strip()
226+
with subprocess.Popen(
227+
cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=False
228+
) as proc:
229+
error = proc.stderr.read().decode("utf-8", errors="ignore").strip()
223230
raise ValueError(
224-
f"{path} couldn't be opened with AIS binary after {num_retries} attempts because of the following exception: {error}"
231+
f"{path} couldn't be opened with AIS binary "
232+
f"after {num_retries} attempts because of the following exception: {error}"
225233
)
226-
227-
return stream
234+
return None
228235

229236

230237
def open_best(path: str, mode: str = "rb"):
238+
"""Open a file using the best available method (Lhotse, datastore binary, or standard open).
239+
240+
Args:
241+
path: path to the file or datastore object
242+
mode: file opening mode (default: "rb")
243+
244+
Returns:
245+
File-like object
246+
"""
231247
if LHOTSE_AVAILABLE:
232248
return lhotse_open_best(path, mode=mode)
233249
if is_datastore_path(path):
234250
return open_datastore_object_with_binary(path)
235-
return open(path, mode=mode)
251+
return open(path, mode=mode, encoding='utf-8' if 'b' not in mode else None)
236252

237253

238254
def get_datastore_object(path: str, force: bool = False, num_retries: int = 5) -> str:
@@ -243,7 +259,8 @@ def get_datastore_object(path: str, force: bool = False, num_retries: int = 5) -
243259
Args:
244260
path: path to an object
245261
force: force download, even if a local file exists
246-
num_retries: number of retries if the get command fails with ais binary, as AIS Python SDK has its own retry mechanism
262+
num_retries: number of retries if the get command fails with ais binary,
263+
as AIS Python SDK has its own retry mechanism
247264
248265
Returns:
249266
Local path of the object.
@@ -264,9 +281,8 @@ def get_datastore_object(path: str, force: bool = False, num_retries: int = 5) -
264281

265282
return local_path
266283

267-
else:
268-
# Assume the file is local
269-
return path
284+
# Assume the file is local
285+
return path
270286

271287

272288
class DataStoreObject:
@@ -342,7 +358,7 @@ def datastore_object_get(store_object: DataStoreObject) -> bool:
342358
return store_object.get() is not None
343359

344360

345-
def wds_url_opener(
361+
def wds_url_opener( # pylint: disable=unused-argument
346362
data: Iterable[Dict[str, Any]],
347363
handler: Callable[[Exception], bool],
348364
**kw: Dict[str, Any],
@@ -355,7 +371,7 @@ def wds_url_opener(
355371
Args:
356372
data: Iterator over dict(url=...).
357373
handler: Exception handler.
358-
**kw: Keyword arguments for gopen.gopen.
374+
**kw: Keyword arguments for gopen.gopen (unused, kept for API compatibility).
359375
360376
Yields:
361377
A stream of url+stream pairs.
@@ -368,8 +384,7 @@ def wds_url_opener(
368384
stream = open_best(url, mode="rb")
369385
sample.update(stream=stream)
370386
yield sample
371-
except Exception as exn:
387+
except Exception as exn: # pylint: disable=broad-exception-caught
372388
if handler(exn):
373389
continue
374-
else:
375-
break
390+
break

0 commit comments

Comments
 (0)