22import os
33import sys
44import time
5- from pathlib import Path
65
76import pipeline_nodes
87import ray
@@ -28,20 +27,26 @@ def main() -> None:
2827 logger .info ("load start" )
2928
3029 parallelism = int (os .environ .get ("PARALLELISM" , "6" ))
31- # Shutdown any existing Ray instance first to avoid conflicts.
32- if ray .is_initialized (): # type: ignore
33- ray .shutdown () # type: ignore
34- time .sleep (1 )
35- ray .init (
36- logging_level = "info" ,
37- num_cpus = parallelism ,
38- local_mode = True , # Run tasks inprocess to avoid worker version mismatches
39- ignore_reinit_error = True ,
40- runtime_env = None ,
41- ) # type: ignore
42-
43- dict_builder = base .DictResult ()
44- adapter = RayGraphAdapter (result_builder = dict_builder )
30+ mode = sys .argv [1 ] if len (sys .argv ) > 1 else ""
31+ use_ray = mode not in ("synthetic" , "local" )
32+
33+ if use_ray :
34+ # Shutdown any existing Ray instance first to avoid conflicts.
35+ if ray .is_initialized ():
36+ ray .shutdown ()
37+ time .sleep (1 )
38+
39+ ray .init (
40+ logging_level = "info" ,
41+ num_cpus = parallelism ,
42+ )
43+ dict_builder = base .DictResult ()
44+ adapter = RayGraphAdapter (result_builder = dict_builder )
45+ else :
46+ # Use simple non distributed adapter for tests
47+ dict_builder = base .DictResult ()
48+ adapter = base .SimplePythonGraphAdapter (result_builder = dict_builder )
49+
4550 load_type = str (os .environ .get ("LOAD_TYPE" , "incremental" ))
4651 logger .info ("load_type %s" , load_type )
4752 dr = (
0 commit comments