This document outlines the architecture and implementation steps for introducing a Redis-based job queueing system to online deployments:
- Uses RQ (Redis Queue) for task management
- Docker Compose additions for Redis, worker, and dashboard services
- New QueueManager class for queue interactions
- Worker tasks module for background execution
- Health monitoring utilities
- Backward compatible with local mode (multiprocessing fallback)
Update implementation plan to run Redis server and RQ worker within the same Docker container as the Streamlit app:
- Redis server runs as background process via entrypoint script
- RQ worker runs in same container with identical environment
- No docker-compose orchestration complexity
- All communication via localhost
- Optional supervisord config for robust process management
- Simplified deployment and debugging
…etails
- Add "Design Principles" section explaining plug & play architecture
- Document offline mode (Windows installer) compatibility - zero changes needed
- Add comprehensive "Configuring Worker Count" section with multiple methods
- Add "User Experience: Queue Status Display" section showing what users see
- Add "Sidebar Metrics" section for queue monitoring alongside CPU/RAM
- Include code examples for all UI components
New files:
- src/workflow/QueueManager.py: Redis queue interaction layer
- src/workflow/tasks.py: Worker task definitions for RQ
- src/workflow/health.py: Queue health check utilities

Modified files:
- Dockerfile: Install Redis server, add entrypoint for Redis/RQ workers
- requirements.txt: Add redis and rq packages
- settings.json: Add queue_settings configuration
- src/workflow/WorkflowManager.py: Add queue support with local fallback
- src/workflow/StreamlitUI.py: Add queue status display in execution section
- src/common/common.py: Add sidebar queue metrics for online mode

Features:
- Automatic queue mode in online deployment, local mode unchanged
- Queue position and progress display when workflows are queued
- Sidebar metrics showing worker utilization and queue depth
- Configurable worker count via RQ_WORKER_COUNT env variable
- Graceful fallback to multiprocessing if Redis unavailable
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Adds a Redis-backed job queue (RQ) with worker tasks, health checks, QueueManager and UI/status integration; updates Docker/entrypoint to run Redis and RQ workers, adds dependencies and settings, and provides a docs implementation plan.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant UI as Streamlit UI
    participant WM as WorkflowManager
    participant QM as QueueManager
    participant Redis as Redis Server
    participant Worker as RQ Worker
    UI->>WM: start_workflow()
    WM->>WM: detect online mode / init QueueManager
    alt Online mode
        WM->>QM: submit_job(execute_workflow, ...)
        QM->>Redis: enqueue job
        Redis-->>QM: job_id
        QM-->>WM: job_id
        WM-->>UI: job submitted (job_id)
        par Status polling
            UI->>WM: get_workflow_status(job_id)
            WM->>QM: get_job_info(job_id)
            QM->>Redis: fetch job metadata
            Redis-->>QM: status, progress, queue_position
            QM-->>WM: JobInfo
            WM-->>UI: status {progress, queue_position,...}
        and Worker execution
            Worker->>Redis: pop job
            Worker->>Worker: execute_workflow(...)
            Worker->>Redis: update job meta (progress, step)
        end
    else Fallback
        WM->>WM: _start_workflow_local()
        WM-->>UI: workflow running locally
    end
```

```mermaid
sequenceDiagram
    participant UI as Streamlit UI
    participant Health as health module
    participant Redis as Redis Server
    UI->>Health: get_queue_metrics()
    Health->>Redis: connect via REDIS_URL
    Redis-->>Health: ping, workers, queue info
    Health-->>UI: {available, total_workers, busy_workers, queued_jobs}
    UI->>UI: render monitor_queue fragment (utilization, warnings)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 8
🤖 Fix all issues with AI agents
In `@Dockerfile`:
- Around line 166-173: The Redis readiness loop (the `until redis-cli ping`
section) can hang indefinitely; modify it to include a maximum retry count or
timeout by introducing a counter and MAX_RETRIES (or a TIMEOUT) and incrementing
the counter each iteration, breaking with a non-zero exit and an error message
if the max is reached; ensure you still echo progress messages and exit
successfully when `redis-cli ping` responds.
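The bounded readiness loop this comment asks for could be sketched like this. This is illustrative only: the helper name `wait_for`, the retry limit, and the sleep interval are assumptions, and the probe command is taken as a parameter rather than hardcoding `redis-cli ping`, so the helper is not tied to Redis:

```shell
# Sketch of a bounded readiness loop for the entrypoint script.
# The probe command is a parameter; retry limit and sleep interval
# are illustrative values, not taken from the actual Dockerfile.
wait_for() {
  probe="$1"
  max_retries="${2:-20}"
  attempt=0
  until $probe > /dev/null 2>&1; do
    attempt=$((attempt + 1))
    if [ "$attempt" -ge "$max_retries" ]; then
      echo "ERROR: probe '$probe' failed after $max_retries attempts" >&2
      return 1
    fi
    echo "Waiting for probe '$probe' (attempt $attempt/$max_retries)..."
    sleep 0.2
  done
  echo "Probe '$probe' succeeded"
  return 0
}

# In the entrypoint this might be invoked as:
#   wait_for "redis-cli ping" 30 || exit 1
```

The key point is that the loop now terminates with a non-zero status instead of hanging, so container orchestration can detect a failed startup.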
- Around line 152-155: The Dockerfile sets ENV REDIS_URL but the entrypoint
script still hardcodes "redis://localhost:6379/0", making the variable
ineffective; update the entrypoint invocation (the script or ENTRYPOINT that
launches the RQ worker) to use the REDIS_URL environment variable (e.g.,
substitute the literal URL with $REDIS_URL or ${REDIS_URL}) so the ENV REDIS_URL
defined alongside ENV RQ_WORKER_COUNT is honored at runtime.
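A minimal sketch of an entrypoint fragment that honors `REDIS_URL` with a localhost fallback. The queue name `openms-workflows`, the `--url`/`--name` flags, and `RQ_WORKER_COUNT` come from the review comments above; the function name and the default worker count of 2 are assumptions:

```shell
# Sketch: derive the worker command from environment variables so the
# ENV REDIS_URL set in the Dockerfile is honored at runtime.
start_rq_workers() {
  redis_url="${REDIS_URL:-redis://localhost:6379/0}"
  worker_count="${RQ_WORKER_COUNT:-2}"
  i=1
  while [ "$i" -le "$worker_count" ]; do
    echo "Starting worker-$i against $redis_url"
    rq worker openms-workflows --url "$redis_url" --name "worker-$i" &
    i=$((i + 1))
  done
}
```

With this shape, overriding `REDIS_URL` at `docker run` time (e.g. to point at an external Redis) takes effect without editing the image.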
- Around line 175-180: The startup script starts background RQ workers using
WORKER_COUNT (env RQ_WORKER_COUNT) and the command pattern "rq worker
openms-workflows --url redis://localhost:6379/0 --name worker-$i" but lacks
signal handling for graceful shutdown; add a trap for SIGTERM/SIGINT that
forwards these signals to the child worker PIDs, wait for them to exit, and on
timeout send TERM/QUIT/INT as appropriate—capture and store each background PID
when launching workers, implement a cleanup function (invoked by trap) that
iterates over those PIDs to kill/terminate gracefully and then wait for
completion, and ensure the script exits with the workers' exit status.
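One way the trap/cleanup logic described above could look, as a sketch. Function and variable names are illustrative, and the timeout-based escalation to stronger signals that the comment mentions is omitted for brevity:

```shell
# Sketch of graceful shutdown for background workers: PIDs are recorded
# at launch and a trap forwards TERM/INT to them before waiting.
WORKER_PIDS=""

launch_worker() {
  "$@" &                          # start the worker in the background
  WORKER_PIDS="$WORKER_PIDS $!"   # remember its PID for cleanup
}

shutdown_workers() {
  for pid in $WORKER_PIDS; do
    kill -TERM "$pid" 2>/dev/null
  done
  # Wait for the workers to exit so the container stops cleanly
  wait $WORKER_PIDS 2>/dev/null
}

trap 'shutdown_workers; exit 0' TERM INT
```

Forwarding signals this way matters in Docker because the entrypoint script runs as PID 1, and without a trap, `docker stop` would wait out its grace period and then SIGKILL the workers mid-job.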
In `@requirements.txt`:
- Around line 140-143: The requirements.txt currently mixes pip-compile
generated content with manual additions (the lines for redis>=5.0.0 and
rq>=1.16.0) and lacks the expected pip-compile metadata; fix by either creating
a pyproject.toml that declares redis and rq and then regenerate requirements.txt
via pip-compile (so the added entries get proper “# via …” annotations), or keep
the manual additions and update the top-of-file header to explicitly state that
these two entries were manually appended (and add a short inline comment after
each dependency to indicate they are manual additions) so the file format and
provenance are consistent; refer to the dependency names "redis" and "rq" and
the requirements.txt header when making the change.
In `@settings.json`:
- Around line 19-24: The settings.json includes an unused queue_settings object;
either remove it or wire its enabled flag into the queue initialization logic.
Update the QueueManager initialization (constructor/init method in QueueManager)
to read settings.queue_settings?.enabled and treat false as "do not initialize
Redis" before the existing online_deployment and REDIS_URL checks, or remove
queue_settings from settings.json if you prefer to keep the current behavior;
ensure any references to queue initialization still use the final decision and
document that queue_settings.enabled controls queue activation.
In `@src/common/common.py`:
- Around line 43-85: Replace the silent blanket except in monitor_queue with a
debug-level log that preserves exception details: catch Exception as e and call
logging.getLogger(__name__).debug(...) (include a short message like
"monitor_queue: failed to fetch queue metrics" and pass exc_info=True or the
exception) so failures are recorded for diagnostics but not shown in the UI;
ensure logging is imported at top of the module and reference the monitor_queue
function and the get_queue_metrics call in the log message for context.
In `@src/workflow/WorkflowManager.py`:
- Around line 55-81: The job_id generation in _start_workflow_queued currently
uses int(time.time()) which can collide; replace that single-second timestamp
with a collision-resistant identifier (e.g., append or replace with
uuid.uuid4().hex or use time.time_ns() combined with a short random/UUID) when
building job_id in _start_workflow_queued; ensure you import uuid if using
uuid.uuid4(), keep the existing job_id format prefix
("workflow-{self.workflow_dir.name}-...") so downstream uses (e.g.,
_queue_manager.store_job_id and submit_job) continue to find the ID.
- Around line 26-42: The lint error arises because 'QueueManager' is referenced
as a forward string but not imported at module scope; update the module to
import QueueManager inside a TYPE_CHECKING block so the name exists for static
checkers without runtime import. Add "from typing import TYPE_CHECKING" at the
top and under "if TYPE_CHECKING:" add "from .QueueManager import QueueManager";
keep the runtime dynamic import inside _init_queue_manager() (the existing
function) and leave _is_online_mode() unchanged.
🧹 Nitpick comments (7)
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md (2)
17-31: Add language specifiers to fenced code blocks. Several ASCII diagram blocks lack language specifiers, which can affect rendering and linting. Consider adding `text` or `plaintext` as the language identifier for diagram blocks. This applies to multiple blocks throughout the document (lines 17, 65, 100, 204, 1023, 1256, 1270, 1364).

Example fix:

````diff
-```
+```text
 ┌─────────────────────────────────────────────────────────────────┐
 │                        WorkflowManager                          │
````
247-250: Consider non-root user for supervisord processes. The supervisord configuration runs as `user=root`. For improved security posture in production deployments, consider running processes under a dedicated non-root user. This is especially relevant if the container processes untrusted workflow data.

src/workflow/tasks.py (2)
92-96: Destructive results directory cleanup without backup. Deleting the entire results directory before execution could cause unexpected data loss if a user accidentally restarts a workflow. Consider archiving previous results or adding a confirmation mechanism.
120-147: Broad exception handling is appropriate but could benefit from more specific logging. The static analysis flags about blind `Exception` catching are less concerning here since this is a top-level task handler that must catch all errors. However, the silent `except: pass` blocks (lines 133-134, 140-141) could hide important debugging information. Consider using `logging.debug()` or similar to capture these secondary failures without affecting the main error flow.

Example improvement for cleanup logging:

```diff
 except Exception:
-    pass
+    pass  # Intentionally silent - don't mask original error
```

Or with actual logging:

```python
import logging

logger = logging.getLogger(__name__)

# ...
except Exception as cleanup_err:
    logger.debug(f"Cleanup failed: {cleanup_err}")
```

src/workflow/health.py (1)
92-128: Expose queue-metrics failures for debugging. Returning only `{"available": False}` loses the cause. Consider including the exception message to help troubleshoot without breaking callers.

🔧 Suggested fix

```diff
-    except Exception:
-        return {"available": False}
+    except Exception as e:
+        return {"available": False, "error": str(e)}
```

src/workflow/StreamlitUI.py (1)
1088-1131: Remove unused `msg_type` mapping. The tuple's second value isn't used; simplifying avoids lint noise.

♻️ Suggested cleanup

```diff
-        status_display = {
-            "queued": ("Queued", "info"),
-            "started": ("Running", "info"),
-            "finished": ("Completed", "success"),
-            "failed": ("Failed", "error"),
-            "canceled": ("Cancelled", "warning"),
-        }
-
-        label, msg_type = status_display.get(job_status, ("Unknown", "info"))
+        status_display = {
+            "queued": "Queued",
+            "started": "Running",
+            "finished": "Completed",
+            "failed": "Failed",
+            "canceled": "Cancelled",
+        }
+
+        label = status_display.get(job_status, "Unknown")
```

src/workflow/QueueManager.py (1)
55-104: Allow Redis init to recover after transient startup failures. `_init_attempted` prevents retry; if Redis isn't ready at first init (common in Docker), the queue may stay unavailable for the life of the instance. Consider lazy re-init in `is_available` (optionally with backoff).

🔧 Suggested fix

```diff
-        self._init_attempted = False
-
-        if self._is_online:
-            self._init_redis()
+        if self._is_online:
+            self._init_redis()
 ...
-        if self._init_attempted:
-            return
-        self._init_attempted = True
-
         try:
             from redis import Redis
             from rq import Queue
 ...
     @property
     def is_available(self) -> bool:
         """Check if queue system is available"""
-        return self._is_online and self._queue is not None
+        if self._is_online and self._queue is None:
+            self._init_redis()
+        return self._is_online and self._queue is not None
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- Dockerfile
- docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md
- requirements.txt
- settings.json
- src/common/common.py
- src/workflow/QueueManager.py
- src/workflow/StreamlitUI.py
- src/workflow/WorkflowManager.py
- src/workflow/health.py
- src/workflow/tasks.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-03-10T11:09:56.467Z
Learnt from: subCode321
Repo: OpenMS/streamlit-template PR: 160
File: content/run_example_workflow.py:83-88
Timestamp: 2025-03-10T11:09:56.467Z
Learning: In the OpenMS streamlit-template project, continuous real-time monitoring of CPU stats and workflow jobs is implemented using a while True loop, which is considered acceptable for this specific use case as confirmed by the maintainer.
Applied to files:
- src/common/common.py
- src/workflow/StreamlitUI.py
📚 Learning: 2025-03-10T11:09:56.468Z
Learnt from: subCode321
Repo: OpenMS/streamlit-template PR: 160
File: content/run_example_workflow.py:83-88
Timestamp: 2025-03-10T11:09:56.468Z
Learning: In the OpenMS streamlit-template project, continuous real-time monitoring of CPU stats and workflow jobs is implemented using a while True loop with a time.sleep(1) in the monitor_cpu_ram_stats function, which is considered acceptable for this specific use case as confirmed by the maintainer.
Applied to files:
src/common/common.py
🧬 Code graph analysis (3)
src/common/common.py (1)
- src/workflow/health.py (1): get_queue_metrics (92-129)

src/workflow/StreamlitUI.py (1)
- src/workflow/CommandExecutor.py (1): stop (248-262)

src/workflow/tasks.py (5)
- src/workflow/CommandExecutor.py (1): CommandExecutor (13-319)
- src/workflow/FileManager.py (1): FileManager (7-179)
- src/workflow/ParameterManager.py (1): ParameterManager (7-141)
- src/workflow/Logger.py (2): Logger (3-42), log (16-42)
- src/workflow/WorkflowManager.py (1): execution (259-266)
🪛 LanguageTool
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md
[style] ~1607-~1607: This adverb was used twice in the sentence. Consider removing one of them or replacing them with a synonym.
Context: ...ctions correctly - [ ] Logs are written correctly from worker - [ ] Multiple concurrent j...
(ADVERB_REPETITION_PREMIUM)
🪛 markdownlint-cli2 (0.18.1)
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md
17-17: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
65-65: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
100-100: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
204-204: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
1023-1023: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
1256-1256: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
1270-1270: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
1364-1364: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🪛 Ruff (0.14.11)
src/common/common.py
83-84: try-except-pass detected, consider logging the exception
(S110)
83-83: Do not catch blind exception: Exception
(BLE001)
src/workflow/WorkflowManager.py
27-27: Undefined name QueueManager
(F821)
src/workflow/StreamlitUI.py
1101-1101: Unpacked variable msg_type is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
src/workflow/tasks.py
37-37: Do not catch blind exception: Exception
(BLE001)
120-120: Do not catch blind exception: Exception
(BLE001)
121-121: Use explicit conversion flag
Replace with conversion flag
(RUF010)
131-131: Use explicit conversion flag
Replace with conversion flag
(RUF010)
133-134: try-except-pass detected, consider logging the exception
(S110)
133-133: Do not catch blind exception: Exception
(BLE001)
140-141: try-except-pass detected, consider logging the exception
(S110)
140-140: Do not catch blind exception: Exception
(BLE001)
157-158: try-except-pass detected, consider logging the exception
(S110)
157-157: Do not catch blind exception: Exception
(BLE001)
src/workflow/QueueManager.py
75-75: Do not catch blind exception: Exception
(BLE001)
95-95: Do not catch blind exception: Exception
(BLE001)
109-109: PEP 484 prohibits implicit Optional
Convert to T | None
(RUF013)
146-146: Consider moving this statement to an else block
(TRY300)
147-147: Do not catch blind exception: Exception
(BLE001)
205-205: Do not catch blind exception: Exception
(BLE001)
214-214: Consider moving this statement to an else block
(TRY300)
215-215: Do not catch blind exception: Exception
(BLE001)
236-236: Consider moving this statement to an else block
(TRY300)
237-237: Do not catch blind exception: Exception
(BLE001)
265-265: Do not catch blind exception: Exception
(BLE001)
src/workflow/health.py
38-38: Do not catch blind exception: Exception
(BLE001)
85-85: Do not catch blind exception: Exception
(BLE001)
128-128: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: build-simple-app
- GitHub Check: build-full-app
- GitHub Check: build-openms
🔇 Additional comments (19)
docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md (1)
1-9: Well-structured implementation plan. The document provides comprehensive guidance covering architecture, migration steps, rollback procedures, and testing checklists. The dual-mode design (online/offline) with graceful fallback is well thought out.
Dockerfile (2)
121-128: Redis installation looks good. The Redis server installation with `--no-install-recommends` and proper data directory setup with correct ownership is well done.
166-166: Redis persistence disabled despite plan documentation. The entrypoint uses `--appendonly no` but the implementation plan (line 175 in docs) recommends `--appendonly yes` for job persistence across container restarts. If persistence is intentionally disabled, consider documenting this decision.

src/workflow/tasks.py (3)
1-6: Clear module documentation. The docstring correctly notes that this module must be importable without Streamlit, which is crucial for RQ workers.
150-158: Progress update helper is well-designed. The defensive approach with `if job is not None` and the try-except ensures progress updates never break workflow execution. This is appropriate for a non-critical feature.
78-90: Missing `ui` attribute concern is invalid for worker execution context. The manual workflow construction omits the `self.ui` attribute from `WorkflowManager.__init__`, but this is not a problem. The worker execution path only calls `workflow.execution()`, which uses only `self.params`, `self.logger`, `self.file_manager`, and `self.executor`, all of which are properly set. The `ui` attribute is only needed for Streamlit UI methods (`upload()`, `configure()`, `results()`), which are not called in the worker context. The construction is appropriate and safe for this use case.

Likely an incorrect or invalid review comment.
src/common/common.py (1)
433-435: Queue metrics gated to online mode looks good.

src/workflow/health.py (2)
11-43: Redis health check output is well-structured.
45-89: Worker health aggregation looks solid.

src/workflow/WorkflowManager.py (3)
112-169: Queue-aware status reporting is clean and comprehensive.
171-212: Stop workflow flow looks consistent with local and queued modes.
229-233: Execution-section wiring to new callbacks looks correct.

src/workflow/StreamlitUI.py (1)
989-1087: Execution section integration with queue status looks solid.

src/workflow/QueueManager.py (6)
105-149: Job submission API is clear and well-scoped.
150-207: Job status mapping and metadata extraction look good.
208-217: Queue position helper is straightforward.
218-239: Cancel flow is clean and minimal.
240-267: Queue stats aggregation is solid.
268-284: Job ID persistence helpers are simple and effective.
```python
@st.fragment(run_every=5)
def monitor_queue():
    """Display queue metrics in sidebar (online mode only)"""
    try:
        from src.workflow.health import get_queue_metrics

        metrics = get_queue_metrics()
        if not metrics.get("available", False):
            return

        st.markdown("---")
        st.markdown("**Queue Status**")

        total_workers = metrics.get("total_workers", 0)
        busy_workers = metrics.get("busy_workers", 0)
        queued_jobs = metrics.get("queued_jobs", 0)

        col1, col2 = st.columns(2)
        col1.metric(
            "Workers",
            f"{busy_workers}/{total_workers}",
            help="Busy workers / Total workers"
        )
        col2.metric(
            "Queued",
            queued_jobs,
            help="Jobs waiting in queue"
        )

        # Utilization progress bar
        if total_workers > 0:
            utilization = busy_workers / total_workers
            st.progress(utilization, text=f"{int(utilization * 100)}% utilized")

        # Warning if queue is backing up
        if queued_jobs > total_workers * 2 and total_workers > 0:
            st.warning(f"High queue depth: {queued_jobs} jobs waiting")

        st.caption(f"Last fetched at: {time.strftime('%H:%M:%S')}")

    except Exception:
        pass  # Silently fail if queue not available
```
Don’t swallow queue-monitoring exceptions silently.
Silent `except Exception: pass` makes queue outages invisible and hard to debug. Consider logging at debug level to keep the UI clean while preserving diagnostics.
🔧 Suggested fix
+import logging
+
+LOGGER = logging.getLogger(__name__)
...
- except Exception:
- pass # Silently fail if queue not available
+ except Exception as exc:
+        LOGGER.debug("Queue metrics unavailable", exc_info=exc)

🧰 Tools
🪛 Ruff (0.14.11)
83-84: try-except-pass detected, consider logging the exception
(S110)
83-83: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In `@src/common/common.py` around lines 43 - 85, Replace the silent blanket except
in monitor_queue with a debug-level log that preserves exception details: catch
Exception as e and call logging.getLogger(__name__).debug(...) (include a short
message like "monitor_queue: failed to fetch queue metrics" and pass
exc_info=True or the exception) so failures are recorded for diagnostics but not
shown in the UI; ensure logging is imported at top of the module and reference
the monitor_queue function and the get_queue_metrics call in the log message for
context.
```python
        # Initialize queue manager for online mode
        self._queue_manager: Optional['QueueManager'] = None
        if self._is_online_mode():
            self._init_queue_manager()

    def _is_online_mode(self) -> bool:
        """Check if running in online deployment mode"""
        return st.session_state.get("settings", {}).get("online_deployment", False)

    def _init_queue_manager(self) -> None:
        """Initialize queue manager for online mode"""
        try:
            from .QueueManager import QueueManager
            self._queue_manager = QueueManager()
        except ImportError:
            pass  # Queue not available, will use fallback
```
🧩 Analysis chain
🏁 Script executed:
# First, let's check the current state of the WorkflowManager.py file
head -50 src/workflow/WorkflowManager.py

Repository: OpenMS/streamlit-template
Length of output: 2026
🏁 Script executed:
# Check imports section
head -30 src/workflow/WorkflowManager.py | cat -n

Repository: OpenMS/streamlit-template
Length of output: 1472
🏁 Script executed:
# Find ruff configuration
find . -name "pyproject.toml" -o -name "ruff.toml" -o -name ".ruff.toml" | head -5

Repository: OpenMS/streamlit-template
Length of output: 80
🏁 Script executed:
# Check if ruff is available and run it on the WorkflowManager.py file
if command -v ruff &> /dev/null; then
ruff check src/workflow/WorkflowManager.py --select F821 2>&1 || true
else
echo "ruff not available"
fi

Repository: OpenMS/streamlit-template
Length of output: 574
🏁 Script executed:
# Check Python version and if we can run a type checker simulation
# Let's look for any linting configuration in the repo
find . -maxdepth 2 -type f \( -name "pyproject.toml" -o -name "setup.py" -o -name ".flake8" -o -name "setup.cfg" \) | head -10

Repository: OpenMS/streamlit-template
Length of output: 51
🏁 Script executed:
# Check the ruff configuration
cat ruff.toml

Repository: OpenMS/streamlit-template
Length of output: 975
🏁 Script executed:
# Now let's verify the suggested fix works by creating a test
# Create a temporary file with the fix applied and check it
cat > /tmp/test_fix.py << 'EOF'
from pathlib import Path
from typing import Optional, TYPE_CHECKING
from .Logger import Logger
from .ParameterManager import ParameterManager
from .CommandExecutor import CommandExecutor
from .StreamlitUI import StreamlitUI
from .FileManager import FileManager
import multiprocessing
import streamlit as st
import shutil
import time
import traceback
if TYPE_CHECKING:
from .QueueManager import QueueManager
class WorkflowManager:
# Core workflow logic using the above classes
def __init__(self, name: str, workspace: str):
self.name = name
self.workflow_dir = Path(workspace, name.replace(" ", "-").lower())
self.file_manager = FileManager(self.workflow_dir)
self.logger = Logger(self.workflow_dir)
self.parameter_manager = ParameterManager(self.workflow_dir)
self.executor = CommandExecutor(self.workflow_dir, self.logger, self.parameter_manager)
self.ui = StreamlitUI(self.workflow_dir, self.logger, self.executor, self.parameter_manager)
self.params = self.parameter_manager.get_parameters_from_json()
# Initialize queue manager for online mode
self._queue_manager: Optional["QueueManager"] = None
if self._is_online_mode():
self._init_queue_manager()
EOF
# Verify it with ruff
ruff check /tmp/test_fix.py --select F821 2>&1 || echo "No F821 errors found!"

Repository: OpenMS/streamlit-template
Length of output: 87
Add TYPE_CHECKING guard to resolve F821 linting error.
Ruff flags 'QueueManager' as undefined because the name is not bound in module scope, even with the string forward reference. Using TYPE_CHECKING avoids the lint error without runtime overhead.
🔧 Suggested fix
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from .QueueManager import QueueManager
...
-self._queue_manager: Optional['QueueManager'] = None
+self._queue_manager: Optional["QueueManager"] = None

🧰 Tools
🪛 Ruff (0.14.11)
27-27: Undefined name QueueManager
(F821)
🤖 Prompt for AI Agents
In `@src/workflow/WorkflowManager.py` around lines 26 - 42, The lint error arises
because 'QueueManager' is referenced as a forward string but not imported at
module scope; update the module to import QueueManager inside a TYPE_CHECKING
block so the name exists for static checkers without runtime import. Add "from
typing import TYPE_CHECKING" at the top and under "if TYPE_CHECKING:" add "from
.QueueManager import QueueManager"; keep the runtime dynamic import inside
_init_queue_manager() (the existing function) and leave _is_online_mode()
unchanged.
```python
    def _start_workflow_queued(self) -> None:
        """Submit workflow to Redis queue (online mode)"""
        from .tasks import execute_workflow

        # Generate unique job ID based on workflow directory
        job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}"

        # Submit job to queue
        submitted_id = self._queue_manager.submit_job(
            func=execute_workflow,
            kwargs={
                "workflow_dir": str(self.workflow_dir),
                "workflow_class": self.__class__.__name__,
                "workflow_module": self.__class__.__module__,
            },
            job_id=job_id,
            timeout=7200,  # 2 hour timeout
            description=f"Workflow: {self.name}"
        )

        if submitted_id:
            # Store job ID for status checking
            self._queue_manager.store_job_id(self.workflow_dir, submitted_id)
        else:
            # Fallback to local execution if queue submission fails
            st.warning("Queue submission failed, running locally...")
            self._start_workflow_local()
```
Use collision-resistant job IDs for queued workflows.
int(time.time()) can collide when multiple starts occur within the same second, causing enqueue failures and unexpected local fallback.
🔧 Suggested fix
+import uuid
...
- job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}"
+ job_id = f"workflow-{self.workflow_dir.name}-{uuid.uuid4().hex}"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Suggested code (differs from the original only in the `job_id` line):

```python
    def _start_workflow_queued(self) -> None:
        """Submit workflow to Redis queue (online mode)"""
        from .tasks import execute_workflow

        # Generate unique job ID based on workflow directory
        job_id = f"workflow-{self.workflow_dir.name}-{uuid.uuid4().hex}"

        # Submit job to queue
        submitted_id = self._queue_manager.submit_job(
            func=execute_workflow,
            kwargs={
                "workflow_dir": str(self.workflow_dir),
                "workflow_class": self.__class__.__name__,
                "workflow_module": self.__class__.__module__,
            },
            job_id=job_id,
            timeout=7200,  # 2 hour timeout
            description=f"Workflow: {self.name}"
        )

        if submitted_id:
            # Store job ID for status checking
            self._queue_manager.store_job_id(self.workflow_dir, submitted_id)
        else:
            # Fallback to local execution if queue submission fails
            st.warning("Queue submission failed, running locally...")
            self._start_workflow_local()
```
🤖 Prompt for AI Agents
In `@src/workflow/WorkflowManager.py` around lines 55 - 81, The job_id generation
in _start_workflow_queued currently uses int(time.time()) which can collide;
replace that single-second timestamp with a collision-resistant identifier
(e.g., append or replace with uuid.uuid4().hex or use time.time_ns() combined
with a short random/UUID) when building job_id in _start_workflow_queued; ensure
you import uuid if using uuid.uuid4(), keep the existing job_id format prefix
("workflow-{self.workflow_dir.name}-...") so downstream uses (e.g.,
_queue_manager.store_job_id and submit_job) continue to find the ID.
Pull request overview
This pull request introduces a Redis-based queue system for workflow execution in online deployment mode, while maintaining backward compatibility with the existing multiprocessing approach for local mode. The implementation adds job queuing, status tracking, and queue monitoring capabilities specifically for Docker deployments.
Changes:
- Added Redis Queue (RQ) infrastructure for online mode workflow execution
- Implemented QueueManager for job submission, tracking, and cancellation
- Created worker tasks module for background workflow execution
- Added health monitoring and queue metrics display
- Updated Dockerfile to run Redis server and RQ workers alongside Streamlit
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 17 comments.
| File | Description |
|---|---|
| src/workflow/tasks.py | New worker task execution logic for RQ workers |
| src/workflow/health.py | Redis and worker health check utilities |
| src/workflow/QueueManager.py | Core queue management implementation |
| src/workflow/WorkflowManager.py | Added queue submission and status tracking |
| src/workflow/StreamlitUI.py | Queue status display and progress tracking UI |
| src/common/common.py | Queue metrics monitoring in sidebar |
| settings.json | Queue configuration settings |
| requirements.txt | Added Redis and RQ dependencies |
| Dockerfile | Redis server and RQ worker setup |
| docs/REDIS_QUEUE_IMPLEMENTATION_PLAN.md | Comprehensive implementation documentation |
```python
workflow.logger = logger
workflow.parameter_manager = parameter_manager
workflow.executor = executor
workflow.params = params
```
The workflow instance created using `object.__new__` bypasses the normal `__init__` method and may miss important initialization. Consider verifying that all required workflow attributes are set before calling `workflow.execution()`. Specifically, check whether the workflow's `__init__` sets any attributes beyond those manually assigned here (lines 81-87), such as `self.ui` or other workflow-specific state.
```diff
 workflow.params = params
+# Ensure attributes normally initialized in __init__ but not required
+# for headless execution are present to avoid AttributeError later.
+# In particular, many workflows expect `self.ui` to exist.
+if not hasattr(workflow, "ui"):
+    workflow.ui = None
```
```python
class QueueManager:
    """
    Manages Redis Queue operations for workflow execution.

    Only active in online mode. Falls back to direct execution in local mode.
    Redis runs on localhost within the same container.
    """

    QUEUE_NAME = "openms-workflows"
    # Redis runs locally in the same container
    REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

    def __init__(self):
        self._redis = None
        self._queue = None
        self._is_online = self._check_online_mode()
        self._init_attempted = False

        if self._is_online:
            self._init_redis()

    def _check_online_mode(self) -> bool:
        """Check if running in online mode"""
        # Check environment variable first (set in Docker)
        if os.environ.get("REDIS_URL"):
            return True

        # Fallback: check settings file
        try:
            with open("settings.json", "r") as f:
                settings = json.load(f)
            return settings.get("online_deployment", False)
        except Exception:
            return False

    def _init_redis(self) -> None:
        """Initialize Redis connection and queue"""
        if self._init_attempted:
            return
        self._init_attempted = True

        try:
            from redis import Redis
            from rq import Queue

            self._redis = Redis.from_url(self.REDIS_URL)
            self._redis.ping()  # Test connection
            self._queue = Queue(self.QUEUE_NAME, connection=self._redis)
        except ImportError:
            # Redis/RQ packages not installed
            self._redis = None
            self._queue = None
        except Exception:
            # Redis server not available
            self._redis = None
            self._queue = None

    @property
    def is_available(self) -> bool:
        """Check if queue system is available"""
        return self._is_online and self._queue is not None

    def submit_job(
        self,
        func: Callable,
        args: tuple = (),
        kwargs: dict = None,
        job_id: Optional[str] = None,
        timeout: int = 7200,  # 2 hour default
        result_ttl: int = 86400,  # 24 hours
        description: str = "",
    ) -> Optional[str]:
        """
        Submit a job to the queue.

        Args:
            func: The function to execute
            args: Positional arguments for the function
            kwargs: Keyword arguments for the function
            job_id: Optional custom job ID (defaults to UUID)
            timeout: Job timeout in seconds
            result_ttl: How long to keep results
            description: Human-readable job description

        Returns:
            Job ID if successful, None otherwise
        """
        if not self.is_available:
            return None

        kwargs = kwargs or {}

        try:
            job = self._queue.enqueue(
                func,
                args=args,
                kwargs=kwargs,
                job_id=job_id,
                job_timeout=timeout,
                result_ttl=result_ttl,
                description=description,
                meta={"description": description, "progress": 0.0, "current_step": ""},
            )
            return job.id
        except Exception:
            return None

    def get_job_info(self, job_id: str) -> Optional[JobInfo]:
        """
        Get information about a job.

        Args:
            job_id: The job ID to query

        Returns:
            JobInfo object or None if not found
        """
        if not self.is_available:
            return None

        try:
            from rq.job import Job

            job = Job.fetch(job_id, connection=self._redis)

            # Map RQ status to our enum
            status_map = {
                "queued": JobStatus.QUEUED,
                "started": JobStatus.STARTED,
                "finished": JobStatus.FINISHED,
                "failed": JobStatus.FAILED,
                "deferred": JobStatus.DEFERRED,
                "canceled": JobStatus.CANCELED,
            }

            status = status_map.get(job.get_status(), JobStatus.QUEUED)

            # Get progress from job meta
            meta = job.meta or {}
            progress = meta.get("progress", 0.0)
            current_step = meta.get("current_step", "")

            # Calculate queue position if queued
            queue_position = None
            queue_length = None
            if status == JobStatus.QUEUED:
                queue_position = self._get_job_position(job_id)
                queue_length = len(self._queue)

            return JobInfo(
                job_id=job.id,
                status=status,
                progress=progress,
                current_step=current_step,
                queue_position=queue_position,
                queue_length=queue_length,
                result=job.result if status == JobStatus.FINISHED else None,
                error=str(job.exc_info) if job.exc_info else None,
                enqueued_at=str(job.enqueued_at) if job.enqueued_at else None,
                started_at=str(job.started_at) if job.started_at else None,
                ended_at=str(job.ended_at) if job.ended_at else None,
            )
        except Exception:
            return None

    def _get_job_position(self, job_id: str) -> Optional[int]:
        """Get position of a job in the queue (1-indexed)"""
        try:
            job_ids = self._queue.job_ids
            if job_id in job_ids:
                return job_ids.index(job_id) + 1
            return None
        except Exception:
            return None

    def cancel_job(self, job_id: str) -> bool:
        """
        Cancel a queued or running job.

        Args:
            job_id: The job ID to cancel

        Returns:
            True if successfully canceled
        """
        if not self.is_available:
            return False

        try:
            from rq.job import Job

            job = Job.fetch(job_id, connection=self._redis)
            job.cancel()
            return True
        except Exception:
            return False

    def get_queue_stats(self) -> dict:
        """
        Get queue statistics.

        Returns:
            Dictionary with queue stats
        """
        if not self.is_available:
            return {}

        try:
            from rq import Worker

            workers = Worker.all(connection=self._redis)
            busy_workers = len([w for w in workers if w.get_state() == "busy"])

            return {
                "queued": len(self._queue),
                "started": len(self._queue.started_job_registry),
                "finished": len(self._queue.finished_job_registry),
                "failed": len(self._queue.failed_job_registry),
                "workers": len(workers),
                "busy_workers": busy_workers,
                "idle_workers": len(workers) - busy_workers,
            }
        except Exception:
            return {}

    def store_job_id(self, workflow_dir: Path, job_id: str) -> None:
        """Store job ID in workflow directory for recovery"""
        job_file = Path(workflow_dir) / ".job_id"
        job_file.write_text(job_id)

    def load_job_id(self, workflow_dir: Path) -> Optional[str]:
        """Load job ID from workflow directory"""
        job_file = Path(workflow_dir) / ".job_id"
        if job_file.exists():
            return job_file.read_text().strip()
        return None

    def clear_job_id(self, workflow_dir: Path) -> None:
        """Clear stored job ID"""
        job_file = Path(workflow_dir) / ".job_id"
        if job_file.exists():
            job_file.unlink()
```
The new QueueManager class and Redis queue functionality lack automated test coverage. Consider adding tests to verify: queue initialization and availability detection, job submission and retrieval, job status tracking, job cancellation, queue statistics reporting, and graceful fallback to local execution when Redis is unavailable. This is particularly important for production reliability of the online deployment mode.
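A hedged sketch of what one such fallback test could look like. It stubs the manager rather than importing the real `QueueManager` (names like `StubQueueManager` and `start_workflow` are illustrative, not from this PR), but it mirrors the availability contract shown in the diff: when Redis is unreachable, `is_available` is `False`, `submit_job` returns `None`, and the caller must run locally.

```python
class StubQueueManager:
    """Test double that mimics QueueManager after a failed Redis init."""

    def __init__(self):
        self._queue = None       # _init_redis() failed, queue never created
        self._is_online = True   # online mode, but Redis is down

    @property
    def is_available(self):
        # Same contract as the real class: online AND queue initialized
        return self._is_online and self._queue is not None

    def submit_job(self, func, kwargs=None, job_id=None, **_):
        if not self.is_available:
            return None
        raise AssertionError("unreachable in this stub")


def start_workflow(manager, run_local):
    """Caller-side logic under test: queue if possible, else fall back."""
    job_id = manager.submit_job(func=lambda: None, job_id="workflow-x")
    if job_id is None:
        run_local()
        return "local"
    return "queued"


ran_locally = []
mode = start_workflow(StubQueueManager(), lambda: ran_locally.append(True))
# mode is "local" and the local runner was invoked exactly once
```

A real test suite would exercise the same pattern against the actual class with a mocked `redis.Redis.from_url`.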
```python
import importlib
module = importlib.import_module(workflow_module)
WorkflowClass = getattr(module, workflow_class)
```
The workflow_class and workflow_module parameters are used with importlib and getattr to dynamically load and instantiate classes (lines 54-55). If these values are derived from user input without validation, this could potentially allow arbitrary code execution. Ensure that workflow_class and workflow_module are validated against a whitelist of allowed workflows before dynamic loading, or verify they are only set internally by the WorkflowManager.
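One way to implement the suggested whitelist, as a standalone sketch — `ALLOWED_WORKFLOWS` and the module path `"src.Workflow"` are hypothetical placeholders, not names confirmed by this repository:

```python
import importlib

# Hypothetical allow-list mapping class name -> the only module allowed to provide it
ALLOWED_WORKFLOWS = {
    "Workflow": "src.Workflow",
}


def load_workflow_class(workflow_module: str, workflow_class: str):
    """Reject any (module, class) pair not in the allow-list before importing."""
    expected_module = ALLOWED_WORKFLOWS.get(workflow_class)
    if expected_module is None or expected_module != workflow_module:
        raise ValueError(f"Workflow not allowed: {workflow_module}.{workflow_class}")
    # Only reached for whitelisted pairs, so importlib never sees untrusted input
    module = importlib.import_module(workflow_module)
    return getattr(module, workflow_class)
```

Both an unknown class name and a known class paired with the wrong module are rejected before any import happens.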
```python
# Generate unique job ID based on workflow directory
job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}"
```
The job_id is generated using the workflow directory name and timestamp (line 60), which could lead to collisions if the same workflow is started multiple times within the same second. Consider adding additional entropy (e.g., a random component or UUID suffix) to ensure uniqueness, or document that rapid successive starts of the same workflow are not supported.
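A minimal sketch of the UUID-based variant, keeping the `workflow-<dir>-` prefix that the queue code relies on (the directory name below is a stand-in):

```python
import uuid


def make_job_id(workflow_dir_name: str) -> str:
    # uuid4().hex adds 128 bits of randomness, so two starts of the same
    # workflow in the same second still get distinct IDs
    return f"workflow-{workflow_dir_name}-{uuid.uuid4().hex}"


a = make_job_id("my-workflow")
b = make_job_id("my-workflow")
# a and b share the prefix but differ in their random suffix
```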
```python
def execute_workflow(
    workflow_dir: str,
    workflow_class: str,
    workflow_module: str,
) -> dict:
    """
    Execute a workflow in the worker process.

    This function is called by the RQ worker to execute a workflow.
    It reconstructs the workflow object and calls its execution() method.

    Args:
        workflow_dir: Path to the workflow directory
        workflow_class: Name of the Workflow class
        workflow_module: Module path containing the Workflow class

    Returns:
        Dictionary with execution results
    """
    try:
        from rq import get_current_job
        job = get_current_job()
    except Exception:
        job = None

    workflow_path = Path(workflow_dir)

    try:
        # Update progress
        _update_progress(job, 0.0, "Initializing workflow...")

        # Import required modules
        from src.workflow.CommandExecutor import CommandExecutor
        from src.workflow.FileManager import FileManager
        from src.workflow.ParameterManager import ParameterManager
        from src.workflow.Logger import Logger

        # Load the workflow class dynamically
        import importlib
        module = importlib.import_module(workflow_module)
        WorkflowClass = getattr(module, workflow_class)

        _update_progress(job, 0.05, "Loading parameters...")

        # Delete the log file if it already exists
        shutil.rmtree(Path(workflow_path, "logs"), ignore_errors=True)

        # Load parameters from saved params.json
        params_file = workflow_path / "params.json"
        if params_file.exists():
            with open(params_file, "r") as f:
                params = json.load(f)
        else:
            params = {}

        # Initialize workflow components
        logger = Logger(workflow_path)
        file_manager = FileManager(workflow_path)
        parameter_manager = ParameterManager(workflow_path)
        executor = CommandExecutor(workflow_path, logger, parameter_manager)

        _update_progress(job, 0.1, "Starting workflow execution...")

        # Create workflow instance
        # We need to bypass the normal __init__ which requires Streamlit
        workflow = object.__new__(WorkflowClass)
        workflow.name = workflow_path.name
        workflow.workflow_dir = workflow_path
        workflow.file_manager = file_manager
        workflow.logger = logger
        workflow.parameter_manager = parameter_manager
        workflow.executor = executor
        workflow.params = params

        # Store job reference for progress updates
        workflow._rq_job = job

        # Clear results directory
        results_dir = workflow_path / "results"
        if results_dir.exists():
            shutil.rmtree(results_dir)
        results_dir.mkdir(parents=True)

        # Log workflow start
        logger.log("STARTING WORKFLOW")

        _update_progress(job, 0.15, "Executing workflow steps...")

        # Execute the workflow
        workflow.execution()

        # Log workflow completion
        logger.log("WORKFLOW FINISHED")

        _update_progress(job, 1.0, "Workflow completed")

        # Clean up pid directory (in case it was created by accident)
        shutil.rmtree(executor.pid_dir, ignore_errors=True)

        return {
            "success": True,
            "workflow_dir": str(workflow_path),
            "message": "Workflow completed successfully"
        }

    except Exception as e:
        error_msg = f"Workflow failed: {str(e)}\n{traceback.format_exc()}"

        # Log error to workflow logs
        try:
            log_dir = workflow_path / "logs"
            log_dir.mkdir(parents=True, exist_ok=True)

            for log_name in ["minimal.log", "commands-and-run-times.log", "all.log"]:
                log_file = log_dir / log_name
                with open(log_file, "a") as f:
                    f.write(f"\n\nERROR: {str(e)}\n")
                    f.write(traceback.format_exc())
        except Exception:
            pass

        # Clean up pid directory
        try:
            pid_dir = workflow_path / "pids"
            shutil.rmtree(pid_dir, ignore_errors=True)
        except Exception:
            pass

        return {
            "success": False,
            "workflow_dir": str(workflow_path),
            "error": error_msg
        }


def _update_progress(job, progress: float, step: str) -> None:
    """Update job progress metadata"""
    if job is not None:
        try:
            job.meta["progress"] = progress
            job.meta["current_step"] = step
            job.save_meta()
        except Exception:
            pass  # Ignore errors updating progress
```
The execute_workflow function in tasks.py lacks automated test coverage. Consider adding tests to verify: workflow instance creation without Streamlit dependencies, parameter loading from params.json, progress tracking, error handling and logging, and cleanup of temporary directories. These tests should mock the RQ job context and verify the function works correctly in the worker environment.
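A standalone sketch of one such test: exercising the progress-update contract against a fake RQ job (a `meta` dict plus `save_meta()`), as the real `_update_progress` helper does. `FakeJob` is a test double, not part of RQ, and `update_progress` is a copy of the helper's shape rather than an import from the repository:

```python
class FakeJob:
    """Minimal stand-in for an rq.job.Job: meta dict + save_meta()."""

    def __init__(self):
        self.meta = {}
        self.saved = 0

    def save_meta(self):
        self.saved += 1


def update_progress(job, progress: float, step: str) -> None:
    # Same shape as tasks._update_progress: tolerate job=None and meta errors
    if job is not None:
        try:
            job.meta["progress"] = progress
            job.meta["current_step"] = step
            job.save_meta()
        except Exception:
            pass


job = FakeJob()
update_progress(job, 0.5, "Executing workflow steps...")
update_progress(None, 1.0, "done")  # no-job path must not raise
```

The same fake can be injected via `rq.get_current_job` mocking to test `execute_workflow` end to end without a Redis server.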
```python
else:
    # Fallback to local execution if queue submission fails
    st.warning("Queue submission failed, running locally...")
    self._start_workflow_local()
```
When queue submission fails and falls back to local execution (line 81), a warning is shown but the logs are not deleted before starting the local workflow. In contrast, _start_workflow_local (line 86) deletes logs before starting. This inconsistency means that when falling back, old log files may persist and confuse users. Consider calling shutil.rmtree for logs before the fallback, or refactor to ensure consistent behavior.
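A small sketch of the consistent-fallback idea: clear stale logs before invoking the local path, mirroring what `_start_workflow_local` already does. The function name and paths below are illustrative; in the real class `workflow_dir` would be `self.workflow_dir`:

```python
import shutil
import tempfile
from pathlib import Path


def fall_back_to_local(workflow_dir: Path, start_local) -> None:
    # Drop stale logs so the fallback run starts clean, matching the
    # behavior of the normal local-execution path
    shutil.rmtree(workflow_dir / "logs", ignore_errors=True)
    start_local()


# Demonstration with a throwaway directory containing an old log
workflow_dir = Path(tempfile.mkdtemp())
(workflow_dir / "logs").mkdir()
(workflow_dir / "logs" / "all.log").write_text("old run")

calls = []
fall_back_to_local(workflow_dir, lambda: calls.append("local"))
# The stale logs directory is gone and the local runner was called
```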
```python
This module must be importable without Streamlit being available.
"""

import sys
```
Import of 'sys' is not used.
```diff
-import sys
```
```python
        with open(log_file, "a") as f:
            f.write(f"\n\nERROR: {str(e)}\n")
            f.write(traceback.format_exc())
    except Exception:
```
'except' clause does nothing but pass and there is no explanatory comment.
```python
    try:
        pid_dir = workflow_path / "pids"
        shutil.rmtree(pid_dir, ignore_errors=True)
    except Exception:
```
'except' clause does nothing but pass and there is no explanatory comment.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…plementation-plan-JxzUV
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/workflow/StreamlitUI.py`:
- Around line 1094-1133: status_display currently yields (label, msg_type) but
msg_type is unused and some f-strings have no placeholders (e.g.,
st.error(f"**Status: Completed with errors**")), causing lint noise; fix by
changing the unpack to only extract label (e.g., label =
status_display.get(job_status, ("Unknown", "info"))[0]) or convert the dict to
map to single label strings, and replace any f-strings without placeholders
(st.error(f"...") -> st.error("...")) so only true f-strings remain (update
occurrences in the queue/started/finished branches where
st.info/st.error/st.success are called).
In `@src/workflow/tasks.py`:
- Around line 121-142: The exception handler should stop any spawned processes
before removing PID files: ensure you call executor.stop() (or getattr(executor,
"stop", None)()) if an executor from workflow.execution() or CommandExecutor
exists, and do this prior to deleting pid_dir; update the except block around
the cleanup where pid_dir is computed so it first attempts to stop the executor
safely (wrap in try/except to ignore missing executor or stop errors) and only
then calls shutil.rmtree on pid_dir, keeping existing logging and
error-suppression behavior.
♻️ Duplicate comments (7)
src/workflow/tasks.py (2)
**52-55: Validate dynamic imports to avoid arbitrary code execution.** If `workflow_module`/`workflow_class` can be influenced by user input, lines 52–55 allow arbitrary imports. Consider whitelisting allowed workflows or mapping names to classes.
**79-88: Ensure required attributes exist when bypassing `__init__`.** Line 81 uses `object.__new__`, which skips initialization and can leave workflows missing expected attributes (e.g., `ui`). Ensure defaults are set for attributes that downstream workflows assume.

🔧 Example defensive initialization

```diff
 workflow = object.__new__(WorkflowClass)
 workflow.name = workflow_path.name
 workflow.workflow_dir = workflow_path
 workflow.file_manager = file_manager
 workflow.logger = logger
 workflow.parameter_manager = parameter_manager
 workflow.executor = executor
 workflow.params = params
+if not hasattr(workflow, "ui"):
+    workflow.ui = None
```

Dockerfile (3)
**166-166: Redis persistence is disabled; jobs will be lost on restart.** Line 166 uses `--appendonly no`, which disables persistence. If this is production-facing, consider enabling AOF or explicitly documenting ephemeral queues.

🔧 Option: enable AOF persistence

```diff
-redis-server --daemonize yes --dir /var/lib/redis --appendonly no
+redis-server --daemonize yes --dir /var/lib/redis --appendonly yes
```
**168-172: Add a timeout to the Redis readiness check to avoid infinite wait.** The readiness loop can hang indefinitely if Redis fails to start. Add a retry limit or timeout.

🔧 Suggested timeout

```diff
-# Wait for Redis to be ready\n\
-until redis-cli ping > /dev/null 2>&1; do\n\
-  echo "Waiting for Redis..."\n\
-  sleep 1\n\
-done\n\
+# Wait for Redis to be ready (with timeout)\n\
+RETRY=0\n\
+MAX_RETRIES=30\n\
+until redis-cli ping > /dev/null 2>&1; do\n\
+  RETRY=$((RETRY+1))\n\
+  if [ "$RETRY" -ge "$MAX_RETRIES" ]; then\n\
+    echo "Redis failed to start after $MAX_RETRIES attempts"\n\
+    exit 1\n\
+  fi\n\
+  echo "Waiting for Redis..."\n\
+  sleep 1\n\
+done\n\
```
**175-180: Add signal handling for background RQ workers.** Workers started with `&` won't receive termination signals, which risks abrupt job termination and inconsistent state. Trap signals and stop workers/Redis cleanly.

🔧 Example trap/cleanup

```diff
-# Start RQ worker(s) in background\n\
-WORKER_COUNT=${RQ_WORKER_COUNT:-1}\n\
+# Start RQ worker(s) in background\n\
+WORKER_COUNT=${RQ_WORKER_COUNT:-1}\n\
+PIDS=()\n\
+cleanup() {\n\
+  echo "Shutting down..."\n\
+  for pid in "${PIDS[@]}"; do\n\
+    kill -TERM "$pid" 2>/dev/null || true\n\
+  done\n\
+  redis-cli shutdown nosave || true\n\
+}\n\
+trap cleanup SIGTERM SIGINT\n\
 for i in $(seq 1 $WORKER_COUNT); do\n\
-  rq worker openms-workflows --url $REDIS_URL --name worker-$i &\n\
+  rq worker openms-workflows --url $REDIS_URL --name worker-$i &\n\
+  PIDS+=($!)\n\
 done\n\
```

src/workflow/WorkflowManager.py (2)
**2-2: Fix the `QueueManager` forward reference to satisfy Ruff.** Line 27 references `QueueManager` without binding it in module scope. Use a `TYPE_CHECKING` guard to keep runtime imports deferred while satisfying static analysis.

🔧 Suggested fix

```diff
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from .QueueManager import QueueManager
 ...
-        self._queue_manager: Optional['QueueManager'] = None
+        self._queue_manager: Optional["QueueManager"] = None
```

Also applies to: 26-28
**59-61: Use collision-resistant job IDs.** Line 60 uses `int(time.time())`, which can collide when jobs start in the same second. Prefer a UUID or time-based suffix with more entropy.

🔧 Suggested fix

```diff
+import uuid
 ...
-        job_id = f"workflow-{self.workflow_dir.name}-{int(time.time())}"
+        job_id = f"workflow-{self.workflow_dir.name}-{uuid.uuid4().hex}"
```
🧹 Nitpick comments (1)
src/workflow/tasks.py (1)
**34-38: Avoid silent `except: pass` blocks in worker context.** Lines 37, 134–142, and 158–159 suppress errors entirely, which makes worker failures hard to diagnose. If these failures are expected, consider logging at least once to stderr or the workflow log.

🔧 Example: log progress update failures

```diff
         except Exception:
-            pass  # Ignore errors updating progress
+            traceback.print_exc()
```

Also applies to: 125-135, 138-142, 151-159
```python
# Status icons
status_display = {
    "queued": ("Queued", "info"),
    "started": ("Running", "info"),
    "finished": ("Completed", "success"),
    "failed": ("Failed", "error"),
    "canceled": ("Cancelled", "warning"),
}

label, msg_type = status_display.get(job_status, ("Unknown", "info"))

# Queue-specific information
if job_status == "queued":
    queue_position = status.get("queue_position", "?")
    queue_length = status.get("queue_length", "?")
    st.info(f"**Status: {label}** - Your workflow is #{queue_position} in the queue ({queue_length} total jobs)")

    # Visual queue indicator
    if isinstance(queue_position, int) and isinstance(queue_length, int) and queue_length > 0:
        progress = 1 - (queue_position / queue_length)
        st.progress(progress, text=f"Queue position {queue_position} of {queue_length}")

elif job_status == "started":
    progress = status.get("progress", 0)
    current_step = status.get("current_step", "Processing...")
    st.info(f"**Status: {label}**")
    if progress and progress > 0:
        st.progress(progress, text=current_step or "Processing...")

elif job_status == "finished":
    # Check if the job result indicates success or failure
    job_result = status.get("result")
    if job_result and isinstance(job_result, dict) and job_result.get("success") is False:
        st.error(f"**Status: Completed with errors**")
        error_msg = job_result.get("error", "Unknown error")
        if error_msg:
            with st.expander("Error Details", expanded=True):
                st.code(error_msg)
    else:
        st.success(f"**Status: {label}**")
```
**Remove unused `msg_type` and redundant f-string.**
Line 1103 assigns `msg_type` but it's never used, and line 1127 uses an f-string without placeholders. Clean these to satisfy Ruff and avoid lint noise.

🔧 Cleanup

```diff
-    label, msg_type = status_display.get(job_status, ("Unknown", "info"))
+    label, _msg_type = status_display.get(job_status, ("Unknown", "info"))
 ...
-        st.error(f"**Status: Completed with errors**")
+        st.error("**Status: Completed with errors**")
```
🪛 Ruff (0.14.13)
1103-1103: Unpacked variable msg_type is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
1127-1127: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
In `@src/workflow/StreamlitUI.py` around lines 1094 - 1133, status_display
currently yields (label, msg_type) but msg_type is unused and some f-strings
have no placeholders (e.g., st.error(f"**Status: Completed with errors**")),
causing lint noise; fix by changing the unpack to only extract label (e.g.,
label = status_display.get(job_status, ("Unknown", "info"))[0]) or convert the
dict to map to single label strings, and replace any f-strings without
placeholders (st.error(f"...") -> st.error("...")) so only true f-strings remain
(update occurrences in the queue/started/finished branches where
st.info/st.error/st.success are called).
```python
except Exception as e:
    error_msg = f"Workflow failed: {str(e)}\n{traceback.format_exc()}"

    # Log error to workflow logs
    try:
        log_dir = workflow_path / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)

        for log_name in ["minimal.log", "commands-and-run-times.log", "all.log"]:
            log_file = log_dir / log_name
            with open(log_file, "a") as f:
                f.write(f"\n\nERROR: {str(e)}\n")
                f.write(traceback.format_exc())
    except Exception:
        pass

    # Clean up pid directory
    try:
        pid_dir = workflow_path / "pids"
        shutil.rmtree(pid_dir, ignore_errors=True)
    except Exception:
        pass
```
**Stop spawned processes on failure before deleting PID files.**
If `workflow.execution()` spawns processes via `CommandExecutor`, the exception path currently just deletes the PID dir. That can leave orphaned processes running. Consider calling `executor.stop()` (when available) before cleanup.

🔧 Proposed cleanup on failure

```diff
         # Clean up pid directory
         try:
+            if "executor" in locals():
+                executor.stop()
             pid_dir = workflow_path / "pids"
             shutil.rmtree(pid_dir, ignore_errors=True)
         except Exception:
             pass
```
🪛 Ruff (0.14.13)
121-121: Do not catch blind exception: Exception
(BLE001)
122-122: Use explicit conversion flag
Replace with conversion flag
(RUF010)
132-132: Use explicit conversion flag
Replace with conversion flag
(RUF010)
134-135: try-except-pass detected, consider logging the exception
(S110)
134-134: Do not catch blind exception: Exception
(BLE001)
141-142: try-except-pass detected, consider logging the exception
(S110)
141-141: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In `@src/workflow/tasks.py` around lines 121 - 142, The exception handler should
stop any spawned processes before removing PID files: ensure you call
executor.stop() (or getattr(executor, "stop", None)()) if an executor from
workflow.execution() or CommandExecutor exists, and do this prior to deleting
pid_dir; update the except block around the cleanup where pid_dir is computed so
it first attempts to stop the executor safely (wrap in try/except to ignore
missing executor or stop errors) and only then calls shutil.rmtree on pid_dir,
keeping existing logging and error-suppression behavior.