Skip to content

Commit 48face5

Browse files
committed
New client interceptor data classes
1 parent 07562cb commit 48face5

File tree

1 file changed

+94
-41
lines changed

1 file changed

+94
-41
lines changed

temporalio/client.py

Lines changed: 94 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -975,9 +975,8 @@ async def _start_update_with_start(
975975
else:
976976
update_name = str(update)
977977

978-
update_input = StartWorkflowUpdateInput(
978+
update_input = StartWorkflowUpdateWithStartUpdateWorkflowInput(
979979
id=start_workflow_operation._start_workflow_input.id,
980-
run_id=None,
981980
update_id=id,
982981
update=update_name,
983982
args=temporalio.common._arg_or_args(arg, args),
@@ -988,9 +987,12 @@ async def _start_update_with_start(
988987
wait_for_stage=wait_for_stage,
989988
)
990989

991-
update_handle = await self._impl.start_workflow_update_with_start(
992-
update_input, start_workflow_operation._start_workflow_input
990+
input = StartWorkflowUpdateWithStartInput(
991+
start_workflow_input=start_workflow_operation._start_workflow_input,
992+
update_workflow_input=update_input,
993993
)
994+
995+
update_handle = await self._impl.start_workflow_update_with_start(input)
994996
# TODO https://github.com/temporalio/sdk-python/issues/682
995997
assert update_handle.workflow_run_id, "In Client.start_workflow_update why don't we use the run ID from the update response?"
996998
start_workflow_operation._workflow_handle.set_result(
@@ -2339,11 +2341,8 @@ def __init__(
23392341
]
23402342
] = None,
23412343
start_delay: Optional[timedelta] = None,
2342-
start_signal: Optional[str] = None,
2343-
start_signal_args: Sequence[Any] = [],
23442344
rpc_metadata: Mapping[str, str] = {},
23452345
rpc_timeout: Optional[timedelta] = None,
2346-
request_eager_start: bool = False,
23472346
) -> None: ...
23482347

23492348
# Overload for single-param workflow, with_start
@@ -2370,11 +2369,8 @@ def __init__(
23702369
]
23712370
] = None,
23722371
start_delay: Optional[timedelta] = None,
2373-
start_signal: Optional[str] = None,
2374-
start_signal_args: Sequence[Any] = [],
23752372
rpc_metadata: Mapping[str, str] = {},
23762373
rpc_timeout: Optional[timedelta] = None,
2377-
request_eager_start: bool = False,
23782374
) -> None: ...
23792375

23802376
# Overload for multi-param workflow, with_start
@@ -2403,11 +2399,8 @@ def __init__(
24032399
]
24042400
] = None,
24052401
start_delay: Optional[timedelta] = None,
2406-
start_signal: Optional[str] = None,
2407-
start_signal_args: Sequence[Any] = [],
24082402
rpc_metadata: Mapping[str, str] = {},
24092403
rpc_timeout: Optional[timedelta] = None,
2410-
request_eager_start: bool = False,
24112404
) -> None: ...
24122405

24132406
# Overload for string-name workflow, with_start
@@ -2436,11 +2429,8 @@ def __init__(
24362429
]
24372430
] = None,
24382431
start_delay: Optional[timedelta] = None,
2439-
start_signal: Optional[str] = None,
2440-
start_signal_args: Sequence[Any] = [],
24412432
rpc_metadata: Mapping[str, str] = {},
24422433
rpc_timeout: Optional[timedelta] = None,
2443-
request_eager_start: bool = False,
24442434
) -> None: ...
24452435

24462436
def __init__(
@@ -2467,11 +2457,8 @@ def __init__(
24672457
]
24682458
] = None,
24692459
start_delay: Optional[timedelta] = None,
2470-
start_signal: Optional[str] = None,
2471-
start_signal_args: Sequence[Any] = [],
24722460
rpc_metadata: Mapping[str, str] = {},
24732461
rpc_timeout: Optional[timedelta] = None,
2474-
request_eager_start: bool = False,
24752462
stack_level: int = 2,
24762463
) -> None:
24772464
"""
@@ -2489,7 +2476,7 @@ def __init__(
24892476

24902477
self._id = id
24912478
self._workflow_handle: Future[WorkflowHandle[SelfType, ReturnType]] = Future()
2492-
self._start_workflow_input = StartWorkflowInput(
2479+
self._start_workflow_input = StartWorkflowUpdateWithStartStartWorkflowInput(
24932480
workflow=name,
24942481
args=temporalio.common._arg_or_args(arg, args),
24952482
id=id,
@@ -2505,12 +2492,9 @@ def __init__(
25052492
search_attributes=search_attributes,
25062493
start_delay=start_delay,
25072494
headers={},
2508-
start_signal=start_signal,
2509-
start_signal_args=start_signal_args,
25102495
ret_type=result_type or result_type_from_run_fn,
25112496
rpc_metadata=rpc_metadata,
25122497
rpc_timeout=rpc_timeout,
2513-
request_eager_start=request_eager_start,
25142498
)
25152499

25162500
# TODO: any of these needed here?
@@ -5088,6 +5072,62 @@ class StartWorkflowUpdateInput:
50885072
rpc_timeout: Optional[timedelta]
50895073

50905074

5075+
@dataclass
5076+
class StartWorkflowUpdateWithStartUpdateWorkflowInput:
5077+
"""Update input for :py:meth:`OutboundInterceptor.start_workflow_update_with_start`."""
5078+
5079+
id: str
5080+
update_id: Optional[str]
5081+
update: str
5082+
args: Sequence[Any]
5083+
wait_for_stage: WorkflowUpdateStage
5084+
headers: Mapping[str, temporalio.api.common.v1.Payload]
5085+
ret_type: Optional[Type]
5086+
rpc_metadata: Mapping[str, str]
5087+
rpc_timeout: Optional[timedelta]
5088+
5089+
5090+
# TODO: name
5091+
@dataclass
5092+
class StartWorkflowUpdateWithStartStartWorkflowInput:
5093+
"""StartWorkflow input for :py:meth:`OutboundInterceptor.start_workflow_update_with_start`."""
5094+
5095+
# Similar to StartWorkflowInput but without e.g. run_id, start_signal,
5096+
# start_signal_args, request_eager_start.
5097+
5098+
workflow: str
5099+
args: Sequence[Any]
5100+
id: str
5101+
task_queue: str
5102+
execution_timeout: Optional[timedelta]
5103+
run_timeout: Optional[timedelta]
5104+
task_timeout: Optional[timedelta]
5105+
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy
5106+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy
5107+
retry_policy: Optional[temporalio.common.RetryPolicy]
5108+
cron_schedule: str
5109+
memo: Optional[Mapping[str, Any]]
5110+
search_attributes: Optional[
5111+
Union[
5112+
temporalio.common.SearchAttributes, temporalio.common.TypedSearchAttributes
5113+
]
5114+
]
5115+
start_delay: Optional[timedelta]
5116+
headers: Mapping[str, temporalio.api.common.v1.Payload]
5117+
# Type may be absent
5118+
ret_type: Optional[Type]
5119+
rpc_metadata: Mapping[str, str]
5120+
rpc_timeout: Optional[timedelta]
5121+
5122+
5123+
@dataclass
5124+
class StartWorkflowUpdateWithStartInput:
5125+
"""Input for :py:meth:`OutboundInterceptor.start_workflow_update_with_start`."""
5126+
5127+
update_workflow_input: StartWorkflowUpdateWithStartUpdateWorkflowInput
5128+
start_workflow_input: StartWorkflowUpdateWithStartStartWorkflowInput
5129+
5130+
50915131
@dataclass
50925132
class HeartbeatAsyncActivityInput:
50935133
"""Input for :py:meth:`OutboundInterceptor.heartbeat_async_activity`."""
@@ -5350,15 +5390,11 @@ async def start_workflow_update(
53505390
return await self.next.start_workflow_update(input)
53515391

53525392
async def start_workflow_update_with_start(
5353-
self,
5354-
update_input: StartWorkflowUpdateInput,
5355-
start_input: StartWorkflowInput,
5393+
self, input: StartWorkflowUpdateWithStartInput
53565394
) -> WorkflowUpdateHandle[Any]:
53575395
"""Called for every :py:meth:`Client.start_update_with_start` and
53585396
:py:meth:`Client.execute_update_with_start` call."""
5359-
return await self.next.start_workflow_update_with_start(
5360-
update_input, start_input
5361-
)
5397+
return await self.next.start_workflow_update_with_start(input)
53625398

53635399
### Async activity calls
53645400

@@ -5511,9 +5547,7 @@ async def _build_start_workflow_execution_request(
55115547
) -> temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest:
55125548
req = temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest()
55135549
req.request_eager_execution = input.request_eager_start
5514-
await self._populate_start_or_signal_with_start_workflow_execution_request(
5515-
req, input
5516-
)
5550+
await self._populate_start_workflow_execution_request(req, input)
55175551
return req
55185552

55195553
async def _build_signal_with_start_workflow_execution_request(
@@ -5527,18 +5561,25 @@ async def _build_signal_with_start_workflow_execution_request(
55275561
req.signal_input.payloads.extend(
55285562
await self._client.data_converter.encode(input.start_signal_args)
55295563
)
5530-
await self._populate_start_or_signal_with_start_workflow_execution_request(
5531-
req, input
5532-
)
5564+
await self._populate_start_workflow_execution_request(req, input)
55335565
return req
55345566

5535-
async def _populate_start_or_signal_with_start_workflow_execution_request(
5567+
async def _build_update_with_start_start_workflow_execution_request(
5568+
self, input: StartWorkflowUpdateWithStartStartWorkflowInput
5569+
) -> temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest:
5570+
req = temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest()
5571+
await self._populate_start_workflow_execution_request(req, input)
5572+
return req
5573+
5574+
async def _populate_start_workflow_execution_request(
55365575
self,
55375576
req: Union[
55385577
temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest,
55395578
temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest,
55405579
],
5541-
input: StartWorkflowInput,
5580+
input: Union[
5581+
StartWorkflowInput, StartWorkflowUpdateWithStartStartWorkflowInput
5582+
],
55425583
) -> None:
55435584
req.namespace = self._client.namespace
55445585
req.workflow_id = input.id
@@ -5780,13 +5821,17 @@ async def start_workflow_update(
57805821
return handle
57815822

57825823
async def _build_update_workflow_execution_request(
5783-
self, input: StartWorkflowUpdateInput
5824+
self,
5825+
input: Union[
5826+
StartWorkflowUpdateInput, StartWorkflowUpdateWithStartUpdateWorkflowInput
5827+
],
57845828
) -> temporalio.api.workflowservice.v1.UpdateWorkflowExecutionRequest:
5829+
run_id = input.run_id if isinstance(input, StartWorkflowUpdateInput) else None
57855830
req = temporalio.api.workflowservice.v1.UpdateWorkflowExecutionRequest(
57865831
namespace=self._client.namespace,
57875832
workflow_execution=temporalio.api.common.v1.WorkflowExecution(
57885833
workflow_id=input.id,
5789-
run_id=input.run_id or "",
5834+
run_id=run_id or "",
57905835
),
57915836
request=temporalio.api.update.v1.Request(
57925837
meta=temporalio.api.update.v1.Meta(
@@ -5814,9 +5859,17 @@ async def _build_update_workflow_execution_request(
58145859
return req
58155860

58165861
async def start_workflow_update_with_start(
5817-
self, update_input: StartWorkflowUpdateInput, start_input: StartWorkflowInput
5862+
self, input: StartWorkflowUpdateWithStartInput
58185863
) -> WorkflowUpdateHandle[Any]:
5819-
start_req = await self._build_start_workflow_execution_request(start_input)
5864+
start_input, update_input = (
5865+
input.start_workflow_input,
5866+
input.update_workflow_input,
5867+
)
5868+
start_req = (
5869+
await self._build_update_with_start_start_workflow_execution_request(
5870+
start_input
5871+
)
5872+
)
58205873
update_req = await self._build_update_workflow_execution_request(update_input)
58215874
multiop_req = temporalio.api.workflowservice.v1.ExecuteMultiOperationRequest(
58225875
namespace=self._client.namespace,

0 commit comments

Comments
 (0)