Skip to content

Commit 1665060

Browse files
committed
feat: add configurable max threads for local and online deployments
Changes:
- Add a max_threads configuration to settings.json with separate values for local (4) and online (2) deployments.
- Add a get_max_threads() helper function in common.py that returns the appropriate thread count based on the deployment mode.
- Add a "Threads" number input to parameter_section for the local-mode UI.
- Update CommandExecutor.run_multiple_commands() to limit concurrency with a semaphore based on the max_threads setting.
- Update CommandExecutor.run_topp() to pass the -threads parameter to TOPP tools, with the thread budget distributed across the parallel commands.

Thread distribution algorithm:
- parallel_commands = min(num_files, max_threads)
- threads_per_command = max(1, max_threads // parallel_commands)

This keeps resource usage within the max_threads budget: with max_threads = 4 and 2 files, 2 commands run in parallel with 2 threads each; with 4 files, 4 commands run in parallel with 1 thread each.

https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi
1 parent a784301 commit 1665060

File tree

4 files changed

+64
-7
lines changed

4 files changed

+64
-7
lines changed

settings.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,9 @@
2626
"source_dirs": [
2727
"example-data/workspaces"
2828
]
29+
},
30+
"max_threads": {
31+
"local": 4,
32+
"online": 2
2933
}
3034
}

src/common/common.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,29 @@
3232
OS_PLATFORM = sys.platform
3333

3434

35+
def get_max_threads() -> int:
    """
    Get max threads for current deployment mode.

    In local mode, checks for UI override in session state
    ("max_threads_override") and falls back to the configured "local" value.
    In online mode, uses the configured "online" value directly.

    The result is always a positive integer. Callers use it to size a
    semaphore and as the numerator/denominator of an integer division, so a
    value of 0, a negative number, None or a non-numeric setting would
    otherwise deadlock or crash them. Such values are replaced by the
    fallback for the mode, and anything below 1 is clamped to 1.

    Returns:
        int: Maximum number of threads to use for parallel processing (>= 1).
    """

    def _positive_int(value, fallback: int) -> int:
        # Coerce a raw setting to an int >= 1; use the fallback when the
        # value cannot be interpreted as a number at all.
        try:
            return max(1, int(value))
        except (TypeError, ValueError, OverflowError):
            return fallback

    settings = st.session_state.get("settings", {})
    max_threads_config = settings.get("max_threads", {"local": 4, "online": 2})

    if settings.get("online_deployment", False):
        return _positive_int(max_threads_config.get("online", 2), 2)

    # Local mode: check for UI override, fallback to config default
    configured_local = _positive_int(max_threads_config.get("local", 4), 4)
    return _positive_int(
        st.session_state.get("max_threads_override", configured_local),
        configured_local,
    )
56+
57+
3558
def is_safe_workspace_name(name: str) -> bool:
3659
"""
3760
Check if a workspace name is safe (no path traversal characters).

src/workflow/CommandExecutor.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ def run_multiple_commands(
3232
Executes multiple shell commands concurrently in separate threads.
3333
3434
This method leverages threading to run each command in parallel, improving
35-
efficiency for batch command execution. Execution time and command results are
36-
logged if specified.
35+
efficiency for batch command execution. The number of concurrent commands
36+
is limited by the max_threads setting, which is distributed between
37+
parallel command execution and per-tool thread allocation.
3738
3839
Args:
3940
commands (list[str]): A list where each element is a list representing
@@ -42,17 +43,26 @@ def run_multiple_commands(
4243
Returns:
4344
bool: True if all commands succeeded, False if any failed.
4445
"""
46+
from src.common.common import get_max_threads
47+
48+
# Get thread settings and calculate distribution
49+
max_threads = get_max_threads()
50+
num_commands = len(commands)
51+
parallel_commands = min(num_commands, max_threads)
52+
4553
# Log the start of command execution
46-
self.logger.log(f"Running {len(commands)} commands in parallel...", 1)
54+
self.logger.log(f"Running {num_commands} commands (max {parallel_commands} parallel, {max_threads} total threads)...", 1)
4755
start_time = time.time()
4856

4957
results = []
5058
lock = threading.Lock()
59+
semaphore = threading.Semaphore(parallel_commands)
5160

5261
def run_and_track(cmd):
53-
success = self.run_command(cmd)
54-
with lock:
55-
results.append(success)
62+
with semaphore:
63+
success = self.run_command(cmd)
64+
with lock:
65+
results.append(success)
5666

5767
# Initialize a list to keep track of threads
5868
threads = []
@@ -69,7 +79,7 @@ def run_and_track(cmd):
6979

7080
# Calculate and log the total execution time
7181
end_time = time.time()
72-
self.logger.log(f"Total time to run {len(commands)} commands: {end_time - start_time:.2f} seconds", 1)
82+
self.logger.log(f"Total time to run {num_commands} commands: {end_time - start_time:.2f} seconds", 1)
7383

7484
return all(results)
7585

@@ -210,6 +220,12 @@ def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> b
210220
else:
211221
n_processes = max(io_lengths)
212222

223+
# Calculate threads per command based on max_threads setting
224+
from src.common.common import get_max_threads
225+
max_threads = get_max_threads()
226+
parallel_commands = min(n_processes, max_threads)
227+
threads_per_command = max(1, max_threads // parallel_commands)
228+
213229
commands = []
214230

215231
# Load parameters for non-defaults
@@ -253,6 +269,8 @@ def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> b
253269
command += [str(x) for x in v]
254270
else:
255271
command += [str(v)]
272+
# Add threads parameter for TOPP tools
273+
command += ["-threads", str(threads_per_command)]
256274
commands.append(command)
257275

258276
# check if a ini file has been written, if yes use it (contains custom defaults)

src/workflow/StreamlitUI.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,6 +1108,18 @@ def file_upload_section(self, custom_upload_function) -> None:
11081108
def parameter_section(self, custom_parameter_function) -> None:
11091109
st.toggle("Show advanced parameters", value=False, key="advanced")
11101110

1111+
# Display threads configuration for local mode only
1112+
if not st.session_state.settings.get("online_deployment", False):
1113+
max_threads_config = st.session_state.settings.get("max_threads", {})
1114+
default_threads = max_threads_config.get("local", 4)
1115+
st.number_input(
1116+
"Threads",
1117+
min_value=1,
1118+
value=default_threads,
1119+
key="max_threads_override",
1120+
help="Maximum threads for parallel processing. Threads are distributed between parallel commands and per-tool thread allocation."
1121+
)
1122+
11111123
# Display preset buttons if presets are available for this workflow
11121124
self.preset_buttons()
11131125

0 commit comments

Comments
 (0)