Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions cadence/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel
from cadence.api.v1.service_workflow_pb2_grpc import WorkflowAPIStub
from cadence.api.v1.service_workflow_pb2 import (
SignalWorkflowExecutionRequest,
StartWorkflowExecutionRequest,
StartWorkflowExecutionResponse,
)
Expand Down Expand Up @@ -229,6 +230,51 @@ async def start_workflow(
except Exception:
raise

async def signal_workflow(
self,
workflow_id: str,
run_id: str,
signal_name: str,
*signal_args: Any,
) -> None:
"""
Send a signal to a running workflow execution.

Args:
workflow_id: The workflow ID
run_id: The run ID (can be empty string to signal current run)
signal_name: Name of the signal
*signal_args: Arguments to pass to the signal handler

Raises:
ValueError: If signal encoding fails
Exception: If the gRPC call fails
"""
signal_payload = None
if signal_args:
try:
signal_payload = self.data_converter.to_data(list[Any](signal_args))
except Exception as e:
raise ValueError(f"Failed to encode signal input: {e}")

workflow_execution = WorkflowExecution()
workflow_execution.workflow_id = workflow_id
if run_id:
workflow_execution.run_id = run_id

signal_request = SignalWorkflowExecutionRequest(
domain=self.domain,
workflow_execution=workflow_execution,
identity=self.identity,
request_id=str(uuid.uuid4()),
signal_name=signal_name,
)

if signal_payload:
signal_request.signal_input.CopyFrom(signal_payload)

await self.workflow_stub.SignalWorkflowExecution(signal_request)


def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions:
if "target" not in options:
Expand Down
60 changes: 59 additions & 1 deletion tests/integration_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
)
from cadence.error import EntityNotExistsError
from tests.integration_tests.helper import CadenceHelper, DOMAIN_NAME
from cadence.api.v1.service_workflow_pb2 import DescribeWorkflowExecutionRequest
from cadence.api.v1.service_workflow_pb2 import (
DescribeWorkflowExecutionRequest,
GetWorkflowExecutionHistoryRequest,
)
from cadence.api.v1.common_pb2 import WorkflowExecution


Expand Down Expand Up @@ -135,3 +138,58 @@ async def test_workflow_stub_start_and_describe(helper: CadenceHelper):
assert task_timeout_seconds == task_timeout.total_seconds(), (
f"task_start_to_close_timeout mismatch: expected {task_timeout.total_seconds()}s, got {task_timeout_seconds}s"
)


@pytest.mark.usefixtures("helper")
async def test_signal_workflow(helper: CadenceHelper):
"""Test signal_workflow method.

This integration test verifies:
1. Starting a workflow execution
2. Sending a signal to the running workflow
3. Signal appears in the workflow's history
"""
async with helper.client() as client:
workflow_type = "test-workflow-signal"
task_list_name = "test-task-list-signal"
workflow_id = "test-workflow-signal-789"
execution_timeout = timedelta(minutes=5)
signal_name = "test-signal"
signal_arg = {"action": "update", "value": 42}

execution = await client.start_workflow(
workflow_type,
task_list=task_list_name,
execution_start_to_close_timeout=execution_timeout,
workflow_id=workflow_id,
)

await client.signal_workflow(
execution.workflow_id,
execution.run_id,
signal_name,
signal_arg,
)

# Fetch workflow history to verify signal was recorded
history_response = await client.workflow_stub.GetWorkflowExecutionHistory(
GetWorkflowExecutionHistoryRequest(
domain=DOMAIN_NAME,
workflow_execution=execution,
skip_archival=True,
)
)

# Verify signal event appears in history
signal_events = [
event
for event in history_response.history.events
if event.HasField("workflow_execution_signaled_event_attributes")
]

assert len(signal_events) == 1, "Expected exactly one signal event in history"
signal_event = signal_events[0]
assert (
signal_event.workflow_execution_signaled_event_attributes.signal_name
== signal_name
), f"Expected signal name '{signal_name}'"