diff --git a/cadence/client.py b/cadence/client.py index a75d7b5..536317a 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -15,6 +15,7 @@ from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel from cadence.api.v1.service_workflow_pb2_grpc import WorkflowAPIStub from cadence.api.v1.service_workflow_pb2 import ( + SignalWorkflowExecutionRequest, StartWorkflowExecutionRequest, StartWorkflowExecutionResponse, ) @@ -229,6 +230,51 @@ async def start_workflow( except Exception: raise + async def signal_workflow( + self, + workflow_id: str, + run_id: str, + signal_name: str, + *signal_args: Any, + ) -> None: + """ + Send a signal to a running workflow execution. + + Args: + workflow_id: The workflow ID + run_id: The run ID (can be empty string to signal current run) + signal_name: Name of the signal + *signal_args: Arguments to pass to the signal handler + + Raises: + ValueError: If signal encoding fails + Exception: If the gRPC call fails + """ + signal_payload = None + if signal_args: + try: + signal_payload = self.data_converter.to_data(list[Any](signal_args)) + except Exception as e: + raise ValueError(f"Failed to encode signal input: {e}") + + workflow_execution = WorkflowExecution() + workflow_execution.workflow_id = workflow_id + if run_id: + workflow_execution.run_id = run_id + + signal_request = SignalWorkflowExecutionRequest( + domain=self.domain, + workflow_execution=workflow_execution, + identity=self.identity, + request_id=str(uuid.uuid4()), + signal_name=signal_name, + ) + + if signal_payload: + signal_request.signal_input.CopyFrom(signal_payload) + + await self.workflow_stub.SignalWorkflowExecution(signal_request) + def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions: if "target" not in options: diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py index 5b4e785..191c2a6 100644 --- a/tests/integration_tests/test_client.py +++ b/tests/integration_tests/test_client.py @@ -7,7 +7,10 @@ ) from cadence.error import EntityNotExistsError from tests.integration_tests.helper import CadenceHelper, DOMAIN_NAME -from cadence.api.v1.service_workflow_pb2 import DescribeWorkflowExecutionRequest +from cadence.api.v1.service_workflow_pb2 import ( + DescribeWorkflowExecutionRequest, + GetWorkflowExecutionHistoryRequest, +) from cadence.api.v1.common_pb2 import WorkflowExecution @@ -135,3 +138,58 @@ async def test_workflow_stub_start_and_describe(helper: CadenceHelper): assert task_timeout_seconds == task_timeout.total_seconds(), ( f"task_start_to_close_timeout mismatch: expected {task_timeout.total_seconds()}s, got {task_timeout_seconds}s" ) + + +@pytest.mark.usefixtures("helper") +async def test_signal_workflow(helper: CadenceHelper): + """Test signal_workflow method. + + This integration test verifies: + 1. Starting a workflow execution + 2. Sending a signal to the running workflow + 3. Signal appears in the workflow's history + """ + async with helper.client() as client: + workflow_type = "test-workflow-signal" + task_list_name = "test-task-list-signal" + workflow_id = "test-workflow-signal-789" + execution_timeout = timedelta(minutes=5) + signal_name = "test-signal" + signal_arg = {"action": "update", "value": 42} + + execution = await client.start_workflow( + workflow_type, + task_list=task_list_name, + execution_start_to_close_timeout=execution_timeout, + workflow_id=workflow_id, + ) + + await client.signal_workflow( + execution.workflow_id, + execution.run_id, + signal_name, + signal_arg, + ) + + # Fetch workflow history to verify signal was recorded + history_response = await client.workflow_stub.GetWorkflowExecutionHistory( + GetWorkflowExecutionHistoryRequest( + domain=DOMAIN_NAME, + workflow_execution=execution, + skip_archival=True, + ) + ) + + # Verify signal event appears in history + signal_events = [ + event + for event in history_response.history.events + if event.HasField("workflow_execution_signaled_event_attributes") + ] + + assert len(signal_events) == 1, "Expected exactly one signal event in history" + signal_event = signal_events[0] + assert ( + signal_event.workflow_execution_signaled_event_attributes.signal_name + == signal_name + ), f"Expected signal name '{signal_name}'"