Skip to content

Commit 35d53ed

Browse files
committed
Support scaling the engine service on deploy and fix _process_dict usage in task_service
1 parent d96072b commit 35d53ed

File tree

3 files changed

+17
-29
lines changed

3 files changed

+17
-29
lines changed

docker-compose.dev.yml

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@ services:
2424
build:
2525
context: ./backend
2626
dockerfile: Dockerfile
27-
container_name: lmeterx-backend
2827
restart: unless-stopped
2928
depends_on:
3029
mysql:
@@ -39,8 +38,6 @@ services:
3938
volumes:
4039
- ./logs:/logs
4140
- ./upload_files:/app/upload_files
42-
ports:
43-
- "5001:5001"
4441
healthcheck:
4542
test: [ "CMD", "curl", "-s", "-f", "http://localhost:5001/health" ]
4643
interval: 10s
@@ -53,7 +50,6 @@ services:
5350
build:
5451
context: ./st_engine
5552
dockerfile: Dockerfile
56-
container_name: lmeterx-engine
5753
restart: unless-stopped
5854
depends_on:
5955
mysql:
@@ -80,14 +76,12 @@ services:
8076
volumes:
8177
- ./logs:/logs
8278
- ./upload_files:/app/upload_files
83-
ports:
84-
- "5002:5002"
8579
# Resource limits for the container
8680
deploy:
8781
resources:
8882
limits:
8983
cpus: '2.0' # Must match LOCUST_CPU_CORES
90-
memory: 2G
84+
memory: 4G
9185
reservations:
9286
cpus: '1.0'
9387
memory: 1G

docker-compose.yml

Lines changed: 2 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@ services:
2020
# backend service
2121
backend:
2222
image: charmy1220/lmeterx-be:latest
23-
container_name: lmeterx-backend
2423
restart: unless-stopped
2524
depends_on:
2625
mysql:
@@ -35,8 +34,6 @@ services:
3534
volumes:
3635
- ./logs:/logs
3736
- ./upload_files:/app/upload_files
38-
ports:
39-
- "5001:5001"
4037
healthcheck:
4138
test: [ "CMD", "curl", "-s", "-f", "http://localhost:5001/health" ]
4239
interval: 10s
@@ -47,7 +44,6 @@ services:
4744
# engine service
4845
engine:
4946
image: charmy1220/lmeterx-eng:latest
50-
container_name: lmeterx-engine
5147
restart: unless-stopped
5248
depends_on:
5349
mysql:
@@ -60,7 +56,7 @@ services:
6056
DB_NAME: lmeterx
6157
# Multi-process configuration
6258
ENABLE_MULTIPROCESS: auto
63-
LOCUST_CPU_CORES: 4.0
59+
LOCUST_CPU_CORES: 2.0
6460
MULTIPROCESS_THRESHOLD: 1000 # Enable multiprocess only for 1000+ users
6561
MIN_USERS_PER_PROCESS: 600 # Each process handles at least 600 users
6662
# Process management and stability
@@ -74,14 +70,12 @@ services:
7470
volumes:
7571
- ./logs:/logs
7672
- ./upload_files:/app/upload_files
77-
ports:
78-
- "5002:5002"
7973
# Resource limits for the container
8074
deploy:
8175
resources:
8276
limits:
8377
cpus: '2.0' # Multi-process mode requires at least 2 cores
84-
memory: 2G
78+
memory: 4G
8579
reservations:
8680
cpus: '1.0'
8781
memory: 1G

st_engine/service/task_service.py

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -463,7 +463,7 @@ def stop_task(self, task_id: str) -> bool:
463463
task_logger.info(f"Received stop request for task {task_id}.")
464464

465465
# Check if the process is managed by this runner instance
466-
process = self.runner.process_dict.get(task_id)
466+
process = self.runner._process_dict.get(task_id)
467467
if not process:
468468
task_logger.warning(
469469
f"Task {task_id}: Process not found in runner's dictionary. "
@@ -477,7 +477,7 @@ def stop_task(self, task_id: str) -> bool:
477477
task_logger.info(
478478
f"Task {task_id}: Process with PID {process.pid} has already terminated. Cleaning up local reference."
479479
)
480-
self.runner.process_dict.pop(task_id, None)
480+
self.runner._process_dict.pop(task_id, None)
481481
return True
482482

483483
# Delegate the complex termination logic to the process manager via the runner.
@@ -489,7 +489,7 @@ def stop_task(self, task_id: str) -> bool:
489489
f"Successfully terminated process group for task {task_id}."
490490
)
491491
# Remove from local tracking after successful termination
492-
self.runner.process_dict.pop(task_id, None)
492+
self.runner._process_dict.pop(task_id, None)
493493
return True
494494
else:
495495
task_logger.error(
@@ -525,7 +525,7 @@ def stop_task_old(self, task_id: str) -> bool:
525525
)
526526

527527
# Clean up local tracking
528-
self.runner.process_dict.pop(task_id, None)
528+
self.runner._process_dict.pop(task_id, None)
529529
if hasattr(self.runner, "_terminating_processes"):
530530
termination_key = f"{task_id}_terminating"
531531
self.runner._terminating_processes.discard(termination_key)
@@ -535,13 +535,13 @@ def stop_task_old(self, task_id: str) -> bool:
535535
return True
536536

537537
# Step 2: Fallback to original process termination if multiprocess cleanup failed
538-
process = self.runner.process_dict.get(task_id)
538+
process = self.runner._process_dict.get(task_id)
539539

540540
if not process:
541541
task_logger.warning(
542542
f"Task {task_id}, Process not found in runner's dictionary. It might have finished or be on another node."
543543
)
544-
self.runner.process_dict.pop(task_id, None)
544+
self.runner._process_dict.pop(task_id, None)
545545
# Clean up task resources (do not force cleanup orphaned processes here)
546546
cleanup_task_resources(task_id)
547547
return True
@@ -550,7 +550,7 @@ def stop_task_old(self, task_id: str) -> bool:
550550
task_logger.info(
551551
f"Task {task_id}, Process with PID {process.pid} has already terminated. Cleaning up."
552552
)
553-
self.runner.process_dict.pop(task_id, None)
553+
self.runner._process_dict.pop(task_id, None)
554554
cleanup_task_resources(task_id)
555555
return True
556556

@@ -577,7 +577,7 @@ def stop_task_old(self, task_id: str) -> bool:
577577
task_logger.info(
578578
f"Task {task_id}, Process with PID {process.pid} terminated naturally while preparing to stop it."
579579
)
580-
self.runner.process_dict.pop(task_id, None)
580+
self.runner._process_dict.pop(task_id, None)
581581
self.runner._terminating_processes.discard(termination_key)
582582
cleanup_task_resources(task_id)
583583
return True
@@ -587,7 +587,7 @@ def stop_task_old(self, task_id: str) -> bool:
587587
task_logger.info(
588588
f"Task {task_id}, Process terminated successfully via SIGTERM."
589589
)
590-
self.runner.process_dict.pop(task_id, None)
590+
self.runner._process_dict.pop(task_id, None)
591591
self.runner._terminating_processes.discard(termination_key)
592592
cleanup_task_resources(task_id)
593593
return True
@@ -601,7 +601,7 @@ def stop_task_old(self, task_id: str) -> bool:
601601
task_logger.info(
602602
f"Task {task_id}, Process with PID {process.pid} terminated naturally during SIGTERM timeout."
603603
)
604-
self.runner.process_dict.pop(task_id, None)
604+
self.runner._process_dict.pop(task_id, None)
605605
self.runner._terminating_processes.discard(termination_key)
606606
return True
607607

@@ -610,7 +610,7 @@ def stop_task_old(self, task_id: str) -> bool:
610610
task_logger.info(
611611
f"Task {task_id}, Process killed successfully via SIGKILL."
612612
)
613-
self.runner.process_dict.pop(task_id, None)
613+
self.runner._process_dict.pop(task_id, None)
614614
self.runner._terminating_processes.discard(termination_key)
615615
return True
616616
except subprocess.TimeoutExpired:
@@ -630,7 +630,7 @@ def stop_task_old(self, task_id: str) -> bool:
630630
task_logger.info(
631631
f"Task {task_id}, Process with PID {process.pid} no longer exists (ProcessLookupError). Cleaning up."
632632
)
633-
self.runner.process_dict.pop(task_id, None)
633+
self.runner._process_dict.pop(task_id, None)
634634
self.runner._terminating_processes.discard(termination_key)
635635
return True
636636
except Exception as e:
@@ -646,7 +646,7 @@ def stop_task_old(self, task_id: str) -> bool:
646646
task_logger.info(
647647
f"Task {task_id}, Process completed its natural shutdown successfully."
648648
)
649-
self.runner.process_dict.pop(task_id, None)
649+
self.runner._process_dict.pop(task_id, None)
650650
self.runner._terminating_processes.discard(termination_key)
651651
return True
652652
except subprocess.TimeoutExpired:
@@ -659,7 +659,7 @@ def stop_task_old(self, task_id: str) -> bool:
659659
task_logger.info(
660660
f"Task {task_id}, Process force-killed successfully after natural shutdown timeout."
661661
)
662-
self.runner.process_dict.pop(task_id, None)
662+
self.runner._process_dict.pop(task_id, None)
663663
self.runner._terminating_processes.discard(termination_key)
664664
return True
665665
except Exception as kill_e:

0 commit comments

Comments
 (0)