Skip to content

Commit d945cf5

Browse files
committed
Add configurable thread limits for local and online deployments
Add a thread_limits section to settings.json with separate values for local (4) and online (2) deployments. The CommandExecutor now uses ThreadPoolExecutor to limit parallel command execution based on deployment mode, preventing resource exhaustion on shared cloud environments. https://claude.ai/code/session_01GMNyjKwZQhhQ6NsBdFkavG
1 parent a784301 commit d945cf5

File tree

3 files changed

+56
-25
lines changed

3 files changed

+56
-25
lines changed

settings.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
}
1515
},
1616
"online_deployment": false,
17+
"thread_limits": {
18+
"local": 4,
19+
"online": 2
20+
},
1721
"enable_workspaces": true,
1822
"test": true,
1923
"workspaces_dir": "..",

src/workflow/CommandExecutor.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import shutil
44
import subprocess
55
import threading
6+
from concurrent.futures import ThreadPoolExecutor, as_completed
67
from pathlib import Path
8+
from typing import Optional
79
from .Logger import Logger
810
from .ParameterManager import ParameterManager
911
import sys
@@ -20,20 +22,20 @@ class CommandExecutor:
2022
for execution.
2123
"""
2224
# Methods for running commands and logging
23-
def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: ParameterManager):
25+
def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: ParameterManager, max_threads: Optional[int] = None):
2426
self.pid_dir = Path(workflow_dir, "pids")
2527
self.logger = logger
2628
self.parameter_manager = parameter_manager
29+
self.max_threads = max_threads
2730

2831
def run_multiple_commands(
2932
self, commands: list[str]
3033
) -> bool:
3134
"""
32-
Executes multiple shell commands concurrently in separate threads.
35+
Executes multiple shell commands concurrently with optional thread limiting.
3336
34-
This method leverages threading to run each command in parallel, improving
35-
efficiency for batch command execution. Execution time and command results are
36-
logged if specified.
37+
Uses ThreadPoolExecutor to run commands in parallel while respecting the
38+
configured maximum thread limit for the current deployment mode.
3739
3840
Args:
3941
commands (list[str]): A list where each element is a list representing
@@ -42,30 +44,28 @@ def run_multiple_commands(
4244
Returns:
4345
bool: True if all commands succeeded, False if any failed.
4446
"""
47+
# Determine max workers: use configured limit or fall back to command count (unlimited)
48+
max_workers = self.max_threads if self.max_threads else len(commands)
49+
thread_info = f"max threads: {self.max_threads}" if self.max_threads else "unlimited"
50+
4551
# Log the start of command execution
46-
self.logger.log(f"Running {len(commands)} commands in parallel...", 1)
52+
self.logger.log(f"Running {len(commands)} commands in parallel ({thread_info})...", 1)
4753
start_time = time.time()
4854

4955
results = []
50-
lock = threading.Lock()
51-
52-
def run_and_track(cmd):
53-
success = self.run_command(cmd)
54-
with lock:
55-
results.append(success)
56-
57-
# Initialize a list to keep track of threads
58-
threads = []
59-
60-
# Start a new thread for each command
61-
for cmd in commands:
62-
thread = threading.Thread(target=run_and_track, args=(cmd,))
63-
thread.start()
64-
threads.append(thread)
6556

66-
# Wait for all threads to complete
67-
for thread in threads:
68-
thread.join()
57+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
58+
# Submit all commands to the pool
59+
future_to_cmd = {executor.submit(self.run_command, cmd): cmd for cmd in commands}
60+
61+
# Collect results as they complete
62+
for future in as_completed(future_to_cmd):
63+
try:
64+
success = future.result()
65+
results.append(success)
66+
except Exception as e:
67+
self.logger.log(f"ERROR: Command raised exception: {e}", 0)
68+
results.append(False)
6969

7070
# Calculate and log the total execution time
7171
end_time = time.time()

src/workflow/WorkflowManager.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@ def __init__(self, name: str, workspace: str):
1919
self.file_manager = FileManager(self.workflow_dir)
2020
self.logger = Logger(self.workflow_dir)
2121
self.parameter_manager = ParameterManager(self.workflow_dir, workflow_name=name)
22-
self.executor = CommandExecutor(self.workflow_dir, self.logger, self.parameter_manager)
22+
self.executor = CommandExecutor(
23+
self.workflow_dir,
24+
self.logger,
25+
self.parameter_manager,
26+
max_threads=self._get_max_threads()
27+
)
2328
self.ui = StreamlitUI(self.workflow_dir, self.logger, self.executor, self.parameter_manager)
2429
self.params = self.parameter_manager.get_parameters_from_json()
2530

@@ -32,6 +37,28 @@ def _is_online_mode(self) -> bool:
3237
"""Check if running in online deployment mode"""
3338
return st.session_state.get("settings", {}).get("online_deployment", False)
3439

40+
def _get_max_threads(self) -> Optional[int]:
41+
"""
42+
Get the maximum number of parallel threads based on deployment mode.
43+
44+
Returns the configured thread limit from settings.json, using the appropriate
45+
value for local or online deployment. Returns None if thread_limits is not
46+
configured, which means unlimited threads.
47+
48+
Returns:
49+
Optional[int]: Maximum number of threads, or None for unlimited.
50+
"""
51+
settings = st.session_state.get("settings", {})
52+
thread_limits = settings.get("thread_limits", {})
53+
54+
if not thread_limits:
55+
return None
56+
57+
if self._is_online_mode():
58+
return thread_limits.get("online", 2)
59+
else:
60+
return thread_limits.get("local", 4)
61+
3562
def _init_queue_manager(self) -> None:
3663
"""Initialize queue manager for online mode"""
3764
try:

0 commit comments

Comments
 (0)