Skip to content

Commit d4146cb

Browse files
authored
feat: Implement signal_with_start_workflow client method (#44)
<!-- Describe what has changed in this PR --> **What changed?** Implement signal_with_start_workflow client method <!-- Tell your future self why have you made these changes --> **Why?** It's a necessary feature for signaling in python client <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Add an integration test to test signal_with_start. <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: Tim Li <[email protected]>
1 parent 890fe83 commit d4146cb

File tree

4 files changed

+156
-17
lines changed

4 files changed

+156
-17
lines changed

cadence/client.py

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import socket
33
import uuid
44
from datetime import timedelta
5-
from typing import TypedDict, Unpack, Any, cast, Union, Callable
5+
from typing import TypedDict, Unpack, Any, cast, Union
66

77
from grpc import ChannelCredentials, Compression
88
from google.protobuf.duration_pb2 import Duration
@@ -17,11 +17,14 @@
1717
from cadence.api.v1.service_workflow_pb2 import (
1818
StartWorkflowExecutionRequest,
1919
StartWorkflowExecutionResponse,
20+
SignalWithStartWorkflowExecutionRequest,
21+
SignalWithStartWorkflowExecutionResponse,
2022
)
2123
from cadence.api.v1.common_pb2 import WorkflowType, WorkflowExecution
2224
from cadence.api.v1.tasklist_pb2 import TaskList
2325
from cadence.data_converter import DataConverter, DefaultDataConverter
2426
from cadence.metrics import MetricsEmitter, NoOpMetricsEmitter
27+
from cadence.workflow import WorkflowDefinition
2528

2629

2730
class StartWorkflowOptions(TypedDict, total=False):
@@ -132,7 +135,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
132135

133136
def _build_start_workflow_request(
134137
self,
135-
workflow: Union[str, Callable],
138+
workflow: Union[str, WorkflowDefinition],
136139
args: tuple[Any, ...],
137140
options: StartWorkflowOptions,
138141
) -> StartWorkflowExecutionRequest:
@@ -144,8 +147,8 @@ def _build_start_workflow_request(
144147
if isinstance(workflow, str):
145148
workflow_type_name = workflow
146149
else:
147-
# For callable, use function name or __name__ attribute
148-
workflow_type_name = getattr(workflow, "__name__", str(workflow))
150+
# For WorkflowDefinition, use the name property
151+
workflow_type_name = workflow.name
149152

150153
# Encode input arguments
151154
input_payload = None
@@ -186,15 +189,15 @@ def _build_start_workflow_request(
186189

187190
async def start_workflow(
188191
self,
189-
workflow: Union[str, Callable],
192+
workflow: Union[str, WorkflowDefinition],
190193
*args,
191194
**options_kwargs: Unpack[StartWorkflowOptions],
192195
) -> WorkflowExecution:
193196
"""
194197
Start a workflow execution asynchronously.
195198
196199
Args:
197-
workflow: Workflow function or workflow type name string
200+
workflow: WorkflowDefinition or workflow type name string
198201
*args: Arguments to pass to the workflow
199202
**options_kwargs: StartWorkflowOptions as keyword arguments
200203
@@ -229,6 +232,69 @@ async def start_workflow(
229232
except Exception:
230233
raise
231234

235+
async def signal_with_start_workflow(
236+
self,
237+
workflow: Union[str, WorkflowDefinition],
238+
signal_name: str,
239+
signal_args: list[Any],
240+
*workflow_args: Any,
241+
**options_kwargs: Unpack[StartWorkflowOptions],
242+
) -> WorkflowExecution:
243+
"""
244+
Signal a workflow execution, starting it if it is not already running.
245+
246+
Args:
247+
workflow: WorkflowDefinition or workflow type name string
248+
signal_name: Name of the signal
249+
signal_args: List of arguments to pass to the signal handler
250+
*workflow_args: Arguments to pass to the workflow if it needs to be started
251+
**options_kwargs: StartWorkflowOptions as keyword arguments
252+
253+
Returns:
254+
WorkflowExecution with workflow_id and run_id
255+
256+
Raises:
257+
ValueError: If required parameters are missing or invalid
258+
Exception: If the gRPC call fails
259+
"""
260+
# Convert kwargs to StartWorkflowOptions and validate
261+
options = _validate_and_apply_defaults(StartWorkflowOptions(**options_kwargs))
262+
263+
# Build the start workflow request
264+
start_request = self._build_start_workflow_request(
265+
workflow, workflow_args, options
266+
)
267+
268+
# Encode signal input
269+
signal_payload = None
270+
if signal_args:
271+
try:
272+
signal_payload = self.data_converter.to_data(signal_args)
273+
except Exception as e:
274+
raise ValueError(f"Failed to encode signal input: {e}")
275+
276+
# Build the SignalWithStartWorkflowExecution request
277+
request = SignalWithStartWorkflowExecutionRequest(
278+
start_request=start_request,
279+
signal_name=signal_name,
280+
)
281+
282+
if signal_payload:
283+
request.signal_input.CopyFrom(signal_payload)
284+
285+
# Execute the gRPC call
286+
try:
287+
response: SignalWithStartWorkflowExecutionResponse = (
288+
await self.workflow_stub.SignalWithStartWorkflowExecution(request)
289+
)
290+
291+
execution = WorkflowExecution()
292+
execution.workflow_id = start_request.workflow_id
293+
execution.run_id = response.run_id
294+
return execution
295+
except Exception:
296+
raise
297+
232298

233299
def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions:
234300
if "target" not in options:

cadence/workflow.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44
from dataclasses import dataclass
55
from datetime import timedelta
66
from typing import (
7+
Iterator,
78
Callable,
9+
TypeVar,
10+
TypedDict,
11+
Type,
812
cast,
13+
Any,
914
Optional,
1015
Union,
11-
Iterator,
12-
TypedDict,
13-
TypeVar,
14-
Type,
1516
Unpack,
16-
Any,
1717
Generic,
1818
)
1919
import inspect

tests/cadence/test_client_workflow.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
)
1111
from cadence.client import Client, StartWorkflowOptions, _validate_and_apply_defaults
1212
from cadence.data_converter import DefaultDataConverter
13+
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions
1314

1415

1516
@pytest.fixture
@@ -96,11 +97,17 @@ async def test_build_request_with_string_workflow(self, mock_client):
9697
uuid.UUID(request.request_id) # This will raise if not valid UUID
9798

9899
@pytest.mark.asyncio
99-
async def test_build_request_with_callable_workflow(self, mock_client):
100-
"""Test building request with callable workflow."""
100+
async def test_build_request_with_workflow_definition(self, mock_client):
101+
"""Test building request with WorkflowDefinition."""
102+
from cadence import workflow
101103

102-
def test_workflow():
103-
pass
104+
class TestWorkflow:
105+
@workflow.run
106+
async def run(self):
107+
pass
108+
109+
workflow_opts = WorkflowDefinitionOptions(name="test_workflow")
110+
workflow_definition = WorkflowDefinition.wrap(TestWorkflow, workflow_opts)
104111

105112
client = Client(domain="test-domain", target="localhost:7933")
106113

@@ -110,7 +117,7 @@ def test_workflow():
110117
task_start_to_close_timeout=timedelta(seconds=30),
111118
)
112119

113-
request = client._build_start_workflow_request(test_workflow, (), options)
120+
request = client._build_start_workflow_request(workflow_definition, (), options)
114121

115122
assert request.workflow_type.name == "test_workflow"
116123

tests/integration_tests/test_client.py

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
)
88
from cadence.error import EntityNotExistsError
99
from tests.integration_tests.helper import CadenceHelper, DOMAIN_NAME
10-
from cadence.api.v1.service_workflow_pb2 import DescribeWorkflowExecutionRequest
10+
from cadence.api.v1.service_workflow_pb2 import (
11+
DescribeWorkflowExecutionRequest,
12+
GetWorkflowExecutionHistoryRequest,
13+
)
1114
from cadence.api.v1.common_pb2 import WorkflowExecution
1215

1316

@@ -135,3 +138,66 @@ async def test_workflow_stub_start_and_describe(helper: CadenceHelper):
135138
assert task_timeout_seconds == task_timeout.total_seconds(), (
136139
f"task_start_to_close_timeout mismatch: expected {task_timeout.total_seconds()}s, got {task_timeout_seconds}s"
137140
)
141+
142+
143+
@pytest.mark.usefixtures("helper")
144+
async def test_signal_with_start_workflow(helper: CadenceHelper):
145+
"""Test signal_with_start_workflow method.
146+
147+
This integration test verifies:
148+
1. Starting a workflow via signal_with_start_workflow
149+
2. Sending a signal to the workflow
150+
3. Signal appears in the workflow's history with correct name and payload
151+
"""
152+
async with helper.client() as client:
153+
workflow_type = "test-workflow-signal-with-start"
154+
task_list_name = "test-task-list-signal-with-start"
155+
workflow_id = "test-workflow-signal-with-start-123"
156+
execution_timeout = timedelta(minutes=5)
157+
signal_name = "test-signal"
158+
signal_arg = {"data": "test-signal-data"}
159+
160+
execution = await client.signal_with_start_workflow(
161+
workflow_type,
162+
signal_name,
163+
[signal_arg],
164+
"arg1",
165+
"arg2",
166+
task_list=task_list_name,
167+
execution_start_to_close_timeout=execution_timeout,
168+
workflow_id=workflow_id,
169+
)
170+
171+
assert execution is not None
172+
assert execution.workflow_id == workflow_id
173+
assert execution.run_id is not None
174+
assert execution.run_id != ""
175+
176+
# Fetch workflow history to verify signal was recorded
177+
history_response = await client.workflow_stub.GetWorkflowExecutionHistory(
178+
GetWorkflowExecutionHistoryRequest(
179+
domain=DOMAIN_NAME,
180+
workflow_execution=execution,
181+
skip_archival=True,
182+
)
183+
)
184+
185+
# Verify signal event appears in history with correct name and payload
186+
signal_events = [
187+
event
188+
for event in history_response.history.events
189+
if event.HasField("workflow_execution_signaled_event_attributes")
190+
]
191+
192+
assert len(signal_events) == 1, "Expected exactly one signal event in history"
193+
signal_event = signal_events[0]
194+
assert (
195+
signal_event.workflow_execution_signaled_event_attributes.signal_name
196+
== signal_name
197+
), f"Expected signal name '{signal_name}'"
198+
199+
# Verify signal payload matches what we sent
200+
signal_payload_data = signal_event.workflow_execution_signaled_event_attributes.input.data.decode()
201+
assert signal_arg["data"] in signal_payload_data, (
202+
f"Expected signal payload to contain '{signal_arg['data']}'"
203+
)

0 commit comments

Comments
 (0)