Skip to content

Commit eb9c205

Browse files
t0mdavid-mclaude
andauthored
Fix threads in offline deployment (#333)
* feat: add configurable max threads for local and online deployments - Add max_threads configuration to settings.json with separate values for local (4) and online (2) deployments - Add get_max_threads() helper function in common.py that returns appropriate thread count based on deployment mode - Add Threads number input in parameter_section for local mode UI - Update CommandExecutor.run_multiple_commands() to use semaphore limiting based on max_threads setting - Update CommandExecutor.run_topp() to pass -threads parameter to TOPP tools with intelligently distributed thread allocation Thread distribution algorithm: - parallel_commands = min(num_files, max_threads) - threads_per_command = max(1, max_threads // parallel_commands) This ensures optimal resource usage: with 4 max_threads and 2 files, runs 2 parallel commands with 2 threads each; with 4 files, runs 4 parallel commands with 1 thread each. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi * fix: properly initialize session state for Threads number input Pre-initialize max_threads_override in session state before rendering the widget, and remove the value parameter to let Streamlit read from session state. This ensures user changes are properly tracked. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi * fix: persist Threads parameter across page navigation Use input_widget() which integrates with the parameter manager to persist the value to params.json. Also copy value to session state for get_max_threads() to access. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi * refactor: use parameter manager directly in get_max_threads() - Pass parameter_manager to get_max_threads() to read persisted value - Remove session state copying workaround in StreamlitUI - Cleaner design that reads directly from params.json https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi * refactor: move _get_max_threads() to CommandExecutor - Move function from common.py to CommandExecutor as private method - Simplifies code by using self.parameter_manager directly - Remove unnecessary import and parameter passing https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi * fix: validate _get_max_threads() return value Coerce to int and clamp to minimum of 1 to prevent: - Semaphore(0) deadlocks - Divide-by-zero in threads_per_command calculation - Issues from non-numeric config values https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi * style: move streamlit import to top of file https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi * feat: log warning for invalid max_threads values Logs to minimal log (level 0) when max_threads is non-numeric or < 1. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi * simplify: remove warning logging from _get_max_threads() UI already enforces min_value=1, keep just the validation. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi * simplify: remove exception handling, fail loud on invalid config https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 0820681 commit eb9c205

File tree

3 files changed

+30
-32
lines changed

3 files changed

+30
-32
lines changed

src/common/common.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,29 +32,6 @@
3232
OS_PLATFORM = sys.platform
3333

3434

35-
def get_max_threads() -> int:
36-
"""
37-
Get max threads for current deployment mode.
38-
39-
In local mode, checks for UI override in session state.
40-
In online mode, uses the configured value directly.
41-
42-
Returns:
43-
int: Maximum number of threads to use for parallel processing.
44-
"""
45-
settings = st.session_state.get("settings", {})
46-
max_threads_config = settings.get("max_threads", {"local": 4, "online": 2})
47-
48-
if settings.get("online_deployment", False):
49-
return max_threads_config.get("online", 2)
50-
else:
51-
# Local mode: check for UI override, fallback to config default
52-
return st.session_state.get(
53-
"max_threads_override",
54-
max_threads_config.get("local", 4)
55-
)
56-
57-
5835
def is_safe_workspace_name(name: str) -> bool:
5936
"""
6037
Check if a workspace name is safe (no path traversal characters).

src/workflow/CommandExecutor.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import sys
1010
import importlib.util
1111
import json
12+
import streamlit as st
1213

1314
class CommandExecutor:
1415
"""
@@ -25,6 +26,28 @@ def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: Parame
2526
self.logger = logger
2627
self.parameter_manager = parameter_manager
2728

29+
def _get_max_threads(self) -> int:
30+
"""
31+
Get max threads for current deployment mode.
32+
33+
In local mode, reads from parameter manager (persisted params.json).
34+
In online mode, uses the configured value directly from settings.
35+
36+
Returns:
37+
int: Maximum number of threads to use for parallel processing (minimum 1).
38+
"""
39+
settings = st.session_state.get("settings", {})
40+
max_threads_config = settings.get("max_threads", {"local": 4, "online": 2})
41+
42+
if settings.get("online_deployment", False):
43+
value = max_threads_config.get("online", 2)
44+
else:
45+
default = max_threads_config.get("local", 4)
46+
params = self.parameter_manager.get_parameters_from_json()
47+
value = params.get("max_threads", default)
48+
49+
return max(1, int(value))
50+
2851
def run_multiple_commands(
2952
self, commands: list[str]
3053
) -> bool:
@@ -43,10 +66,8 @@ def run_multiple_commands(
4366
Returns:
4467
bool: True if all commands succeeded, False if any failed.
4568
"""
46-
from src.common.common import get_max_threads
47-
4869
# Get thread settings and calculate distribution
49-
max_threads = get_max_threads()
70+
max_threads = self._get_max_threads()
5071
num_commands = len(commands)
5172
parallel_commands = min(num_commands, max_threads)
5273

@@ -234,8 +255,7 @@ def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> b
234255
n_processes = max(io_lengths)
235256

236257
# Calculate threads per command based on max_threads setting
237-
from src.common.common import get_max_threads
238-
max_threads = get_max_threads()
258+
max_threads = self._get_max_threads()
239259
parallel_commands = min(n_processes, max_threads)
240260
threads_per_command = max(1, max_threads // parallel_commands)
241261

src/workflow/StreamlitUI.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,11 +1112,12 @@ def parameter_section(self, custom_parameter_function) -> None:
11121112
if not st.session_state.settings.get("online_deployment", False):
11131113
max_threads_config = st.session_state.settings.get("max_threads", {})
11141114
default_threads = max_threads_config.get("local", 4)
1115-
st.number_input(
1116-
"Threads",
1115+
self.input_widget(
1116+
key="max_threads",
1117+
default=default_threads,
1118+
name="Threads",
1119+
widget_type="number",
11171120
min_value=1,
1118-
value=default_threads,
1119-
key="max_threads_override",
11201121
help="Maximum threads for parallel processing. Threads are distributed between parallel commands and per-tool thread allocation."
11211122
)
11221123

0 commit comments

Comments
 (0)