Skip to content

Commit d56ed70

Browse files
authored
♻️ nexent-data-process now takes less memory, elasticsearch takes more available memory
2 parents e02da79 + d90b04b commit d56ed70

File tree

9 files changed

+1444
-43
lines changed

9 files changed

+1444
-43
lines changed

backend/consts/const.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,14 @@
106106
RAY_DASHBOARD_PORT = int(os.getenv("RAY_DASHBOARD_PORT", "8265"))
107107
RAY_DASHBOARD_HOST = os.getenv("RAY_DASHBOARD_HOST", "0.0.0.0")
108108
RAY_NUM_CPUS = os.getenv("RAY_NUM_CPUS")
109-
RAY_PLASMA_DIRECTORY = os.getenv("RAY_PLASMA_DIRECTORY", "/tmp")
110109
RAY_OBJECT_STORE_MEMORY_GB = float(
111-
os.getenv("RAY_OBJECT_STORE_MEMORY_GB", "2.0"))
110+
os.getenv("RAY_OBJECT_STORE_MEMORY_GB", "0.25"))
112111
RAY_TEMP_DIR = os.getenv("RAY_TEMP_DIR", "/tmp/ray")
113112
RAY_LOG_LEVEL = os.getenv("RAY_LOG_LEVEL", "INFO").upper()
113+
# Disable plasma preallocation to reduce idle memory usage
114+
# When set to false, Ray will allocate object store memory on-demand instead of preallocating
115+
RAY_preallocate_plasma = os.getenv(
116+
"RAY_preallocate_plasma", "false").lower() == "true"
114117

115118

116119
# Service Control Flags

backend/data_process/ray_config.py

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
import ray
1010

1111
from consts.const import (
12-
RAY_NUM_CPUS,
1312
RAY_OBJECT_STORE_MEMORY_GB,
14-
RAY_PLASMA_DIRECTORY,
1513
RAY_TEMP_DIR,
14+
RAY_preallocate_plasma,
1615
)
1716

1817
logger = logging.getLogger("data_process.ray_config")
@@ -25,9 +24,9 @@ class RayConfig:
2524
"""Ray configuration manager"""
2625

2726
def __init__(self):
28-
self.plasma_directory = RAY_PLASMA_DIRECTORY
2927
self.object_store_memory_gb = RAY_OBJECT_STORE_MEMORY_GB
3028
self.temp_dir = RAY_TEMP_DIR
29+
self.preallocate_plasma = RAY_preallocate_plasma
3130

3231
def get_init_params(
3332
self,
@@ -52,7 +51,6 @@ def get_init_params(
5251
"""
5352
params = {
5453
"ignore_reinit_error": True,
55-
"_plasma_directory": self.plasma_directory,
5654
}
5755

5856
if address:
@@ -70,9 +68,16 @@ def get_init_params(
7068
# Temp directory configuration
7169
params["_temp_dir"] = self.temp_dir
7270

71+
# Object spilling directory (stable API)
72+
# This allows Ray to spill objects to disk when memory is full
73+
params["object_spilling_directory"] = self.temp_dir
74+
7375
# Dashboard configuration
76+
# Always pass include_dashboard explicitly because Ray's default is True.
77+
# If we omit this parameter when include_dashboard is False,
78+
# Ray will still start the dashboard by default.
79+
params["include_dashboard"] = include_dashboard
7480
if include_dashboard:
75-
params["include_dashboard"] = True
7681
params["dashboard_host"] = dashboard_host
7782
params["dashboard_port"] = dashboard_port
7883

@@ -93,30 +98,49 @@ def init_ray(self, **kwargs) -> bool:
9398
logger.info("Ray already initialized, skipping...")
9499
return True
95100

101+
# Set RAY_preallocate_plasma environment variable before initialization
102+
# Ray reads this environment variable during initialization
103+
os.environ["RAY_preallocate_plasma"] = str(
104+
self.preallocate_plasma).lower()
105+
96106
params = self.get_init_params(**kwargs)
97107

98108
# Log the attempt to initialize
99-
logger.debug("Initializing Ray cluster...")
100-
logger.debug("Ray configuration parameters:")
109+
logger.info("Initializing Ray cluster...")
110+
logger.info("Ray memory optimization configuration:")
111+
logger.info(
112+
f" RAY_preallocate_plasma: {self.preallocate_plasma}")
113+
logger.info(
114+
f" Object store memory: {self.object_store_memory_gb} GB")
101115
for key, value in params.items():
102116
if key.startswith('_'):
103117
logger.debug(f" {key}: {value}")
104118
elif key == 'object_store_memory':
105-
logger.debug(f" {key}: {value / (1024 ** 3):.1f} GB")
119+
logger.info(f" {key}: {value / (1024 ** 3):.2f} GB")
120+
elif key == 'object_spilling_directory':
121+
logger.info(f" {key}: {value}")
106122
else:
107123
logger.debug(f" {key}: {value}")
108124

109125
ray.init(**params)
110126
logger.info("✅ Ray initialization successful")
111127

112-
# Display cluster information
128+
# Display cluster information and verify memory configuration
113129
try:
114130
if hasattr(ray, 'cluster_resources'):
115131
resources = ray.cluster_resources()
116-
logger.debug(f"Ray cluster resources: {resources}")
132+
logger.info(f"Ray cluster resources: {resources}")
133+
134+
# Log memory-related resources
135+
if 'memory' in resources:
136+
logger.info(
137+
f" Total cluster memory: {resources['memory'] / (1024**3):.2f} GB")
138+
if 'object_store_memory' in resources:
139+
logger.info(
140+
f" Object store memory: {resources['object_store_memory'] / (1024**3):.2f} GB")
117141
except Exception as e:
118-
logger.error(
119-
f"Failed to get cluster resources information: {e}")
142+
logger.warning(
143+
f"Could not retrieve cluster resources information: {e}")
120144

121145
return True
122146

@@ -139,9 +163,17 @@ def connect_to_cluster(self, address: str = "auto") -> bool:
139163
logger.debug("Ray already initialized, skipping...")
140164
return True
141165

166+
# Set RAY_preallocate_plasma environment variable before initialization
167+
# Note: When connecting to existing cluster, this setting may not take effect
168+
# as the cluster was already initialized with its own settings
169+
os.environ["RAY_preallocate_plasma"] = str(
170+
self.preallocate_plasma).lower()
171+
142172
params = self.get_init_params(address=address)
143173

144174
logger.debug(f"Connecting to Ray cluster: {address}")
175+
logger.debug(
176+
f" RAY_preallocate_plasma: {self.preallocate_plasma}")
145177
ray.init(**params)
146178
logger.info("✅ Successfully connected to Ray cluster")
147179

@@ -183,6 +215,7 @@ def log_configuration(self):
183215
logger.debug(f" Plasma directory: {self.plasma_directory}")
184216
logger.debug(f" ObjectStore memory: {self.object_store_memory_gb} GB")
185217
logger.debug(f" Temp directory: {self.temp_dir}")
218+
logger.debug(f" Preallocate plasma: {self.preallocate_plasma}")
186219

187220
@classmethod
188221
def init_ray_for_worker(cls, address: str = "auto") -> bool:

backend/data_process/tasks.py

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
REDIS_BACKEND_URL,
2323
FORWARD_REDIS_RETRY_DELAY_S,
2424
FORWARD_REDIS_RETRY_MAX,
25+
DISABLE_RAY_DASHBOARD,
2526
)
2627

2728

@@ -36,24 +37,28 @@ def init_ray_in_worker():
3637
Initializes Ray within a Celery worker, ensuring it is done only once.
3738
This function is designed to be called from within a task.
3839
"""
39-
if not ray.is_initialized():
40-
logger.info(
41-
"Ray not initialized. Initializing Ray for Celery worker...")
42-
try:
43-
# `configure_logging=False` prevents Ray from setting up its own loggers,
44-
# which can interfere with Celery's logging.
45-
# `faulthandler=False` is critical to prevent the `AttributeError: 'LoggingProxy' object has no attribute 'fileno'`
46-
# error when running inside a Celery worker.
47-
ray.init(
48-
configure_logging=False,
49-
faulthandler=False
50-
)
51-
logger.info("Ray initialized successfully for Celery worker.")
52-
except Exception as e:
53-
logger.error(f"Failed to initialize Ray for Celery worker: {e}")
54-
raise
55-
else:
40+
if ray.is_initialized():
5641
logger.debug("Ray is already initialized.")
42+
return
43+
44+
logger.info("Ray not initialized. Initializing Ray for Celery worker...")
45+
try:
46+
# `configure_logging=False` prevents Ray from setting up its own loggers,
47+
# which can interfere with Celery's logging.
48+
# `faulthandler=False` is critical to prevent the
49+
# `AttributeError: 'LoggingProxy' object has no attribute 'fileno'`
50+
# error when running inside a Celery worker.
51+
# We also explicitly control the Ray dashboard behavior here to ensure
52+
# that Celery workers respect the global DISABLE_RAY_DASHBOARD setting.
53+
ray.init(
54+
configure_logging=False,
55+
faulthandler=False,
56+
include_dashboard=not DISABLE_RAY_DASHBOARD,
57+
)
58+
logger.info("Ray initialized successfully for Celery worker.")
59+
except Exception as e:
60+
logger.error(f"Failed to initialize Ray for Celery worker: {e}")
61+
raise
5762

5863

5964
def run_async(coro):

backend/data_process/worker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
ELASTICSEARCH_SERVICE,
4141
QUEUES,
4242
RAY_ADDRESS,
43-
RAY_PLASMA_DIRECTORY,
43+
RAY_preallocate_plasma,
4444
REDIS_URL,
4545
WORKER_CONCURRENCY,
4646
WORKER_NAME,
@@ -94,13 +94,16 @@ def setup_worker_environment(**kwargs):
9494
ray_address = RAY_ADDRESS
9595

9696
try:
97+
os.environ["RAY_preallocate_plasma"] = str(
98+
RAY_preallocate_plasma).lower()
99+
97100
# Initialize Ray using the centralized RayConfig helper
98101
if not RayConfig.init_ray_for_worker(ray_address):
102+
logger.warning("Warning: fallback to direct ray.init")
99103
# Fallback to direct ray.init if helper fails
100104
ray.init(
101105
address=ray_address,
102106
ignore_reinit_error=True,
103-
_plasma_directory=RAY_PLASMA_DIRECTORY
104107
)
105108

106109
logger.info(

backend/data_process_service.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from consts.const import (
2020
REDIS_URL, REDIS_PORT, FLOWER_PORT, RAY_DASHBOARD_PORT, RAY_DASHBOARD_HOST,
2121
RAY_ACTOR_NUM_CPUS, RAY_NUM_CPUS, DISABLE_RAY_DASHBOARD, DISABLE_CELERY_FLOWER,
22-
DOCKER_ENVIRONMENT
22+
DOCKER_ENVIRONMENT, RAY_OBJECT_STORE_MEMORY_GB, RAY_preallocate_plasma, RAY_TEMP_DIR
2323
)
2424

2525
# Load environment variables
@@ -111,16 +111,30 @@ def start_ray_cluster(self):
111111
if not success:
112112
# Fallback to direct Ray initialization
113113
try:
114+
# Set RAY_preallocate_plasma environment variable before initialization
115+
os.environ["RAY_preallocate_plasma"] = str(
116+
RAY_preallocate_plasma).lower()
117+
118+
# Calculate object store memory in bytes
119+
object_store_memory = int(
120+
RAY_OBJECT_STORE_MEMORY_GB * 1024 * 1024 * 1024)
121+
122+
logger.info(
123+
f"Fallback: Initializing Ray with object_store_memory={RAY_OBJECT_STORE_MEMORY_GB}GB, preallocate_plasma={RAY_preallocate_plasma}")
124+
114125
ray.init(
115126
num_cpus=num_cpus,
116-
_plasma_directory="/tmp",
127+
object_store_memory=object_store_memory,
128+
_temp_dir=RAY_TEMP_DIR,
129+
object_spilling_directory=RAY_TEMP_DIR,
117130
include_dashboard=include_dashboard,
118131
dashboard_host=dashboard_host,
119132
dashboard_port=self.ray_dashboard_port,
120133
ignore_reinit_error=True
121134
)
122135
success = True
123-
except Exception:
136+
except Exception as e:
137+
logger.error(f"Fallback Ray initialization failed: {e}")
124138
success = False
125139

126140
if success:

docker/.env.example

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ ELASTICSEARCH_HOST=http://nexent-elasticsearch:9200
2727
ELASTIC_PASSWORD=nexent@2025
2828

2929
# Elasticsearch Memory Configuration
30-
ES_JAVA_OPTS="-Xms1g -Xmx1g"
30+
ES_JAVA_OPTS="-Xms2g -Xmx2g"
3131

3232
# Elasticsearch Disk Watermark Configuration
3333
ES_DISK_WATERMARK_LOW=85%
@@ -111,15 +111,14 @@ FLOWER_PORT=5555
111111
RAY_ACTOR_NUM_CPUS=2
112112
RAY_DASHBOARD_PORT=8265
113113
RAY_DASHBOARD_HOST=0.0.0.0
114-
RAY_NUM_CPUS=
115-
RAY_PLASMA_DIRECTORY=/tmp
116-
RAY_OBJECT_STORE_MEMORY_GB=2.0
114+
RAY_NUM_CPUS=4
115+
RAY_OBJECT_STORE_MEMORY_GB=0.25
117116
RAY_TEMP_DIR=/tmp/ray
118117
RAY_LOG_LEVEL=INFO
119118

120119
# Service Control Flags
121-
DISABLE_RAY_DASHBOARD=false
122-
DISABLE_CELERY_FLOWER=false
120+
DISABLE_RAY_DASHBOARD=true
121+
DISABLE_CELERY_FLOWER=true
123122
DOCKER_ENVIRONMENT=false
124123

125124
# Celery Configuration

0 commit comments

Comments
 (0)