Skip to content

Commit 2d3cd43

Browse files
committed
Move definitions from bridge to worker
1 parent ec96168 commit 2d3cd43

File tree

2 files changed

+133
-145
lines changed

2 files changed

+133
-145
lines changed

temporalio/bridge/worker.py

Lines changed: 4 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,21 @@
55

66
from __future__ import annotations
77

8-
from abc import ABC, abstractmethod
98
from dataclasses import dataclass
109
from typing import (
1110
TYPE_CHECKING,
1211
Awaitable,
1312
Callable,
1413
List,
15-
Literal,
1614
Optional,
17-
Protocol,
1815
Sequence,
1916
Set,
2017
Tuple,
2118
Union,
2219
)
2320

2421
import google.protobuf.internal.containers
25-
from typing_extensions import TypeAlias, runtime_checkable
22+
from typing_extensions import TypeAlias
2623

2724
import temporalio.api.common.v1
2825
import temporalio.api.history.v1
@@ -91,136 +88,10 @@ class FixedSizeSlotSupplier:
9188
num_slots: int
9289

9390

94-
class SlotPermit:
95-
"""A permit to use a slot for a workflow/activity/local activity task.
96-
97-
You can inherit from this class to add your own data to the permit.
98-
"""
99-
100-
pass
101-
102-
103-
# WARNING: This must match Rust worker::SlotReserveCtx
104-
class SlotReserveContext(Protocol):
105-
"""Context for reserving a slot from a :py:class:`CustomSlotSupplier`."""
106-
107-
slot_type: Literal["workflow", "activity", "local-activity"]
108-
"""The type of slot trying to be reserved. Always one of "workflow", "activity", or "local-activity"."""
109-
task_queue: str
110-
"""The name of the task queue for which this reservation request is associated."""
111-
worker_identity: str
112-
"""The identity of the worker that is requesting the reservation."""
113-
worker_build_id: str
114-
"""The build id of the worker that is requesting the reservation."""
115-
is_sticky: bool
116-
"""True iff this is a reservation for a sticky poll for a workflow task."""
117-
118-
119-
@runtime_checkable
120-
class WorkflowSlotInfo(Protocol):
121-
"""Info about a workflow task slot usage."""
122-
123-
workflow_type: str
124-
is_sticky: bool
125-
126-
127-
@runtime_checkable
128-
class ActivitySlotInfo(Protocol):
129-
"""Info about an activity task slot usage."""
130-
131-
activity_type: str
132-
133-
134-
@runtime_checkable
135-
class LocalActivitySlotInfo(Protocol):
136-
"""Info about a local activity task slot usage."""
137-
138-
activity_type: str
139-
140-
141-
SlotInfo: TypeAlias = Union[WorkflowSlotInfo, ActivitySlotInfo, LocalActivitySlotInfo]
142-
143-
144-
# WARNING: This must match Rust worker::SlotMarkUsedCtx
145-
class SlotMarkUsedContext(Protocol):
146-
"""Context for marking a slot used from a :py:class:`CustomSlotSupplier`."""
147-
148-
slot_info: SlotInfo
149-
"""Info about the task that will be using the slot."""
150-
permit: SlotPermit
151-
"""The permit that was issued when the slot was reserved."""
152-
153-
154-
@dataclass(frozen=True)
155-
class SlotReleaseContext:
156-
"""Context for releasing a slot from a :py:class:`CustomSlotSupplier`."""
157-
158-
slot_info: SlotInfo
159-
"""Info about the task that will be using the slot."""
160-
permit: SlotPermit
161-
"""The permit that was issued when the slot was reserved."""
162-
163-
164-
class CustomSlotSupplier(ABC):
165-
"""This class can be implemented to provide custom slot supplier behavior."""
166-
167-
@abstractmethod
168-
async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
169-
"""This function is called before polling for new tasks. Your implementation must block until a
170-
slot is available then return a permit to use that slot.
171-
172-
The only acceptable exception to throw is :py:class:`asyncio.CancelledError`, as invocations of this method may
173-
be cancelled. Any other exceptions thrown will be logged and ignored.
174-
175-
Args:
176-
ctx: The context for slot reservation.
177-
178-
Returns:
179-
A permit to use the slot which may be populated with your own data.
180-
"""
181-
...
182-
183-
@abstractmethod
184-
def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
185-
"""This function is called when trying to reserve slots for "eager" workflow and activity tasks.
186-
Eager tasks are those which are returned as a result of completing a workflow task, rather than
187-
from polling. Your implementation must not block, and if a slot is available, return a permit
188-
to use that slot.
189-
190-
Args:
191-
ctx: The context for slot reservation.
192-
193-
Returns:
194-
Maybe a permit to use the slot which may be populated with your own data.
195-
"""
196-
...
197-
198-
@abstractmethod
199-
def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
200-
"""This function is called once a slot is actually being used to process some task, which may be
201-
some time after the slot was reserved originally. For example, if there is no work for a
202-
worker, a number of slots equal to the number of active pollers may already be reserved, but
203-
none of them are being used yet. This call should be non-blocking.
204-
205-
Args:
206-
ctx: The context for marking a slot as used.
207-
"""
208-
...
209-
210-
@abstractmethod
211-
def release_slot(self, ctx: SlotReleaseContext) -> None:
212-
"""This function is called once a permit is no longer needed. This could be because the task has
213-
finished, whether successfully or not, or because the slot was no longer needed (ex: the number
214-
of active pollers decreased). This call should be non-blocking.
215-
216-
Args:
217-
ctx: The context for releasing a slot.
218-
"""
219-
...
220-
221-
22291
SlotSupplier: TypeAlias = Union[
223-
FixedSizeSlotSupplier, ResourceBasedSlotSupplier, CustomSlotSupplier
92+
FixedSizeSlotSupplier,
93+
ResourceBasedSlotSupplier,
94+
BridgeCustomSlotSupplier,
22495
]
22596

22697

temporalio/worker/_tuning.py

Lines changed: 129 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,11 @@
33
from abc import ABC, abstractmethod
44
from dataclasses import dataclass
55
from datetime import timedelta
6-
from typing import Any, Callable, Literal, Optional, Union
6+
from typing import Any, Callable, Literal, Optional, Protocol, Union, runtime_checkable
77

88
from typing_extensions import TypeAlias
99

1010
import temporalio.bridge.worker
11-
from temporalio.bridge.worker import (
12-
ActivitySlotInfo,
13-
CustomSlotSupplier,
14-
LocalActivitySlotInfo,
15-
SlotInfo,
16-
SlotMarkUsedContext,
17-
SlotPermit,
18-
SlotReleaseContext,
19-
SlotReserveContext,
20-
WorkflowSlotInfo,
21-
)
2211

2312
_DEFAULT_RESOURCE_ACTIVITY_MAX = 500
2413

@@ -85,6 +74,134 @@ class ResourceBasedSlotSupplier:
8574
:py:class:`CompositeTuner`, all resource-based slot suppliers must use the same tuner options."""
8675

8776

77+
class SlotPermit:
78+
"""A permit to use a slot for a workflow/activity/local activity task.
79+
80+
You can inherit from this class to add your own data to the permit.
81+
"""
82+
83+
pass
84+
85+
86+
# WARNING: This must match Rust worker::SlotReserveCtx
87+
class SlotReserveContext(Protocol):
88+
"""Context for reserving a slot from a :py:class:`CustomSlotSupplier`."""
89+
90+
slot_type: Literal["workflow", "activity", "local-activity"]
91+
"""The type of slot trying to be reserved. Always one of "workflow", "activity", or "local-activity"."""
92+
task_queue: str
93+
"""The name of the task queue for which this reservation request is associated."""
94+
worker_identity: str
95+
"""The identity of the worker that is requesting the reservation."""
96+
worker_build_id: str
97+
"""The build id of the worker that is requesting the reservation."""
98+
is_sticky: bool
99+
"""True iff this is a reservation for a sticky poll for a workflow task."""
100+
101+
102+
@runtime_checkable
103+
class WorkflowSlotInfo(Protocol):
104+
"""Info about a workflow task slot usage."""
105+
106+
workflow_type: str
107+
is_sticky: bool
108+
109+
110+
@runtime_checkable
111+
class ActivitySlotInfo(Protocol):
112+
"""Info about an activity task slot usage."""
113+
114+
activity_type: str
115+
116+
117+
@runtime_checkable
118+
class LocalActivitySlotInfo(Protocol):
119+
"""Info about a local activity task slot usage."""
120+
121+
activity_type: str
122+
123+
124+
SlotInfo: TypeAlias = Union[WorkflowSlotInfo, ActivitySlotInfo, LocalActivitySlotInfo]
125+
126+
127+
# WARNING: This must match Rust worker::SlotMarkUsedCtx
128+
class SlotMarkUsedContext(Protocol):
129+
"""Context for marking a slot used from a :py:class:`CustomSlotSupplier`."""
130+
131+
slot_info: SlotInfo
132+
"""Info about the task that will be using the slot."""
133+
permit: SlotPermit
134+
"""The permit that was issued when the slot was reserved."""
135+
136+
137+
@dataclass(frozen=True)
138+
class SlotReleaseContext:
139+
"""Context for releasing a slot from a :py:class:`CustomSlotSupplier`."""
140+
141+
slot_info: SlotInfo
142+
"""Info about the task that will be using the slot."""
143+
permit: SlotPermit
144+
"""The permit that was issued when the slot was reserved."""
145+
146+
147+
class CustomSlotSupplier(ABC):
148+
"""This class can be implemented to provide custom slot supplier behavior."""
149+
150+
@abstractmethod
151+
async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
152+
"""This function is called before polling for new tasks. Your implementation must block until a
153+
slot is available then return a permit to use that slot.
154+
155+
The only acceptable exception to throw is :py:class:`asyncio.CancelledError`, as invocations of this method may
156+
be cancelled. Any other exceptions thrown will be logged and ignored.
157+
158+
Args:
159+
ctx: The context for slot reservation.
160+
161+
Returns:
162+
A permit to use the slot which may be populated with your own data.
163+
"""
164+
...
165+
166+
@abstractmethod
167+
def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
168+
"""This function is called when trying to reserve slots for "eager" workflow and activity tasks.
169+
Eager tasks are those which are returned as a result of completing a workflow task, rather than
170+
from polling. Your implementation must not block, and if a slot is available, return a permit
171+
to use that slot.
172+
173+
Args:
174+
ctx: The context for slot reservation.
175+
176+
Returns:
177+
Maybe a permit to use the slot which may be populated with your own data.
178+
"""
179+
...
180+
181+
@abstractmethod
182+
def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
183+
"""This function is called once a slot is actually being used to process some task, which may be
184+
some time after the slot was reserved originally. For example, if there is no work for a
185+
worker, a number of slots equal to the number of active pollers may already be reserved, but
186+
none of them are being used yet. This call should be non-blocking.
187+
188+
Args:
189+
ctx: The context for marking a slot as used.
190+
"""
191+
...
192+
193+
@abstractmethod
194+
def release_slot(self, ctx: SlotReleaseContext) -> None:
195+
"""This function is called once a permit is no longer needed. This could be because the task has
196+
finished, whether successfully or not, or because the slot was no longer needed (ex: the number
197+
of active pollers decreased). This call should be non-blocking.
198+
199+
Args:
200+
ctx: The context for releasing a slot.
201+
"""
202+
...
203+
204+
88205
SlotSupplier: TypeAlias = Union[
89206
FixedSizeSlotSupplier, ResourceBasedSlotSupplier, CustomSlotSupplier
90207
]

0 commit comments

Comments
 (0)