Skip to content

Commit ae290f7

Browse files
authored
[feature] op-level isolated environment spec in ray mode (#892)
* + add basic OPEnvSpec and its test cases * + add OPEnvManager and its test cases * + add reporting of the current states of OPEnvManager * + integrate OPEnvManager into ray dataset + add placeholder test class for load_ops + remove unused functions * + add auto-analyzing and recommendation for OP env requirements according to the LazyLoader usage + add corresponding test cases * + add corresponding docs in dev docs * * modification according to gemini's comments * + add a debug log * + add a debug log * + add a debug log * - remove trouble cases * * refine according to dy's comments * * strip after getting the package name and url * * strip after getting the package name and url * * fix a test case
1 parent 22fea0c commit ae290f7

File tree

17 files changed

+1643
-18
lines changed

17 files changed

+1643
-18
lines changed

data_juicer/config/config.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,28 @@ def init_configs(args: Optional[List[str]] = None, which_entry: object = None, l
548548
"meta will be involved. Only available when filter_list_to_mine "
549549
"is true.",
550550
)
551+
parser.add_argument(
552+
"--min_common_dep_num_to_combine",
553+
type=int,
554+
default=-1,
555+
help="The minimum number of common dependencies required to determine whether to merge two operation "
556+
"environment specifications. If set to -1, it means no combination of operation environments, where "
557+
"every OP has its own runtime environment during processing without any merging. If set to >= 0, "
558+
"environments of OPs that share at least min_common_dep_num_to_combine common dependencies will be "
559+
"merged. It will open the operator environment manager to automatically analyze and merge runtime "
560+
"environment for different OPs. It helps different OPs share and reuse the same runtime environment to "
561+
"reduce resource utilization. It's -1 in default. Only available in ray mode. ",
562+
)
563+
parser.add_argument(
564+
"--conflict_resolve_strategy",
565+
type=str,
566+
default="split",
567+
choices=["split", "overwrite", "latest"],
568+
help="Strategy for resolving dependency conflicts, default is 'split' strategy. 'split': Keep the two "
569+
"specs split when there is a conflict. 'overwrite': Overwrite the existing dependency with one "
570+
"from the later OP. 'latest': Use the latest version of all specified dependency versions. "
571+
"Only available when min_common_dep_num_to_combine >= 0.",
572+
)
551573
parser.add_argument(
552574
"--op_fusion",
553575
type=bool,

data_juicer/core/data/ray_dataset.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from data_juicer.ops.base_op import DEFAULT_BATCH_SIZE, TAGGING_OPS
1818
from data_juicer.utils.constant import Fields
1919
from data_juicer.utils.file_utils import is_remote_path
20-
from data_juicer.utils.resource_utils import cuda_device_count
2120
from data_juicer.utils.webdataset_utils import _custom_default_decoder
2221

2322

@@ -86,13 +85,6 @@ def preprocess_dataset(dataset: ray.data.Dataset, dataset_path, cfg) -> ray.data
8685
return dataset
8786

8887

89-
def get_num_gpus(op, op_proc):
90-
if not op.use_cuda():
91-
return 0
92-
proc_per_gpu = op_proc / cuda_device_count()
93-
return 1.0 / proc_per_gpu
94-
95-
9688
def filter_batch(batch, filter_func):
9789
mask = pyarrow.array(filter_func(batch.to_pydict()))
9890
return batch.filter(mask)
@@ -199,7 +191,20 @@ def process(self, operators, *, exporter=None, checkpointer=None, tracer=None) -
199191
cached_columns = set(columns_result)
200192

201193
for op in operators:
202-
cached_columns = self._run_single_op(op, cached_columns, tracer=tracer)
194+
try:
195+
cached_columns = self._run_single_op(op, cached_columns, tracer=tracer)
196+
except Exception as e:
197+
logger.error(f"Error processing operator {op}: {e}.")
198+
if op.runtime_env is not None:
199+
logger.error("Try to fallback to the base runtime environment.")
200+
original_runtime_env = op.runtime_env
201+
try:
202+
op.runtime_env = None
203+
cached_columns = self._run_single_op(op, cached_columns, tracer=tracer)
204+
finally:
205+
op.runtime_env = original_runtime_env
206+
else:
207+
raise e
203208
return self
204209

205210
def _run_single_op(self, op, cached_columns=None, tracer=None):

data_juicer/core/executor/ray_executor.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from data_juicer.core.executor.event_logging_mixin import EventLoggingMixin
1414
from data_juicer.core.ray_exporter import RayExporter
1515
from data_juicer.core.tracer.ray_tracer import RayTracer
16-
from data_juicer.ops import load_ops
16+
from data_juicer.ops import OPEnvManager, load_ops
1717
from data_juicer.ops.op_fusion import fuse_operators
1818
from data_juicer.utils.lazy_loader import LazyLoader
1919

@@ -122,6 +122,15 @@ def __init__(self, cfg: Optional[Namespace] = None):
122122
trace_keys=self.cfg.trace_keys,
123123
)
124124

125+
# setup OPEnvManager
126+
self.op_env_manager = None
127+
if self.cfg.min_common_dep_num_to_combine >= 0:
128+
logger.info("Preparing OPEnvManager...")
129+
self.op_env_manager = OPEnvManager(
130+
min_common_dep_num_to_combine=self.cfg.min_common_dep_num_to_combine,
131+
conflict_resolve_strategy=self.cfg.conflict_resolve_strategy,
132+
)
133+
125134
def run(self, load_data_np: Optional[PositiveInt] = None, skip_export: bool = False, skip_return: bool = False):
126135
"""
127136
Running the dataset process pipeline
@@ -138,7 +147,7 @@ def run(self, load_data_np: Optional[PositiveInt] = None, skip_export: bool = Fa
138147

139148
# 2. extract processes
140149
logger.info("Preparing process operators...")
141-
ops = load_ops(self.cfg.process)
150+
ops = load_ops(self.cfg.process, self.op_env_manager)
142151

143152
# Initialize DAG execution planning (pass ops to avoid redundant loading)
144153
self._initialize_dag_execution(self.cfg, ops=ops)

data_juicer/ops/__init__.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ def timing_context(description):
3030
Selector,
3131
)
3232
from .load import load_ops
33+
from .op_env import (
34+
OPEnvManager,
35+
OPEnvSpec,
36+
analyze_lazy_loaded_requirements,
37+
analyze_lazy_loaded_requirements_for_code_file,
38+
op_requirements_to_op_env_spec,
39+
)
3340

3441
__all__ = [
3542
'load_ops',
@@ -43,5 +50,10 @@ def timing_context(description):
4350
'NON_STATS_FILTERS',
4451
'OPERATORS',
4552
'TAGGING_OPS',
46-
'Pipeline'
53+
'Pipeline',
54+
'OPEnvSpec',
55+
'op_requirements_to_op_env_spec',
56+
'OPEnvManager',
57+
'analyze_lazy_loaded_requirements',
58+
'analyze_lazy_loaded_requirements_for_code_file',
4759
]

data_juicer/ops/base_op.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@
1313
from data_juicer.utils.registry import Registry
1414
from data_juicer.utils.resource_utils import is_cuda_available
1515

16+
from .op_env import (
17+
OPEnvSpec,
18+
analyze_lazy_loaded_requirements_for_code_file,
19+
op_requirements_to_op_env_spec,
20+
)
21+
1622
OPERATORS = Registry("Operators")
1723
UNFORKABLE = Registry("Unforkable")
1824
NON_STATS_FILTERS = Registry("Non-stats Filters")
@@ -281,9 +287,20 @@ def __call__(cls, *args, **kwargs):
281287

282288

283289
class OP(metaclass=OPMetaClass):
290+
# the name of this operator. Automatically set by the registry
291+
_name = ""
292+
293+
# the accelerator to run this operator. Either "cpu" or "cuda"
284294
_accelerator = "cpu"
295+
296+
# whether this operator is a batched operator
285297
_batched_op = False
286298

299+
# extra requirements for this operator. Should be:
300+
# 1. a list of packages
301+
# 2. a string of the path to the requirements.txt file
302+
_requirements = None
303+
287304
def __init__(self, *args, **kwargs):
288305
"""
289306
Base class of operators.
@@ -419,6 +436,12 @@ def __init__(self, *args, **kwargs):
419436
method = wrap_func_with_nested_access(method)
420437
setattr(self, name, method)
421438

439+
def get_env_spec(self) -> OPEnvSpec:
440+
import inspect
441+
442+
auto_analyzed_requirements = analyze_lazy_loaded_requirements_for_code_file(inspect.getfile(self.__class__))
443+
return op_requirements_to_op_env_spec(self._name, self._requirements, auto_analyzed_requirements)
444+
422445
def use_auto_proc(self):
423446
if is_ray_mode() and not self.use_ray_actor(): # ray task
424447
return self.num_proc == -1

data_juicer/ops/load.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
from .base_op import OPERATORS
22

33

4-
def load_ops(process_list):
4+
def load_ops(process_list, op_env_manager=None):
55
"""
66
Load op list according to the process list from config file.
77
88
:param process_list: A process list. Each item is an op name and its
99
arguments.
10+
:param op_env_manager: The OPEnvManager to try to merge environment specs of different OPs that have common
11+
dependencies. Only available when min_common_dep_num_to_combine >= 0.
1012
:return: The op instance list.
1113
"""
1214
ops = []
@@ -21,4 +23,20 @@ def load_ops(process_list):
2123
for op_cfg, op in zip(new_process_list, ops):
2224
op._op_cfg = op_cfg
2325

26+
# update op runtime environment if OPEnvManager is enabled
27+
if op_env_manager:
28+
# first round: record and merge possible common env specs
29+
for op in ops:
30+
op_name = op._name
31+
op_env_spec = op.get_env_spec()
32+
op_env_manager.record_op_env_spec(op_name, op_env_spec)
33+
# second round: update op runtime environment
34+
for op in ops:
35+
op_name = op._name
36+
op_env_spec = op_env_manager.get_op_env_spec(op_name)
37+
op._requirements = op_env_spec.pip_pkgs
38+
# if the runtime_env is not set for this OP, update the runtime_env as well
39+
if op.runtime_env is None:
40+
op.runtime_env = op_env_spec.to_dict()
41+
2442
return ops

data_juicer/ops/mapper/image_tagging_mapper.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import numpy as np
44

55
from data_juicer.utils.constant import Fields, MetaKeys
6+
from data_juicer.utils.lazy_loader import LazyLoader
67
from data_juicer.utils.mm_utils import load_data_with_context, load_image
78
from data_juicer.utils.model_utils import get_model, prepare_model, ram, torch
89

@@ -38,6 +39,7 @@ def __init__(self, tag_field_name: str = MetaKeys.image_tags, *args, **kwargs):
3839
"""
3940
kwargs["memory"] = "9GB" if kwargs.get("memory", 0) == 0 else kwargs["memory"]
4041
super().__init__(*args, **kwargs)
42+
LazyLoader.check_packages(["ram @ git+https://github.com/datajuicer/recognize-anything.git"])
4143
self.model_key = prepare_model(
4244
model_type="recognizeAnything", pretrained_model_name_or_path="ram_plus_swin_large_14m.pth", input_size=384
4345
)

data_juicer/ops/mapper/video_tagging_from_frames_mapper.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pydantic import PositiveInt
55

66
from data_juicer.utils.constant import Fields, MetaKeys
7+
from data_juicer.utils.lazy_loader import LazyLoader
78
from data_juicer.utils.mm_utils import (
89
close_video,
910
extract_key_frames,
@@ -72,6 +73,7 @@ def __init__(
7273
"""
7374
kwargs["memory"] = "9GB" if kwargs.get("memory", 0) == 0 else kwargs["memory"]
7475
super().__init__(*args, **kwargs)
76+
LazyLoader.check_packages(["ram @ git+https://github.com/datajuicer/recognize-anything.git"])
7577
if frame_sampling_method not in ["all_keyframes", "uniform"]:
7678
raise ValueError(
7779
f"Frame sampling method [{frame_sampling_method}] is not "

0 commit comments

Comments (0)