-
Notifications
You must be signed in to change notification settings - Fork 769
SignalHandler.wait_for_any_signal #259
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe changes introduce the ability to wait for any one of multiple signals in workflow execution by adding a Changes
Sequence Diagram(s)sequenceDiagram
participant Workflow
participant SignalHandler
participant SignalBus
participant User/External
Workflow->>SignalHandler: wait_for_any_signal([signal1, signal2], workflow_id)
SignalHandler->>SignalBus: register waiters for signal1, signal2
User/External-->>SignalBus: emit signal2 with payload
SignalBus->>SignalHandler: notify signal2 received
SignalHandler->>Workflow: return Signal(name="signal2", payload, workflow_id)
Possibly related PRs
Suggested reviewers
Poem
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
src/mcp_agent/executor/executor.py
Outdated
if not hasattr(self.signal_bus, 'wait_for_any_signal'): | ||
raise NotImplementedError(f"The configured signal handler ({type(self.signal_bus).__name__}) does not support wait_for_any_signal.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an indentation inconsistency in this line. The line has an extra space at the beginning compared to the surrounding code, which will cause a Python syntax error. The indentation should be aligned with the other lines in this block.
if not hasattr(self.signal_bus, 'wait_for_any_signal'): | |
raise NotImplementedError(f"The configured signal handler ({type(self.signal_bus).__name__}) does not support wait_for_any_signal.") | |
if not hasattr(self.signal_bus, 'wait_for_any_signal'): | |
raise NotImplementedError(f"The configured signal handler ({type(self.signal_bus).__name__}) does not support wait_for_any_signal.") |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some initial comments
|
||
|
||
def test_attach_to_workflow(handler, mock_workflow): | ||
@patch('mcp_agent.executor.temporal.workflow_signal.logger') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the reason for the patch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Problematic logging on event bus, this is a better fix: https://github.com/lastmile-ai/mcp-agent/pull/259/files#diff-94b2a5eaada97eada8595799f3c95c888ba65b3c709df94646a8d6f5b1d81a47R392
8039de0
to
312092e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (10)
src/mcp_agent/logging/transport.py (1)
392-395
: Potential silent-loss of events when bus not startedThe new guard avoids an
AttributeError
, but it also drops events emitted beforeAsyncEventBus.start()
is called.
If callers assume everyemit()
is eventually delivered to listeners, this silent loss can be surprising.if hasattr(self, '_queue') and self._queue is not None: await self._queue.put(event) +else: + # Consider at least logging at debug level so missed events are traceable + logger.debug( + "EventBus.emit: event dropped because queue not initialised " + "(did you forget to call start()?) – %s", + event, + )A minimal log (or buffering the events) will make debugging far easier.
src/mcp_agent/executor/executor.py (1)
215-235
:workflow_id
should be optional for parity withwait_for_signal
wait_for_signal()
lets callers passworkflow_id=None
, yetwait_for_any_signal()
requires a non-optionalworkflow_id
.
For API consistency and easier local-workflow use-cases the parameter should default toNone
.- workflow_id: str, + workflow_id: str | None = None,Down-stream implementations already accept
None
, so this is a drop-in change.tests/test_app.py (2)
13-22
: Autouse fixture works, but can be simplifiedUsing a singleton accessor each time incurs the extra instantiation cost:
-GlobalWorkflowTaskRegistry().clear() +GlobalWorkflowTaskRegistry.clear()(if
clear()
is a@classmethod
).
Purely optional readability tweak.
573-574
: Minor duplication
asyncio.iscoroutinefunction(decorated)
is asserted twice (lines 575 & 578).
One assertion is sufficient.examples/usecases/reliable_conversation/test_basic.py (1)
31-33
: Drop the no-op__init__
.
__init__
only delegates tosuper().__init__
without adding behavior—just delete it to avoid needless boilerplate.tests/executor/test_multi_signal.py (3)
5-5
: Remove unusedAsyncMock
import.Ruff F401 flags it; dropping the import keeps the test clean.
-from unittest.mock import MagicMock, AsyncMock, patch +from unittest.mock import MagicMock, patch
125-132
: Avoid explicit== True
check.
assert condition_func()
is clearer and silences Ruff E712.- assert condition_func() == True + assert condition_func()
174-175
: Collapse nestedwith
statements.Use a single context-manager line to satisfy Ruff SIM117 and improve readability.
-with patch("temporalio.workflow._Runtime.current", return_value=None): - with pytest.raises(RuntimeError, match=...): +with patch("temporalio.workflow._Runtime.current", return_value=None), \ + pytest.raises(RuntimeError, match=...):(and similarly for the second occurrence)
Also applies to: 186-187
src/mcp_agent/executor/workflow_signal.py (2)
349-365
: Guard against emptysignal_names
& trim locals.
asyncio.wait
raisesValueError
on an empty iterable; catch this earlier and cut a few locals:- pending_signals: List[PendingSignal] = [] - waiter_tasks: List[asyncio.Task] = [] - - async with self._lock: - for name in signal_names: + if not signal_names: + raise ValueError("signal_names must be non-empty") + + async with self._lock: + pending_signals: list[PendingSignal] = [] + waiter_tasks: list[asyncio.Task] = [] + for name in signal_names:Helps readability and addresses pylint R0914 (too many locals).
258-269
: Console handler: clarify invalid input path.
input()
returns raw strings; if the user mistypes a signal name the function raisesValueError
after already spending timeout. Offering a re-prompt or early case-insensitive match would improve UX, but at minimum document this strict behavior.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lock
is excluded by!**/*.lock
📒 Files selected for processing (9)
examples/usecases/reliable_conversation/test_basic.py
(2 hunks)src/mcp_agent/executor/executor.py
(1 hunks)src/mcp_agent/executor/temporal/workflow_signal.py
(3 hunks)src/mcp_agent/executor/workflow.py
(1 hunks)src/mcp_agent/executor/workflow_signal.py
(3 hunks)src/mcp_agent/logging/transport.py
(1 hunks)tests/executor/temporal/test_signal_handler.py
(1 hunks)tests/executor/test_multi_signal.py
(1 hunks)tests/test_app.py
(4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (5)
src/mcp_agent/executor/workflow.py (2)
src/mcp_agent/logging/logger.py (1)
error
(135-143)src/mcp_agent/executor/temporal/workflow_signal.py (1)
push
(36-47)
src/mcp_agent/logging/transport.py (1)
src/mcp_agent/logging/logger.py (1)
event
(77-103)
src/mcp_agent/executor/temporal/workflow_signal.py (5)
src/mcp_agent/executor/executor.py (3)
wait_for_any_signal
(215-234)signal
(151-176)signal
(403-413)src/mcp_agent/executor/workflow_signal.py (8)
wait_for_any_signal
(78-103)wait_for_any_signal
(232-269)wait_for_any_signal
(349-424)Signal
(13-48)signal
(66-67)signal
(176-177)signal
(271-283)signal
(426-441)tests/executor/temporal/test_signal_handler.py (1)
mailbox
(11-12)src/mcp_agent/app.py (1)
logger
(164-168)src/mcp_agent/logging/logger.py (1)
debug
(105-113)
examples/usecases/reliable_conversation/test_basic.py (3)
src/mcp_agent/executor/workflow.py (1)
WorkflowResult
(55-59)examples/usecases/reliable_conversation/src/workflows/conversation_workflow.py (2)
ConversationWorkflow
(29-317)run
(42-55)src/mcp_agent/app.py (1)
config
(128-129)
src/mcp_agent/executor/workflow_signal.py (2)
src/mcp_agent/executor/executor.py (1)
wait_for_any_signal
(215-234)src/mcp_agent/executor/temporal/workflow_signal.py (2)
wait_for_any_signal
(181-241)value
(53-72)
🪛 GitHub Actions: Pull Request Checks
src/mcp_agent/executor/workflow.py
[error] 408-408: Ruff F821: Undefined name payload
.
tests/executor/test_multi_signal.py
[error] 5-5: Ruff F401: unittest.mock.AsyncMock
imported but unused.
[error] 131-131: Ruff E712: Avoid equality comparisons to True
; use condition_func():
for truth checks.
🪛 Ruff (0.11.9)
tests/executor/test_multi_signal.py
5-5: unittest.mock.AsyncMock
imported but unused
Remove unused import: unittest.mock.AsyncMock
(F401)
131-131: Avoid equality comparisons to True
; use if condition_func():
for truth checks
Replace with condition_func()
(E712)
174-175: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
186-187: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
🪛 Pylint (3.3.7)
examples/usecases/reliable_conversation/test_basic.py
[refactor] 28-28: Too few public methods (1/2)
(R0903)
src/mcp_agent/executor/workflow_signal.py
[refactor] 349-349: Too many local variables (18/15)
(R0914)
🔇 Additional comments (2)
tests/executor/temporal/test_signal_handler.py (1)
15-16
: LGTM – mocking the module logger keeps test output clean
No issues noted.examples/usecases/reliable_conversation/test_basic.py (1)
108-110
: Prefer immutable config over in-place mutation.Mutating
app.config.execution_engine
at runtime can lead to surprising side-effects if other components cache the setting. Consider constructing theMCPApp
with aSettings(execution_engine="asyncio")
object instead.
raw_payload = args[0] if args else None | ||
|
||
# Deserialize the RawValue to get the actual content | ||
actual_payload = raw_payload | ||
if raw_payload and hasattr(raw_payload, 'payload'): | ||
try: | ||
from temporalio.converter import default_converter | ||
# Use Temporal's converter to deserialize the payload | ||
actual_payload = default_converter.from_payloads([raw_payload.payload])[0] | ||
except Exception as e: | ||
self._logger.error(f"Failed to deserialize signal payload: {e}") | ||
# Fallback: try to extract JSON data directly | ||
try: | ||
import json | ||
if hasattr(raw_payload.payload, 'data'): | ||
# Decode the raw bytes and parse as JSON | ||
json_str = raw_payload.payload.data.decode('utf-8') | ||
actual_payload = json.loads(json_str) | ||
else: | ||
actual_payload = str(raw_payload) | ||
except Exception as e2: | ||
self._logger.error(f"Fallback deserialization also failed: {e2}") | ||
actual_payload = raw_payload | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Undefined variable payload
breaks the build
payload
was removed in favour of actual_payload
, but it is still referenced later when constructing Signal
, causing F821
(payload
undefined).
- sig_obj = Signal(
- name=name,
- payload=payload,
+ sig_obj = Signal(
+ name=name,
+ payload=actual_payload,
Apply the change above (and any similar occurrences) to restore type-checking & runtime correctness.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/mcp_agent/executor/workflow.py around lines 374 to 397, the variable
`payload` is referenced after being removed and replaced by `actual_payload`,
causing an undefined variable error. Replace all occurrences of `payload` with
`actual_payload` in this section and any similar places to ensure the code uses
the correct variable and passes type-checking and runs correctly.
1ad47f7
to
43323a9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (7)
examples/usecases/reliable_conversation/test_basic.py (2)
31-35
:__init__
adds no value – drop it.
TestConversationWorkflowTemplate.__init__
only delegates tosuper().__init__
without adding behaviour or documentation, so the override is redundant and slightly increases the public-method count flagged by pylint (R0903).- def __init__(self, app): - super().__init__(app)
108-113
: Shadowing the template class may confuse IDEs & static tools.
TestConversationWorkflow = app.workflow(TestConversationWorkflowTemplate)
overwrites the original template symbol with the registered class.
Consider using a distinct name (e.g.RegisteredConversationWorkflow
) to keep both objects accessible and avoid accidental misuse in future edits/tests.tests/executor/test_multi_signal.py (2)
125-132
: Avoid explicit comparison toTrue
.Ruff E712 flags
assert condition_func() == True
; direct truthiness reads better.- assert condition_func() == True + assert condition_func()
174-176
: Collapse nestedwith
statements.Ruff SIM117 suggests merging the two context managers for readability:
- with patch("temporalio.workflow.wait_condition") as mock_wait_condition, \ - patch("temporalio.workflow.info") as mock_info: + with patch("temporalio.workflow.wait_condition") as mock_wait_condition, \ + patch("temporalio.workflow.info") as mock_info: ...(only one backslash needed).
Not critical, purely stylistic.src/mcp_agent/executor/workflow_signal.py (3)
244-256
: Useget_running_loop()
to future-proof console I/O.
asyncio.get_event_loop()
is deprecated in 3.11+ outside of legacy contexts. Preferasyncio.get_running_loop()
:- loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop()This keeps the console handler working when the default event-loop policy changes.
380-387
: Timeout handling could lose granularity.
asyncio.wait(..., timeout=timeout_seconds)
returns(done, pending)
without raising. Iftimeout_seconds
expires but some task completes after that instant,done
may still be empty and we raiseTimeoutError
even though a signal arrived milliseconds later.A safer pattern is
asyncio.wait_for(asyncio.wait(..., return_when=FIRST_COMPLETED), timeout_seconds)
which guarantees an exception only when the overall timer elapses with no completion.Not blocking, but worth considering if sub-millisecond correctness matters.
409-425
: Potential race: clean-up after cancellation.
task.cancel()
is issued but not awaited; in rare cases_pending_signals
might be cleaned before the task finishes, leaving it to set its result on an already-deletedPendingSignal
.- for task in waiter_tasks: - if not task.done(): - task.cancel() + for task in waiter_tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*waiter_tasks, return_exceptions=True)Ensures tasks are fully settled before
_pending_signals
is mutated.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lock
is excluded by!**/*.lock
📒 Files selected for processing (9)
examples/usecases/reliable_conversation/test_basic.py
(2 hunks)src/mcp_agent/executor/executor.py
(1 hunks)src/mcp_agent/executor/temporal/workflow_signal.py
(3 hunks)src/mcp_agent/executor/workflow.py
(1 hunks)src/mcp_agent/executor/workflow_signal.py
(3 hunks)src/mcp_agent/logging/transport.py
(1 hunks)tests/executor/temporal/test_signal_handler.py
(1 hunks)tests/executor/test_multi_signal.py
(1 hunks)tests/test_app.py
(4 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- tests/executor/temporal/test_signal_handler.py
- src/mcp_agent/logging/transport.py
- src/mcp_agent/executor/workflow.py
- src/mcp_agent/executor/executor.py
- src/mcp_agent/executor/temporal/workflow_signal.py
- tests/test_app.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/mcp_agent/executor/workflow_signal.py (2)
src/mcp_agent/executor/temporal/workflow_signal.py (2)
wait_for_any_signal
(181-241)value
(53-72)src/mcp_agent/executor/executor.py (1)
wait_for_any_signal
(215-234)
🪛 Pylint (3.3.7)
src/mcp_agent/executor/workflow_signal.py
[refactor] 349-349: Too many local variables (18/15)
(R0914)
examples/usecases/reliable_conversation/test_basic.py
[refactor] 28-28: Too few public methods (1/2)
(R0903)
🪛 Ruff (0.11.9)
tests/executor/test_multi_signal.py
5-5: unittest.mock.AsyncMock
imported but unused
Remove unused import: unittest.mock.AsyncMock
(F401)
131-131: Avoid equality comparisons to True
; use if condition_func():
for truth checks
Replace with condition_func()
(E712)
174-175: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
186-187: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
🪛 GitHub Actions: Pull Request Checks
tests/executor/test_multi_signal.py
[error] 5-5: Unused import AsyncMock
from unittest.mock
(F401). Remove unused import.
[error] 131-131: Avoid equality comparisons to True
; use condition_func():
for truth checks (E712). Replace assert condition_func() == True
with assert condition_func()
.
🔇 Additional comments (1)
src/mcp_agent/executor/workflow_signal.py (1)
77-103
: Nice addition – interface documents multi-signal semantics clearly.The new abstract method is well-documented and typed, giving downstream handlers a consistent contract.
👍
|
||
import asyncio | ||
import pytest | ||
from unittest.mock import MagicMock, AsyncMock, patch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove unused AsyncMock
import to unblock CI.
AsyncMock
isn’t referenced anywhere in the file and causes the Ruff F401 error.
-from unittest.mock import MagicMock, AsyncMock, patch
+from unittest.mock import MagicMock, patch
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
from unittest.mock import MagicMock, AsyncMock, patch | |
-from unittest.mock import MagicMock, AsyncMock, patch | |
+from unittest.mock import MagicMock, patch |
🧰 Tools
🪛 Ruff (0.11.9)
5-5: unittest.mock.AsyncMock
imported but unused
Remove unused import: unittest.mock.AsyncMock
(F401)
🪛 GitHub Actions: Pull Request Checks
[error] 5-5: Unused import AsyncMock
from unittest.mock
(F401). Remove unused import.
🤖 Prompt for AI Agents
In tests/executor/test_multi_signal.py at line 5, remove the unused import
AsyncMock from the import statement to fix the Ruff F401 error and unblock CI.
Change the line to only import MagicMock and patch since AsyncMock is not used
anywhere in the file.
async def wait_for_any_signal( | ||
self, | ||
signal_names: List[str], | ||
workflow_id: str, | ||
run_id: str | None = None, | ||
timeout_seconds: int | None = None | ||
) -> Signal[SignalValueT]: | ||
""" | ||
Waits for any of a list of signals using asyncio primitives. | ||
""" | ||
# Create an event and a registration for each signal | ||
pending_signals: List[PendingSignal] = [] | ||
waiter_tasks: List[asyncio.Task] = [] | ||
|
||
async with self._lock: | ||
for name in signal_names: | ||
event = asyncio.Event() | ||
unique_name = f"{name}_{uuid.uuid4()}" | ||
registration = SignalRegistration( | ||
signal_name=name, | ||
unique_name=unique_name, | ||
workflow_id=workflow_id, | ||
run_id=run_id, | ||
) | ||
pending = PendingSignal(registration=registration, event=event) | ||
pending_signals.append(pending) | ||
self._pending_signals.setdefault(name, []).append(pending) | ||
waiter_tasks.append(asyncio.create_task(event.wait())) | ||
|
||
try: | ||
# Wait for any of the events to be set | ||
done, pending = await asyncio.wait( | ||
waiter_tasks, | ||
return_when=asyncio.FIRST_COMPLETED, | ||
timeout=timeout_seconds, | ||
) | ||
|
||
if not done: | ||
raise asyncio.TimeoutError(f"Timeout waiting for signals: {signal_names}") | ||
|
||
# Find which pending signal corresponds to the completed task | ||
completed_task = done.pop() | ||
triggered_pending_signal = None | ||
for i, task in enumerate(waiter_tasks): | ||
if task is completed_task: | ||
triggered_pending_signal = pending_signals[i] | ||
break | ||
|
||
if not triggered_pending_signal: | ||
# Should not happen | ||
raise RuntimeError("Could not identify which signal was triggered.") | ||
|
||
return Signal( | ||
name=triggered_pending_signal.registration.signal_name, | ||
payload=triggered_pending_signal.value, | ||
workflow_id=workflow_id, | ||
run_id=run_id | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Edge-case: empty signal_names
yields confusing error.
If signal_names
is empty we fall through to asyncio.wait([], …)
→ ValueError("Set of coroutines/Futures is empty")
.
Consider raising a domain-specific ValueError
earlier:
+ if not signal_names:
+ raise ValueError("signal_names must contain at least one signal to wait for")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async def wait_for_any_signal( | |
self, | |
signal_names: List[str], | |
workflow_id: str, | |
run_id: str | None = None, | |
timeout_seconds: int | None = None | |
) -> Signal[SignalValueT]: | |
""" | |
Waits for any of a list of signals using asyncio primitives. | |
""" | |
# Create an event and a registration for each signal | |
pending_signals: List[PendingSignal] = [] | |
waiter_tasks: List[asyncio.Task] = [] | |
async with self._lock: | |
for name in signal_names: | |
event = asyncio.Event() | |
unique_name = f"{name}_{uuid.uuid4()}" | |
registration = SignalRegistration( | |
signal_name=name, | |
unique_name=unique_name, | |
workflow_id=workflow_id, | |
run_id=run_id, | |
) | |
pending = PendingSignal(registration=registration, event=event) | |
pending_signals.append(pending) | |
self._pending_signals.setdefault(name, []).append(pending) | |
waiter_tasks.append(asyncio.create_task(event.wait())) | |
try: | |
# Wait for any of the events to be set | |
done, pending = await asyncio.wait( | |
waiter_tasks, | |
return_when=asyncio.FIRST_COMPLETED, | |
timeout=timeout_seconds, | |
) | |
if not done: | |
raise asyncio.TimeoutError(f"Timeout waiting for signals: {signal_names}") | |
# Find which pending signal corresponds to the completed task | |
completed_task = done.pop() | |
triggered_pending_signal = None | |
for i, task in enumerate(waiter_tasks): | |
if task is completed_task: | |
triggered_pending_signal = pending_signals[i] | |
break | |
if not triggered_pending_signal: | |
# Should not happen | |
raise RuntimeError("Could not identify which signal was triggered.") | |
return Signal( | |
name=triggered_pending_signal.registration.signal_name, | |
payload=triggered_pending_signal.value, | |
workflow_id=workflow_id, | |
run_id=run_id | |
) | |
async def wait_for_any_signal( | |
self, | |
signal_names: List[str], | |
workflow_id: str, | |
run_id: str | None = None, | |
timeout_seconds: int | None = None | |
) -> Signal[SignalValueT]: | |
""" | |
Waits for any of a list of signals using asyncio primitives. | |
""" | |
if not signal_names: | |
raise ValueError("signal_names must contain at least one signal to wait for") | |
# Create an event and a registration for each signal | |
pending_signals: List[PendingSignal] = [] | |
waiter_tasks: List[asyncio.Task] = [] | |
async with self._lock: | |
for name in signal_names: | |
event = asyncio.Event() | |
unique_name = f"{name}_{uuid.uuid4()}" | |
registration = SignalRegistration( | |
signal_name=name, | |
unique_name=unique_name, | |
workflow_id=workflow_id, | |
run_id=run_id, | |
) | |
pending = PendingSignal(registration=registration, event=event) | |
pending_signals.append(pending) | |
self._pending_signals.setdefault(name, []).append(pending) | |
waiter_tasks.append(asyncio.create_task(event.wait())) | |
try: | |
# Wait for any of the events to be set | |
done, pending = await asyncio.wait( | |
waiter_tasks, | |
return_when=asyncio.FIRST_COMPLETED, | |
timeout=timeout_seconds, | |
) | |
if not done: | |
raise asyncio.TimeoutError(f"Timeout waiting for signals: {signal_names}") | |
# Find which pending signal corresponds to the completed task | |
completed_task = done.pop() | |
triggered_pending_signal = None | |
for i, task in enumerate(waiter_tasks): | |
if task is completed_task: | |
triggered_pending_signal = pending_signals[i] | |
break | |
if not triggered_pending_signal: | |
# Should not happen | |
raise RuntimeError("Could not identify which signal was triggered.") | |
return Signal( | |
name=triggered_pending_signal.registration.signal_name, | |
payload=triggered_pending_signal.value, | |
workflow_id=workflow_id, | |
run_id=run_id | |
) |
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 349-349: Too many local variables (18/15)
(R0914)
🤖 Prompt for AI Agents
In src/mcp_agent/executor/workflow_signal.py around lines 349 to 407, the method
wait_for_any_signal does not handle the case when the input list signal_names is
empty, causing asyncio.wait to raise a ValueError with a confusing message. To
fix this, add an explicit check at the start of the method to raise a ValueError
with a clear, domain-specific message if signal_names is empty, preventing the
confusing error from asyncio.wait.
Closing this PR, will take pieces of this once we get human-in-the-loop flows hooked up through elicitation, and other new ways after @roman-van-der-krogt's changes for server -> client communication as part of #386 |
Largely VIBE'd so WIP for now.
Add wait_for_any_signal method to SignalHandler protocol
Summary
This PR implements a new wait_for_any_signal method across all signal handlers, enabling workflows to wait for any of multiple
signals concurrently. This provides a clean, engine-agnostic abstraction for building reactive workflows that need to handle multiple
types of events.
What's Changed
Replaces complex manual signal coordination with simple, declarative API:
Before: Complex manual signal handling
After: Clean, readable multi-signal waiting
triggered_signal = await self.context.executor.wait_for_any_signal(
signal_names=["new_message", "end_session"],
workflow_id=str(self.session_id)
)
if triggered_signal.name == "new_message":
await self._process_conversation_turn(triggered_signal.payload)
elif triggered_signal.name == "end_session":
self.is_active = False
Engine Independence: Same workflow code works with both Temporal and AsyncIO executors
Error Handling: Consistent timeout and exception handling across all implementations
Testing
All tests pass including a rather sophisticated demo I'm using this for (code soon!)
Summary by CodeRabbit