Skip to content

Commit 7e3b572

Browse files
committed
Scaffolding
1 parent 001ce8b commit 7e3b572

File tree

2 files changed

+100
-6
lines changed

2 files changed

+100
-6
lines changed

temporalio/bridge/worker.py

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55

66
from __future__ import annotations
77

8+
from abc import ABC, abstractmethod
89
from dataclasses import dataclass
9-
from datetime import timedelta
1010
from typing import (
1111
TYPE_CHECKING,
1212
Awaitable,
@@ -86,7 +86,105 @@ class FixedSizeSlotSupplier:
8686
num_slots: int
8787

8888

89-
SlotSupplier: TypeAlias = Union[FixedSizeSlotSupplier, ResourceBasedSlotSupplier]
89+
@dataclass(frozen=True)
90+
class SlotPermit:
91+
"""A permit to use a slot for a workflow/activity/local activity task."""
92+
93+
pass
94+
95+
96+
@dataclass(frozen=True)
97+
class SlotReserveContext:
98+
"""Context for reserving a slot from a :py:class:`CustomSlotSupplier`."""
99+
100+
slot_type: str # TODO real type
101+
"""The type of slot trying to be reserved."""
102+
task_queue: str
103+
"""The name of the task queue for which this reservation request is associated."""
104+
worker_identity: str
105+
"""The identity of the worker that is requesting the reservation."""
106+
worker_build_id: str
107+
"""The build id of the worker that is requesting the reservation."""
108+
is_sticky: bool
109+
"""True iff this is a reservation for a sticky poll for a workflow task."""
110+
111+
112+
@dataclass(frozen=True)
113+
class SlotMarkUsedContext:
114+
"""Context for marking a slot used from a :py:class:`CustomSlotSupplier`."""
115+
116+
pass
117+
118+
119+
@dataclass(frozen=True)
120+
class SlotReleaseContext:
121+
"""Context for releasing a slot from a :py:class:`CustomSlotSupplier`."""
122+
123+
pass
124+
125+
126+
class CustomSlotSupplier(ABC):
127+
"""This class can be implemented to provide custom slot supplier behavior."""
128+
129+
# TODO: AbortError equivalent
130+
@abstractmethod
131+
async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
132+
"""This function is called before polling for new tasks. Your implementation must block until a
133+
slot is available then return a permit to use that slot.
134+
135+
The only acceptable exception to throw is AbortError, any other exceptions thrown will be
136+
logged and ignored.
137+
138+
Args:
139+
ctx: The context for slot reservation.
140+
141+
Returns:
142+
A permit to use the slot which may be populated with your own data.
143+
"""
144+
...
145+
146+
@abstractmethod
147+
def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
148+
"""This function is called when trying to reserve slots for "eager" workflow and activity tasks.
149+
Eager tasks are those which are returned as a result of completing a workflow task, rather than
150+
from polling. Your implementation must not block, and if a slot is available, return a permit
151+
to use that slot.
152+
153+
Args:
154+
ctx: The context for slot reservation.
155+
156+
Returns:
157+
Maybe a permit to use the slot which may be populated with your own data.
158+
"""
159+
...
160+
161+
@abstractmethod
162+
def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
163+
"""This function is called once a slot is actually being used to process some task, which may be
164+
some time after the slot was reserved originally. For example, if there is no work for a
165+
worker, a number of slots equal to the number of active pollers may already be reserved, but
166+
none of them are being used yet. This call should be non-blocking.
167+
168+
Args:
169+
ctx: The context for marking a slot as used.
170+
"""
171+
...
172+
173+
@abstractmethod
174+
def release_slot(self, ctx: SlotReleaseContext) -> None:
175+
"""This function is called once a permit is no longer needed. This could be because the task has
176+
finished, whether successfully or not, or because the slot was no longer needed (ex: the number
177+
of active pollers decreased). This call should be non-blocking.
178+
179+
Args:
180+
ctx: The context for releasing a slot.
181+
"""
182+
...
183+
184+
185+
SlotSupplier: TypeAlias = Union[
186+
FixedSizeSlotSupplier, ResourceBasedSlotSupplier, CustomSlotSupplier
187+
]
90188

91189

92190
@dataclass

temporalio/worker/_worker.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -325,10 +325,6 @@ def __init__(
325325
disable_safe_eviction=disable_safe_workflow_eviction,
326326
)
327327

328-
workflow_slot_supplier: temporalio.bridge.worker.SlotSupplier
329-
activity_slot_supplier: temporalio.bridge.worker.SlotSupplier
330-
local_activity_slot_supplier: temporalio.bridge.worker.SlotSupplier
331-
332328
if tuner is not None:
333329
if (
334330
max_concurrent_workflow_tasks

0 commit comments

Comments
 (0)