Skip to content

Commit 21be231

Browse files
committed
Merge branch 'main' into signal-workflow
Resolved conflicts by keeping both signal_workflow and signal_with_start_workflow methods: - signal_workflow: Sends signal to existing workflow execution - signal_with_start_workflow: Signals workflow, starting it if not running Both test cases are preserved to validate each method independently.
2 parents 1efc927 + d4146cb commit 21be231

File tree

4 files changed

+152
-16
lines changed

4 files changed

+152
-16
lines changed

cadence/client.py

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import socket
33
import uuid
44
from datetime import timedelta
5-
from typing import TypedDict, Unpack, Any, cast, Union, Callable
5+
from typing import TypedDict, Unpack, Any, cast, Union
66

77
from grpc import ChannelCredentials, Compression
88
from google.protobuf.duration_pb2 import Duration
@@ -18,11 +18,14 @@
1818
SignalWorkflowExecutionRequest,
1919
StartWorkflowExecutionRequest,
2020
StartWorkflowExecutionResponse,
21+
SignalWithStartWorkflowExecutionRequest,
22+
SignalWithStartWorkflowExecutionResponse,
2123
)
2224
from cadence.api.v1.common_pb2 import WorkflowType, WorkflowExecution
2325
from cadence.api.v1.tasklist_pb2 import TaskList
2426
from cadence.data_converter import DataConverter, DefaultDataConverter
2527
from cadence.metrics import MetricsEmitter, NoOpMetricsEmitter
28+
from cadence.workflow import WorkflowDefinition
2629

2730

2831
class StartWorkflowOptions(TypedDict, total=False):
@@ -133,7 +136,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
133136

134137
def _build_start_workflow_request(
135138
self,
136-
workflow: Union[str, Callable],
139+
workflow: Union[str, WorkflowDefinition],
137140
args: tuple[Any, ...],
138141
options: StartWorkflowOptions,
139142
) -> StartWorkflowExecutionRequest:
@@ -145,8 +148,8 @@ def _build_start_workflow_request(
145148
if isinstance(workflow, str):
146149
workflow_type_name = workflow
147150
else:
148-
# For callable, use function name or __name__ attribute
149-
workflow_type_name = getattr(workflow, "__name__", str(workflow))
151+
# For WorkflowDefinition, use the name property
152+
workflow_type_name = workflow.name
150153

151154
# Encode input arguments
152155
input_payload = None
@@ -187,15 +190,15 @@ def _build_start_workflow_request(
187190

188191
async def start_workflow(
189192
self,
190-
workflow: Union[str, Callable],
193+
workflow: Union[str, WorkflowDefinition],
191194
*args,
192195
**options_kwargs: Unpack[StartWorkflowOptions],
193196
) -> WorkflowExecution:
194197
"""
195198
Start a workflow execution asynchronously.
196199
197200
Args:
198-
workflow: Workflow function or workflow type name string
201+
workflow: WorkflowDefinition or workflow type name string
199202
*args: Arguments to pass to the workflow
200203
**options_kwargs: StartWorkflowOptions as keyword arguments
201204
@@ -275,6 +278,69 @@ async def signal_workflow(
275278

276279
await self.workflow_stub.SignalWorkflowExecution(signal_request)
277280

281+
async def signal_with_start_workflow(
282+
self,
283+
workflow: Union[str, WorkflowDefinition],
284+
signal_name: str,
285+
signal_args: list[Any],
286+
*workflow_args: Any,
287+
**options_kwargs: Unpack[StartWorkflowOptions],
288+
) -> WorkflowExecution:
289+
"""
290+
Signal a workflow execution, starting it if it is not already running.
291+
292+
Args:
293+
workflow: WorkflowDefinition or workflow type name string
294+
signal_name: Name of the signal
295+
signal_args: List of arguments to pass to the signal handler
296+
*workflow_args: Arguments to pass to the workflow if it needs to be started
297+
**options_kwargs: StartWorkflowOptions as keyword arguments
298+
299+
Returns:
300+
WorkflowExecution with workflow_id and run_id
301+
302+
Raises:
303+
ValueError: If required parameters are missing or invalid
304+
Exception: If the gRPC call fails
305+
"""
306+
# Convert kwargs to StartWorkflowOptions and validate
307+
options = _validate_and_apply_defaults(StartWorkflowOptions(**options_kwargs))
308+
309+
# Build the start workflow request
310+
start_request = self._build_start_workflow_request(
311+
workflow, workflow_args, options
312+
)
313+
314+
# Encode signal input
315+
signal_payload = None
316+
if signal_args:
317+
try:
318+
signal_payload = self.data_converter.to_data(signal_args)
319+
except Exception as e:
320+
raise ValueError(f"Failed to encode signal input: {e}")
321+
322+
# Build the SignalWithStartWorkflowExecution request
323+
request = SignalWithStartWorkflowExecutionRequest(
324+
start_request=start_request,
325+
signal_name=signal_name,
326+
)
327+
328+
if signal_payload:
329+
request.signal_input.CopyFrom(signal_payload)
330+
331+
# Execute the gRPC call
332+
try:
333+
response: SignalWithStartWorkflowExecutionResponse = (
334+
await self.workflow_stub.SignalWithStartWorkflowExecution(request)
335+
)
336+
337+
execution = WorkflowExecution()
338+
execution.workflow_id = start_request.workflow_id
339+
execution.run_id = response.run_id
340+
return execution
341+
except Exception:
342+
raise
343+
278344

279345
def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions:
280346
if "target" not in options:

cadence/workflow.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44
from dataclasses import dataclass
55
from datetime import timedelta
66
from typing import (
7+
Iterator,
78
Callable,
9+
TypeVar,
10+
TypedDict,
11+
Type,
812
cast,
13+
Any,
914
Optional,
1015
Union,
11-
Iterator,
12-
TypedDict,
13-
TypeVar,
14-
Type,
1516
Unpack,
16-
Any,
1717
Generic,
1818
)
1919
import inspect

tests/cadence/test_client_workflow.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
)
1111
from cadence.client import Client, StartWorkflowOptions, _validate_and_apply_defaults
1212
from cadence.data_converter import DefaultDataConverter
13+
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions
1314

1415

1516
@pytest.fixture
@@ -96,11 +97,17 @@ async def test_build_request_with_string_workflow(self, mock_client):
9697
uuid.UUID(request.request_id) # This will raise if not valid UUID
9798

9899
@pytest.mark.asyncio
99-
async def test_build_request_with_callable_workflow(self, mock_client):
100-
"""Test building request with callable workflow."""
100+
async def test_build_request_with_workflow_definition(self, mock_client):
101+
"""Test building request with WorkflowDefinition."""
102+
from cadence import workflow
101103

102-
def test_workflow():
103-
pass
104+
class TestWorkflow:
105+
@workflow.run
106+
async def run(self):
107+
pass
108+
109+
workflow_opts = WorkflowDefinitionOptions(name="test_workflow")
110+
workflow_definition = WorkflowDefinition.wrap(TestWorkflow, workflow_opts)
104111

105112
client = Client(domain="test-domain", target="localhost:7933")
106113

@@ -110,7 +117,7 @@ def test_workflow():
110117
task_start_to_close_timeout=timedelta(seconds=30),
111118
)
112119

113-
request = client._build_start_workflow_request(test_workflow, (), options)
120+
request = client._build_start_workflow_request(workflow_definition, (), options)
114121

115122
assert request.workflow_type.name == "test_workflow"
116123

tests/integration_tests/test_client.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,3 +193,66 @@ async def test_signal_workflow(helper: CadenceHelper):
193193
signal_event.workflow_execution_signaled_event_attributes.signal_name
194194
== signal_name
195195
), f"Expected signal name '{signal_name}'"
196+
197+
198+
@pytest.mark.usefixtures("helper")
199+
async def test_signal_with_start_workflow(helper: CadenceHelper):
200+
"""Test signal_with_start_workflow method.
201+
202+
This integration test verifies:
203+
1. Starting a workflow via signal_with_start_workflow
204+
2. Sending a signal to the workflow
205+
3. Signal appears in the workflow's history with correct name and payload
206+
"""
207+
async with helper.client() as client:
208+
workflow_type = "test-workflow-signal-with-start"
209+
task_list_name = "test-task-list-signal-with-start"
210+
workflow_id = "test-workflow-signal-with-start-123"
211+
execution_timeout = timedelta(minutes=5)
212+
signal_name = "test-signal"
213+
signal_arg = {"data": "test-signal-data"}
214+
215+
execution = await client.signal_with_start_workflow(
216+
workflow_type,
217+
signal_name,
218+
[signal_arg],
219+
"arg1",
220+
"arg2",
221+
task_list=task_list_name,
222+
execution_start_to_close_timeout=execution_timeout,
223+
workflow_id=workflow_id,
224+
)
225+
226+
assert execution is not None
227+
assert execution.workflow_id == workflow_id
228+
assert execution.run_id is not None
229+
assert execution.run_id != ""
230+
231+
# Fetch workflow history to verify signal was recorded
232+
history_response = await client.workflow_stub.GetWorkflowExecutionHistory(
233+
GetWorkflowExecutionHistoryRequest(
234+
domain=DOMAIN_NAME,
235+
workflow_execution=execution,
236+
skip_archival=True,
237+
)
238+
)
239+
240+
# Verify signal event appears in history with correct name and payload
241+
signal_events = [
242+
event
243+
for event in history_response.history.events
244+
if event.HasField("workflow_execution_signaled_event_attributes")
245+
]
246+
247+
assert len(signal_events) == 1, "Expected exactly one signal event in history"
248+
signal_event = signal_events[0]
249+
assert (
250+
signal_event.workflow_execution_signaled_event_attributes.signal_name
251+
== signal_name
252+
), f"Expected signal name '{signal_name}'"
253+
254+
# Verify signal payload matches what we sent
255+
signal_payload_data = signal_event.workflow_execution_signaled_event_attributes.input.data.decode()
256+
assert signal_arg["data"] in signal_payload_data, (
257+
f"Expected signal payload to contain '{signal_arg['data']}'"
258+
)

0 commit comments

Comments
 (0)