Skip to content

Commit d7e0747

Browse files
committed
Review fixes
1 parent de38b9d commit d7e0747

File tree

5 files changed

+88
-74
lines changed

5 files changed

+88
-74
lines changed

temporalio/client.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
Mapping,
2727
Optional,
2828
Sequence,
29+
Tuple,
2930
Type,
3031
Union,
3132
cast,
@@ -43,6 +44,7 @@
4344
import temporalio.api.failure.v1
4445
import temporalio.api.history.v1
4546
import temporalio.api.schedule.v1
47+
import temporalio.api.sdk.v1
4648
import temporalio.api.taskqueue.v1
4749
import temporalio.api.update.v1
4850
import temporalio.api.workflow.v1
@@ -2423,8 +2425,8 @@ async def _from_raw_description(
24232425
description: temporalio.api.workflowservice.v1.DescribeWorkflowExecutionResponse,
24242426
converter: temporalio.converter.DataConverter,
24252427
) -> WorkflowExecutionDescription:
2426-
(summ, deets) = await converter._decode_user_metadata(
2427-
description.execution_config.user_metadata
2428+
(summ, deets) = await _decode_user_metadata(
2429+
converter, description.execution_config.user_metadata
24282430
)
24292431
return WorkflowExecutionDescription._from_raw_info( # type: ignore
24302432
description.workflow_execution_info,
@@ -3547,8 +3549,8 @@ async def _to_proto(
35473549
for k, v in self.memo.items()
35483550
},
35493551
),
3550-
user_metadata=await client.data_converter._encode_user_metadata(
3551-
self.static_summary, self.static_details
3552+
user_metadata=await _encode_user_metadata(
3553+
client.data_converter, self.static_summary, self.static_details
35523554
),
35533555
),
35543556
)
@@ -5141,8 +5143,8 @@ async def start_workflow(
51415143
temporalio.converter.encode_search_attributes(
51425144
input.search_attributes, req.search_attributes
51435145
)
5144-
metadata = await self._client.data_converter._encode_user_metadata(
5145-
input.static_summary, input.static_details
5146+
metadata = await _encode_user_metadata(
5147+
self._client.data_converter, input.static_summary, input.static_details
51465148
)
51475149
if metadata is not None:
51485150
req.user_metadata.CopyFrom(metadata)
@@ -6373,3 +6375,42 @@ def api_key(self, value: Optional[str]) -> None:
63736375
# Update config and perform update
63746376
self.service_client.config.api_key = value
63756377
self.service_client.update_api_key(value)
6378+
6379+
6380+
async def _encode_user_metadata(
6381+
converter: temporalio.converter.DataConverter,
6382+
summary: Optional[Union[str, temporalio.api.common.v1.Payload]],
6383+
details: Optional[Union[str, temporalio.api.common.v1.Payload]],
6384+
) -> Optional[temporalio.api.sdk.v1.UserMetadata]:
6385+
if summary is None and details is None:
6386+
return None
6387+
enc_summary = None
6388+
enc_details = None
6389+
if summary is not None:
6390+
if isinstance(summary, str):
6391+
enc_summary = (await converter.encode([summary]))[0]
6392+
else:
6393+
enc_summary = summary
6394+
if details is not None:
6395+
if isinstance(details, str):
6396+
enc_details = (await converter.encode([details]))[0]
6397+
else:
6398+
enc_details = details
6399+
return temporalio.api.sdk.v1.UserMetadata(summary=enc_summary, details=enc_details)
6400+
6401+
6402+
async def _decode_user_metadata(
6403+
converter: temporalio.converter.DataConverter,
6404+
metadata: Optional[temporalio.api.sdk.v1.UserMetadata],
6405+
) -> Tuple[Optional[str], Optional[str]]:
6406+
"""Returns (summary, details)"""
6407+
if metadata is None:
6408+
return None, None
6409+
return (
6410+
None
6411+
if not metadata.HasField("summary")
6412+
else (await converter.decode([metadata.summary]))[0],
6413+
None
6414+
if not metadata.HasField("details")
6415+
else (await converter.decode([metadata.details]))[0],
6416+
)

temporalio/converter.py

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,29 +1085,6 @@ async def encode_wrapper(
10851085
"""
10861086
return temporalio.api.common.v1.Payloads(payloads=(await self.encode(values)))
10871087

1088-
async def _encode_user_metadata(
1089-
self,
1090-
summary: Optional[Union[str, temporalio.api.common.v1.Payload]],
1091-
details: Optional[Union[str, temporalio.api.common.v1.Payload]],
1092-
) -> Optional[temporalio.api.sdk.v1.UserMetadata]:
1093-
if summary is None and details is None:
1094-
return None
1095-
enc_summary = None
1096-
enc_details = None
1097-
if summary is not None:
1098-
if isinstance(summary, str):
1099-
enc_summary = (await self.encode([summary]))[0]
1100-
else:
1101-
enc_summary = summary
1102-
if details is not None:
1103-
if isinstance(details, str):
1104-
enc_details = (await self.encode([details]))[0]
1105-
else:
1106-
enc_details = details
1107-
return temporalio.api.sdk.v1.UserMetadata(
1108-
summary=enc_summary, details=enc_details
1109-
)
1110-
11111088
async def decode_wrapper(
11121089
self,
11131090
payloads: Optional[temporalio.api.common.v1.Payloads],
@@ -1136,21 +1113,6 @@ async def decode_failure(
11361113
await self.payload_codec.decode_failure(failure)
11371114
return self.failure_converter.from_failure(failure, self.payload_converter)
11381115

1139-
async def _decode_user_metadata(
1140-
self, metadata: Optional[temporalio.api.sdk.v1.UserMetadata]
1141-
) -> Tuple[Optional[str], Optional[str]]:
1142-
"""Returns (summary, details)"""
1143-
if metadata is None:
1144-
return None, None
1145-
return (
1146-
None
1147-
if not metadata.HasField("summary")
1148-
else (await self.decode([metadata.summary]))[0],
1149-
None
1150-
if not metadata.HasField("details")
1151-
else (await self.decode([metadata.details]))[0],
1152-
)
1153-
11541116

11551117
DefaultPayloadConverter.default_encoding_payload_converters = (
11561118
BinaryNullPayloadConverter(),

temporalio/worker/_workflow_instance.py

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
331331

332332
# The current details (as opposed to static details on workflow start), returned in the
333333
# metadata query
334-
self._current_details: str = ""
334+
self._current_details = ""
335335

336336
def get_thread_id(self) -> Optional[int]:
337337
return self._current_thread_id
@@ -1449,9 +1449,13 @@ async def workflow_sleep(
14491449
if summary
14501450
else None
14511451
)
1452+
fut = self.create_future()
14521453
self._timer_impl(
1453-
duration, lambda _: None, options=_TimerOptions(user_metadata=user_metadata)
1454+
duration,
1455+
_TimerOptions(user_metadata=user_metadata),
1456+
lambda: fut.set_result(None),
14541457
)
1458+
await fut
14551459

14561460
async def workflow_wait_condition(
14571461
self,
@@ -1476,9 +1480,9 @@ async def workflow_wait_condition(
14761480
def workflow_get_current_details(self) -> str:
14771481
return self._current_details
14781482

1479-
def workflow_set_current_details(self, description):
1483+
def workflow_set_current_details(self, details: str):
14801484
self._assert_not_read_only("set current details")
1481-
self._current_details = description
1485+
self._current_details = details
14821486

14831487
#### Calls from outbound impl ####
14841488
# These are in alphabetical order and all start with "_outbound_".
@@ -2014,24 +2018,24 @@ def _enhanced_stack_trace(self) -> temporalio.api.sdk.v1.EnhancedStackTrace:
20142018
def _temporal_workflow_metadata(self) -> temporalio.api.sdk.v1.WorkflowMetadata:
20152019
query_definitions = [
20162020
temporalio.api.sdk.v1.WorkflowInteractionDefinition(
2017-
name=qd.name if qd.name is not None else "",
2018-
description=qd.description if qd.description is not None else "",
2021+
name=qd.name or "",
2022+
description=qd.description or "",
20192023
)
20202024
for qd in self._queries.values()
20212025
]
20222026
query_definitions.sort(key=lambda qd: qd.name)
20232027
signal_definitions = [
20242028
temporalio.api.sdk.v1.WorkflowInteractionDefinition(
2025-
name=sd.name if sd.name is not None else "",
2026-
description=sd.description if sd.description is not None else "",
2029+
name=sd.name or "",
2030+
description=sd.description or "",
20272031
)
20282032
for sd in self._signals.values()
20292033
]
20302034
signal_definitions.sort(key=lambda sd: sd.name)
20312035
update_definitions = [
20322036
temporalio.api.sdk.v1.WorkflowInteractionDefinition(
2033-
name=ud.name if ud.name is not None else "",
2034-
description=ud.description if ud.description is not None else "",
2037+
name=ud.name or "",
2038+
description=ud.description or "",
20352039
)
20362040
for ud in self._updates.values()
20372041
]
@@ -2050,10 +2054,10 @@ def _temporal_workflow_metadata(self) -> temporalio.api.sdk.v1.WorkflowMetadata:
20502054
def _timer_impl(
20512055
self,
20522056
delay: float,
2057+
options: _TimerOptions,
20532058
callback: Callable[..., Any],
20542059
*args: Any,
20552060
context: Optional[contextvars.Context] = None,
2056-
options: Optional[_TimerOptions] = None,
20572061
):
20582062
self._assert_not_read_only("schedule timer")
20592063
# Delay must be positive
@@ -2062,12 +2066,6 @@ def _timer_impl(
20622066

20632067
# Create, schedule, and return
20642068
seq = self._next_seq("timer")
2065-
# If options aren't explicitly passed, attempt to fetch them from the class field,
2066-
# erasing them afterward. Support callers who cannot call this directly because they
2067-
# rely on asyncio functions.
2068-
if options is None:
2069-
options = self._next_timer_options
2070-
self._next_timer_options = None
20712069
handle = _TimerHandle(
20722070
seq, self.time() + delay, options, callback, args, self, context
20732071
)
@@ -2079,7 +2077,6 @@ def _timer_impl(
20792077
# These are in the order defined in CPython's impl of the base class. Many
20802078
# functions are intentionally not implemented/supported.
20812079

2082-
# TODO/Review: This doesn't appear to implement any base class fn and isn't called anywhere?
20832080
def _timer_handle_cancelled(self, handle: asyncio.TimerHandle) -> None:
20842081
if not isinstance(handle, _TimerHandle):
20852082
raise TypeError("Expected Temporal timer handle")
@@ -2107,7 +2104,12 @@ def call_later(
21072104
*args: Any,
21082105
context: Optional[contextvars.Context] = None,
21092106
) -> asyncio.TimerHandle:
2110-
return self._timer_impl(delay, callback, *args, context=context)
2107+
# Fetch options from the class field, erasing them afterward.
2108+
options = (
2109+
self._next_timer_options if self._next_timer_options else _TimerOptions()
2110+
)
2111+
self._next_timer_options = None
2112+
return self._timer_impl(delay, options, callback, *args, context=context)
21112113

21122114
def call_at(
21132115
self,

temporalio/workflow.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ def signal(
224224
def signal(
225225
*,
226226
unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
227+
description: Optional[str] = None,
227228
) -> Callable[
228229
[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType
229230
]: ...
@@ -251,16 +252,6 @@ def signal(
251252
]: ...
252253

253254

254-
@overload
255-
def signal(
256-
*,
257-
description: str,
258-
unfinished_policy: HandlerUnfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON,
259-
) -> Callable[
260-
[CallableSyncOrAsyncReturnNoneType], CallableSyncOrAsyncReturnNoneType
261-
]: ...
262-
263-
264255
def signal(
265256
fn: Optional[CallableSyncOrAsyncReturnNoneType] = None,
266257
*,
@@ -768,7 +759,7 @@ async def workflow_wait_condition(
768759
def workflow_get_current_details(self) -> str: ...
769760

770761
@abstractmethod
771-
def workflow_set_current_details(self, description): ...
762+
def workflow_set_current_details(self, details: str): ...
772763

773764

774765
_current_update_info: contextvars.ContextVar[UpdateInfo] = contextvars.ContextVar(

tests/worker/test_workflow.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6287,3 +6287,21 @@ async def waiting() -> bool:
62876287
describe_r = await handle.describe()
62886288
assert describe_r.static_summary == "cool workflow bro"
62896289
assert describe_r.static_details == "xtremely detailed"
6290+
6291+
6292+
@workflow.defn
6293+
class WorkflowSleepWorkflow:
6294+
@workflow.run
6295+
async def run(self) -> None:
6296+
await workflow.sleep(1)
6297+
6298+
6299+
async def test_workflow_sleep(client: Client):
6300+
async with new_worker(client, WorkflowSleepWorkflow) as worker:
6301+
start_time = datetime.now()
6302+
await client.execute_workflow(
6303+
WorkflowSleepWorkflow.run,
6304+
id=f"workflow-{uuid.uuid4()}",
6305+
task_queue=worker.task_queue,
6306+
)
6307+
assert (datetime.now() - start_time) >= timedelta(seconds=1)

0 commit comments

Comments
 (0)