From e492db258d8c80db413368f430ddf9f42f391461 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 30 Oct 2025 15:46:01 -0700 Subject: [PATCH 1/3] feat: Implement signal_workflow method Signed-off-by: Tim Li --- cadence/client.py | 46 ++++++++++++++++++++++++ tests/integration_tests/test_client.py | 49 ++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/cadence/client.py b/cadence/client.py index dcad7f3..323dcdb 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -15,6 +15,7 @@ from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel from cadence.api.v1.service_workflow_pb2_grpc import WorkflowAPIStub from cadence.api.v1.service_workflow_pb2 import ( + SignalWorkflowExecutionRequest, StartWorkflowExecutionRequest, StartWorkflowExecutionResponse, ) @@ -229,6 +230,51 @@ async def start_workflow( except Exception: raise + async def signal_workflow( + self, + workflow_id: str, + run_id: str, + signal_name: str, + signal_input: Any = None, + ) -> None: + """ + Send a signal to a running workflow execution. + + Args: + workflow_id: The workflow ID + run_id: The run ID (can be empty string to signal current run) + signal_name: Name of the signal + signal_input: Input data for the signal + + Raises: + ValueError: If signal encoding fails + Exception: If the gRPC call fails + """ + signal_payload = None + if signal_input is not None: + try: + signal_payload = await self.data_converter.to_data(signal_input) + except Exception as e: + raise ValueError(f"Failed to encode signal input: {e}") + + workflow_execution = WorkflowExecution() + workflow_execution.workflow_id = workflow_id + if run_id: + workflow_execution.run_id = run_id + + signal_request = SignalWorkflowExecutionRequest( + domain=self.domain, + workflow_execution=workflow_execution, + identity=self.identity, + request_id=str(uuid.uuid4()), + signal_name=signal_name, + ) + + if signal_payload: + signal_request.signal_input.CopyFrom(signal_payload) + + await self.workflow_stub.SignalWorkflowExecution(signal_request) + def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions: if "target" not in options: diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py index 3f96b9b..8bb0e02 100644 --- a/tests/integration_tests/test_client.py +++ b/tests/integration_tests/test_client.py @@ -136,3 +136,52 @@ async def test_workflow_stub_start_and_describe(helper: CadenceHelper): assert task_timeout_seconds == task_timeout.total_seconds(), ( f"task_start_to_close_timeout mismatch: expected {task_timeout.total_seconds()}s, got {task_timeout_seconds}s" ) + + +@pytest.mark.usefixtures("helper") +async def test_signal_workflow(helper: CadenceHelper): + """Test signal_workflow method. + + This integration test verifies: + 1. Starting a workflow execution + 2. Sending a signal to the running workflow + 3. Signal is accepted (no errors thrown) + """ + async with helper.client() as client: + workflow_type = "test-workflow-signal" + task_list_name = "test-task-list-signal" + workflow_id = "test-workflow-signal-789" + execution_timeout = timedelta(minutes=5) + signal_name = "test-signal" + signal_input = {"action": "update", "value": 42} + + execution = await client.start_workflow( + workflow_type, + task_list=task_list_name, + execution_start_to_close_timeout=execution_timeout, + workflow_id=workflow_id, + ) + + await client.signal_workflow( + workflow_id=execution.workflow_id, + run_id=execution.run_id, + signal_name=signal_name, + signal_input=signal_input, + ) + + describe_request = DescribeWorkflowExecutionRequest( + domain=DOMAIN_NAME, + workflow_execution=WorkflowExecution( + workflow_id=execution.workflow_id, + run_id=execution.run_id, + ), + ) + + response = await client.workflow_stub.DescribeWorkflowExecution( + describe_request + ) + + assert ( + response.workflow_execution_info.workflow_execution.workflow_id + == workflow_id + ) From 2eabd54fee0a39ce097fda603cf55af9cebfd502 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Tue, 25 Nov 2025 14:31:56 -0800 Subject: [PATCH 2/3] fix integration tests Signed-off-by: Tim Li --- cadence/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/client.py b/cadence/client.py index 475d7c6..e847fc8 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -253,7 +253,7 @@ async def signal_workflow( signal_payload = None if signal_input is not None: try: - signal_payload = await self.data_converter.to_data(signal_input) + signal_payload = self.data_converter.to_data([signal_input]) except Exception as e: raise ValueError(f"Failed to encode signal input: {e}") From 1efc927834fcb7c513592653d12d3e25cf39a07f Mon Sep 17 00:00:00 2001 From: Tim Li Date: Tue, 25 Nov 2025 15:01:04 -0800 Subject: [PATCH 3/3] address comments Signed-off-by: Tim Li --- cadence/client.py | 8 ++-- tests/integration_tests/test_client.py | 51 +++++++++++++++----------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/cadence/client.py b/cadence/client.py index e847fc8..536317a 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -235,7 +235,7 @@ async def signal_workflow( workflow_id: str, run_id: str, signal_name: str, - signal_input: Any = None, + *signal_args: Any, ) -> None: """ Send a signal to a running workflow execution. @@ -244,16 +244,16 @@ async def signal_workflow( workflow_id: The workflow ID run_id: The run ID (can be empty string to signal current run) signal_name: Name of the signal - signal_input: Input data for the signal + *signal_args: Arguments to pass to the signal handler Raises: ValueError: If signal encoding fails Exception: If the gRPC call fails """ signal_payload = None - if signal_input is not None: + if signal_args: try: - signal_payload = self.data_converter.to_data([signal_input]) + signal_payload = self.data_converter.to_data(list[Any](signal_args)) except Exception as e: raise ValueError(f"Failed to encode signal input: {e}") diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py index fa7f4ea..191c2a6 100644 --- a/tests/integration_tests/test_client.py +++ b/tests/integration_tests/test_client.py @@ -7,7 +7,10 @@ ) from cadence.error import EntityNotExistsError from tests.integration_tests.helper import CadenceHelper, DOMAIN_NAME -from cadence.api.v1.service_workflow_pb2 import DescribeWorkflowExecutionRequest +from cadence.api.v1.service_workflow_pb2 import ( + DescribeWorkflowExecutionRequest, + GetWorkflowExecutionHistoryRequest, +) from cadence.api.v1.common_pb2 import WorkflowExecution @@ -144,7 +147,7 @@ async def test_signal_workflow(helper: CadenceHelper): This integration test verifies: 1. Starting a workflow execution 2. Sending a signal to the running workflow - 3. Signal is accepted (no errors thrown) + 3. Signal appears in the workflow's history """ async with helper.client() as client: workflow_type = "test-workflow-signal" @@ -152,7 +155,7 @@ async def test_signal_workflow(helper: CadenceHelper): workflow_id = "test-workflow-signal-789" execution_timeout = timedelta(minutes=5) signal_name = "test-signal" - signal_input = {"action": "update", "value": 42} + signal_arg = {"action": "update", "value": 42} execution = await client.start_workflow( workflow_type, @@ -162,25 +165,31 @@ async def test_signal_workflow(helper: CadenceHelper): ) await client.signal_workflow( - workflow_id=execution.workflow_id, - run_id=execution.run_id, - signal_name=signal_name, - signal_input=signal_input, - ) - - describe_request = DescribeWorkflowExecutionRequest( - domain=DOMAIN_NAME, - workflow_execution=WorkflowExecution( - workflow_id=execution.workflow_id, - run_id=execution.run_id, - ), + execution.workflow_id, + execution.run_id, + signal_name, + signal_arg, + ) + + # Fetch workflow history to verify signal was recorded + history_response = await client.workflow_stub.GetWorkflowExecutionHistory( + GetWorkflowExecutionHistoryRequest( + domain=DOMAIN_NAME, + workflow_execution=execution, + skip_archival=True, + ) ) - response = await client.workflow_stub.DescribeWorkflowExecution( - describe_request - ) + # Verify signal event appears in history + signal_events = [ + event + for event in history_response.history.events + if event.HasField("workflow_execution_signaled_event_attributes") + ] + assert len(signal_events) == 1, "Expected exactly one signal event in history" + signal_event = signal_events[0] assert ( - response.workflow_execution_info.workflow_execution.workflow_id - == workflow_id - ) + signal_event.workflow_execution_signaled_event_attributes.signal_name + == signal_name + ), f"Expected signal name '{signal_name}'"