 
 import torch
 
+from lightning.data.constants import _IS_IN_STUDIO, _TORCH_GREATER_EQUAL_2_1_0
 from lightning.data.processing.data_processor import DataChunkRecipe, DataProcessor, DataTransformRecipe
+from lightning.data.processing.dns import optimize_dns_context
 from lightning.data.processing.readers import BaseReader
-from lightning.data.streaming.constants import _IS_IN_STUDIO, _TORCH_GREATER_EQUAL_2_1_0
 from lightning.data.streaming.resolver import (
     Dir,
     _assert_dir_has_index_file,
@@ -218,7 +219,8 @@ def map( |
             weights=weights,
             reader=reader,
         )
-        return data_processor.run(LambdaDataTransformRecipe(fn, inputs))
+        with optimize_dns_context(True):
+            return data_processor.run(LambdaDataTransformRecipe(fn, inputs))
     return _execute(
         f"data-prep-map-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}",
         num_nodes,
@@ -303,15 +305,18 @@ def optimize( |
             reorder_files=reorder_files,
             reader=reader,
         )
-        return data_processor.run(
-            LambdaDataChunkRecipe(
-                fn,
-                inputs,
-                chunk_size=chunk_size,
-                chunk_bytes=chunk_bytes,
-                compression=compression,
+
+        with optimize_dns_context(True):
+            data_processor.run(
+                LambdaDataChunkRecipe(
+                    fn,
+                    inputs,
+                    chunk_size=chunk_size,
+                    chunk_bytes=chunk_bytes,
+                    compression=compression,
+                )
             )
-        )
+        return None
     return _execute(
         f"data-prep-optimize-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}",
         num_nodes,
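
For context on the new helper: `optimize_dns_context` is used here purely as a context manager that switches a DNS optimization on for the duration of the processing run. Below is a minimal sketch of that pattern only, not the implementation in `lightning.data.processing.dns`; the `_set_dns_optimization` helper is hypothetical and stands in for whatever resolver adjustment the real module performs.

from contextlib import contextmanager
from typing import Iterator


def _set_dns_optimization(enable: bool) -> None:
    # Hypothetical stand-in: the real helper would adjust the machine's DNS
    # resolver configuration; a no-op keeps the sketch self-contained.
    pass


@contextmanager
def optimize_dns_context(enable: bool) -> Iterator[None]:
    # Enable the optimization on entry and always restore the default
    # resolver behaviour on exit, even if the wrapped run raises.
    _set_dns_optimization(enable)
    try:
        yield
    finally:
        _set_dns_optimization(False)

With a context manager of this shape, wrapping `data_processor.run(...)` in `with optimize_dns_context(True):` ensures the temporary DNS change is reverted whether the run succeeds or fails.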