Skip to content

Commit c80226d

Browse files
authored
map & operator: Add lightning.ai account creation info (#19418)
1 parent 0d8208f commit c80226d

File tree

1 file changed

+31
-5
lines changed

1 file changed

+31
-5
lines changed

src/lightning/data/streaming/functions.py

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import torch
2424

2525
from lightning.data.processing.readers import BaseReader
26-
from lightning.data.streaming.constants import _TORCH_GREATER_EQUAL_2_1_0
26+
from lightning.data.streaming.constants import _IS_IN_STUDIO, _TORCH_GREATER_EQUAL_2_1_0
2727
from lightning.data.streaming.data_processor import DataChunkRecipe, DataProcessor, DataTransformRecipe
2828
from lightning.data.streaming.resolver import (
2929
Dir,
@@ -169,8 +169,8 @@ def map(
169169
output_dir: The folder where the processed data should be stored.
170170
num_workers: The number of workers to use during processing
171171
fast_dev_run: Whether to use process only a sub part of the inputs
172-
num_nodes: When doing remote execution, the number of nodes to use.
173-
machine: When doing remote execution, the machine to use.
172+
num_nodes: When doing remote execution, the number of nodes to use. Only supported on https://lightning.ai/.
173+
machine: When doing remote execution, the machine to use. Only supported on https://lightning.ai/.
174174
num_downloaders: The number of downloaders per worker.
175175
reorder_files: By default, reorders the files by file size to distribute work equally among all workers.
176176
Set this to ``False`` if the order in which samples are processed should be preserved.
@@ -183,6 +183,17 @@ def map(
183183
if len(inputs) == 0:
184184
raise ValueError(f"The provided inputs should be non empty. Found {inputs}.")
185185

186+
if not _IS_IN_STUDIO and (machine is not None or num_nodes is not None):
187+
raise ValueError(
188+
"Only https://lightning.ai/ supports multiple nodes or selecting a machine."
189+
" Create an account to try it out.")
190+
191+
if not _IS_IN_STUDIO:
192+
print(
193+
"Create an account on https://lightning.ai/ to transform your data faster using "
194+
"multiple nodes and large machines."
195+
)
196+
186197
if num_nodes is None or int(os.getenv("DATA_OPTIMIZER_NUM_NODES", 0)) > 0:
187198
_output_dir: Dir = _resolve_dir(output_dir)
188199

@@ -242,8 +253,8 @@ def optimize(
242253
compression: The compression algorithm to use over the chunks.
243254
num_workers: The number of workers to use during processing
244255
fast_dev_run: Whether to use process only a sub part of the inputs
245-
num_nodes: When doing remote execution, the number of nodes to use.
246-
machine: When doing remote execution, the machine to use.
256+
num_nodes: When doing remote execution, the number of nodes to use. Only supported on https://lightning.ai/.
257+
machine: When doing remote execution, the machine to use. Only supported on https://lightning.ai/.
247258
num_downloaders: The number of downloaders per worker.
248259
reorder_files: By default, reorders the files by file size to distribute work equally among all workers.
249260
Set this to ``False`` if the order in which samples are processed should be preserved.
@@ -258,6 +269,18 @@ def optimize(
258269
if chunk_size is None and chunk_bytes is None:
259270
raise ValueError("Either `chunk_size` or `chunk_bytes` needs to be defined.")
260271

272+
if not _IS_IN_STUDIO and (machine is not None or num_nodes is not None):
273+
raise ValueError(
274+
"Only https://lightning.ai/ supports multiple nodes or selecting a machine."
275+
"Create an account to try it out."
276+
)
277+
278+
if not _IS_IN_STUDIO:
279+
print(
280+
"Create an account on https://lightning.ai/ to optimize your data faster "
281+
"using multiple nodes and large machines."
282+
)
283+
261284
if num_nodes is None or int(os.getenv("DATA_OPTIMIZER_NUM_NODES", 0)) > 0:
262285
_output_dir: Dir = _resolve_dir(output_dir)
263286

@@ -312,6 +335,9 @@ def __init__(self, folder: str, max_workers: Optional[int] = os.cpu_count()) ->
312335
self.max_workers = max_workers or 1
313336
self.futures: List[concurrent.futures.Future] = []
314337

338+
if not _IS_IN_STUDIO:
339+
print("This method is optimized to run on https://lightning.ai/. Don't use it otherwise.")
340+
315341
def __iter__(self) -> Any:
316342
"""This function queues the folders to perform listdir across multiple workers."""
317343
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:

0 commit comments

Comments
 (0)