Commit acedf33

♻️ Backend code cleanup and import organization (data_process/) #1037
1 parent 4b57107 · commit acedf33

6 files changed: +160 -92 lines changed
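Across the three diffs shown below, the import blocks are rearranged into the standard PEP 8 / isort ordering: standard library first, then third-party packages, then first-party modules, each group alphabetized and separated by a blank line. A minimal sketch of the target layout (module names taken from the diffs; nothing else is implied):

```python
# Group 1: standard library, alphabetized.
import logging
import os

# Group 2: third-party packages.
import ray

# Group 3: first-party application modules.
from consts.const import RAY_NUM_CPUS
from database.attachment_db import get_file_stream
```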

backend/data_process/app.py

Lines changed: 3 additions & 1 deletion
@@ -2,10 +2,12 @@
 Celery application configuration for data processing tasks
 """
 import logging
+
 from celery import Celery
-from consts.const import REDIS_URL, REDIS_BACKEND_URL, ELASTICSEARCH_SERVICE
 from celery.backends.base import DisabledBackend
 
+from consts.const import ELASTICSEARCH_SERVICE, REDIS_BACKEND_URL, REDIS_URL
+
 # Configure logging
 logger = logging.getLogger("data_process.app")
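For context, a minimal sketch of how a Celery app like this one is typically constructed from the imported constants; the app name and the use of Redis for both broker and result backend are assumptions for illustration, not taken from this diff:

```python
import logging

from celery import Celery

# ELASTICSEARCH_SERVICE is presumably consumed further down in the real module.
from consts.const import ELASTICSEARCH_SERVICE, REDIS_BACKEND_URL, REDIS_URL

logger = logging.getLogger("data_process.app")

# Assumed wiring: Redis serves as both the message broker and the result
# backend; the app name "data_process" is hypothetical.
app = Celery("data_process", broker=REDIS_URL, backend=REDIS_BACKEND_URL)
```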

backend/data_process/ray_actors.py

Lines changed: 14 additions & 6 deletions
@@ -1,12 +1,13 @@
 import logging
+from typing import Any, Dict, List, Optional
+
 import ray
-from typing import List, Dict, Any, Optional
 
-from nexent.data_process import DataProcessCore
-from database.attachment_db import get_file_stream
 from consts.const import RAY_ACTOR_NUM_CPUS
+from database.attachment_db import get_file_stream
+from nexent.data_process import DataProcessCore
 
-logger = logging.getLogger(__name__)
+logger = logging.getLogger("data_process.ray_actors")
 # This now controls the number of CPUs requested by each DataProcessorRayActor instance.
 # It allows a single file processing task to potentially use more than one core if the
 # underlying processing library (e.g., unstructured) can leverage it.
@@ -22,7 +23,14 @@ def __init__(self):
         logger.info(f"Ray actor initialized using {RAY_ACTOR_NUM_CPUS} CPU cores...")
         self._processor = DataProcessCore()
 
-    def process_file(self, source: str, chunking_strategy: str, destination: str, task_id: Optional[str] = None, **params) -> List[Dict[str, Any]]:
+    def process_file(
+        self,
+        source: str,
+        chunking_strategy: str,
+        destination: str,
+        task_id: Optional[str] = None,
+        **params
+    ) -> List[Dict[str, Any]]:
         """
         Process a file, auto-detecting its type using DataProcessCore.file_process.
@@ -62,4 +70,4 @@ def process_file(self, source: str, chunking_strategy: str, destination: str, ta
             return []
 
         logger.debug(f"[RayActor] file_process returned {len(chunks)} chunks, returning as is.")
-        return chunks
+        return chunks
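The multi-line signature reads more clearly at call sites. A rough usage sketch, assuming `DataProcessorRayActor` (named in the comment above) is declared with `@ray.remote`; the import path and all argument values are illustrative assumptions:

```python
import ray

from data_process.ray_actors import DataProcessorRayActor  # assumed module path

ray.init(ignore_reinit_error=True)

# Create one actor instance; per the diff, __init__ takes no arguments.
actor = DataProcessorRayActor.remote()

# process_file runs remotely and returns a list of chunk dicts.
chunks = ray.get(
    actor.process_file.remote(
        source="/tmp/report.pdf",      # hypothetical file path
        chunking_strategy="basic",     # hypothetical strategy name
        destination="elasticsearch",   # hypothetical destination
        task_id="task-123",
    )
)
print(f"Received {len(chunks)} chunks")
```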

backend/data_process/ray_config.py

Lines changed: 54 additions & 48 deletions
@@ -2,32 +2,41 @@
 Ray configuration management module
 """
 
-import os
 import logging
+import os
+from typing import Any, Dict, Optional
+
 import ray
-from typing import Dict, Any, Optional
-from consts.const import RAY_PLASMA_DIRECTORY, RAY_OBJECT_STORE_MEMORY_GB, RAY_TEMP_DIR, RAY_NUM_CPUS
 
-logger = logging.getLogger(__name__)
+from consts.const import (
+    RAY_NUM_CPUS,
+    RAY_OBJECT_STORE_MEMORY_GB,
+    RAY_PLASMA_DIRECTORY,
+    RAY_TEMP_DIR,
+)
+
+logger = logging.getLogger("data_process.ray_config")
 
 # Forward declaration variable so runtime references succeed before instantiation
 ray_config: Optional["RayConfig"] = None
 
 
 class RayConfig:
     """Ray configuration manager"""
-
+
     def __init__(self):
         self.plasma_directory = RAY_PLASMA_DIRECTORY
         self.object_store_memory_gb = RAY_OBJECT_STORE_MEMORY_GB
         self.temp_dir = RAY_TEMP_DIR
-
-    def get_init_params(self,
-                        address: Optional[str] = None,
-                        num_cpus: Optional[int] = None,
-                        include_dashboard: bool = False,
-                        dashboard_host: str = "0.0.0.0",
-                        dashboard_port: int = 8265) -> Dict[str, Any]:
+
+    def get_init_params(
+        self,
+        address: Optional[str] = None,
+        num_cpus: Optional[int] = None,
+        include_dashboard: bool = False,
+        dashboard_host: str = "0.0.0.0",
+        dashboard_port: int = 8265
+    ) -> Dict[str, Any]:
         """
         Get Ray initialization parameters
 
@@ -45,29 +54,29 @@ def get_init_params(self,
             "ignore_reinit_error": True,
             "_plasma_directory": self.plasma_directory,
         }
-
+
         if address:
             params["address"] = address
         else:
             # Local cluster configuration
             if num_cpus:
                 params["num_cpus"] = num_cpus
-
+
             # Object store memory configuration (convert to bytes)
             object_store_memory = int(self.object_store_memory_gb * 1024 * 1024 * 1024)
             params["object_store_memory"] = object_store_memory
-
+
             # Temp directory configuration
             params["_temp_dir"] = self.temp_dir
-
+
            # Dashboard configuration
            if include_dashboard:
                params["include_dashboard"] = True
                params["dashboard_host"] = dashboard_host
                params["dashboard_port"] = dashboard_port
-
+
        return params
-
+
    def init_ray(self, **kwargs) -> bool:
        """
        Initialize Ray
@@ -82,11 +91,8 @@ def init_ray(self, **kwargs) -> bool:
            if ray.is_initialized():
                logger.info("Ray already initialized, skipping...")
                return True
-
+
            params = self.get_init_params(**kwargs)
-
-            # Get Ray configuration from environment
-            num_cpus = int(RAY_NUM_CPUS) if RAY_NUM_CPUS else None  # None lets Ray decide
 
            # Log the attempt to initialize
            logger.debug("Initializing Ray cluster...")
@@ -95,27 +101,27 @@
                if key.startswith('_'):
                    logger.debug(f"  {key}: {value}")
                elif key == 'object_store_memory':
-                    logger.debug(f"  {key}: {value / (1024**3):.1f} GB")
+                    logger.debug(f"  {key}: {value / (1024 ** 3):.1f} GB")
                else:
                    logger.debug(f"  {key}: {value}")
-
+
            ray.init(**params)
            logger.info("✅ Ray initialization successful")
-
+
            # Display cluster information
            try:
                if hasattr(ray, 'cluster_resources'):
                    resources = ray.cluster_resources()
                    logger.debug(f"Ray cluster resources: {resources}")
            except Exception as e:
                logger.error(f"Failed to get cluster resources information: {e}")
-
+
            return True
-
+
        except Exception as e:
            logger.error(f"❌ Ray initialization failed: {str(e)}")
            return False
-
+
    def connect_to_cluster(self, address: str = "auto") -> bool:
        """
        Connect to existing Ray cluster
@@ -130,23 +136,25 @@ def connect_to_cluster(self, address: str = "auto") -> bool:
            if ray.is_initialized():
                logger.debug("Ray already initialized, skipping...")
                return True
-
+
            params = self.get_init_params(address=address)
-
+
            logger.debug(f"Connecting to Ray cluster: {address}")
            ray.init(**params)
            logger.info("✅ Successfully connected to Ray cluster")
-
+
            return True
-
+
        except Exception as e:
            logger.info(f"Cannot connect to Ray cluster: {str(e)}")
            return False
-
-    def start_local_cluster(self,
-                            num_cpus: Optional[int] = None,
-                            include_dashboard: bool = True,
-                            dashboard_port: int = 8265) -> bool:
+
+    def start_local_cluster(
+        self,
+        num_cpus: Optional[int] = None,
+        include_dashboard: bool = True,
+        dashboard_port: int = 8265
+    ) -> bool:
        """
        Start local Ray cluster
 
@@ -160,13 +168,13 @@ def start_local_cluster(self,
        """
        if num_cpus is None:
            num_cpus = os.cpu_count()
-
+
        return self.init_ray(
            num_cpus=num_cpus,
            include_dashboard=include_dashboard,
            dashboard_port=dashboard_port
        )
-
+
    def log_configuration(self):
        """Log current configuration information"""
        logger.debug("Ray Configuration:")
@@ -182,11 +190,13 @@ def init_ray_for_worker(cls, address: str = "auto") -> bool:
        return ray_config.connect_to_cluster(address)
 
    @classmethod
-    def init_ray_for_service(cls,
-                             num_cpus: Optional[int] = None,
-                             dashboard_port: int = 8265,
-                             try_connect_first: bool = True,
-                             include_dashboard: bool = True) -> bool:
+    def init_ray_for_service(
+        cls,
+        num_cpus: Optional[int] = None,
+        dashboard_port: int = 8265,
+        try_connect_first: bool = True,
+        include_dashboard: bool = True
+    ) -> bool:
        """Initialize Ray for data processing service (class method wrapper)."""
        ray_config.log_configuration()
 
@@ -203,7 +213,3 @@ def init_ray_for_service(cls,
            include_dashboard=include_dashboard,
            dashboard_port=dashboard_port
        )
-
-# Create a global RayConfig instance accessible throughout the module
-ray_config = RayConfig()
-
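Note that the module-level `ray_config = RayConfig()` instantiation is deleted at the bottom of the file, leaving only the `ray_config: Optional["RayConfig"] = None` forward declaration in this diff; the instance is presumably created in one of the other changed files not shown here. A rough sketch of the service-side entry point, using only names visible above (the connect-then-start fallback is inferred from the `try_connect_first` parameter):

```python
from data_process.ray_config import RayConfig  # assumed module path

# Try to join an existing Ray cluster first; otherwise start a local one
# with the dashboard enabled. Returns False if initialization fails.
ok = RayConfig.init_ray_for_service(
    num_cpus=None,           # None -> start_local_cluster falls back to os.cpu_count()
    dashboard_port=8265,
    try_connect_first=True,  # attempt connect_to_cluster("auto") before starting locally
    include_dashboard=True,
)
if not ok:
    raise RuntimeError("Ray initialization failed")
```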
