Skip to content
This repository was archived by the owner on Feb 20, 2025. It is now read-only.

Commit c846a01

Browse files
committed
feat: initial pass at hatchet function
1 parent c73f9ba commit c846a01

File tree

2 files changed

+304
-10
lines changed

2 files changed

+304
-10
lines changed

examples/simple/worker.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,17 @@
33
hatchet = Hatchet(debug=True)
44

55

6-
class MyWorkflow(BaseWorkflow):
7-
@hatchet.step(timeout="11s", retries=3)
8-
def step1(self, context: Context) -> dict[str, str]:
9-
print("executed step1")
10-
return {
11-
"step1": "step1",
12-
}
6+
@hatchet.function(timeout="11s")
7+
def step1(context: Context) -> dict[str, str]:
8+
print("executed step1")
9+
return {
10+
"step1": "step1",
11+
}
1312

1413

1514
def main() -> None:
16-
wf = MyWorkflow()
17-
1815
worker = hatchet.worker("test-worker", max_runs=1)
19-
worker.register_workflow(wf)
16+
worker.register_function(step1)
2017
worker.start()
2118

2219

hatchet_sdk/v2/hatchet.py

Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
import asyncio
2+
import logging
3+
from typing import (
4+
TYPE_CHECKING,
5+
Any,
6+
Callable,
7+
Optional,
8+
ParamSpec,
9+
Type,
10+
TypeVar,
11+
cast,
12+
)
13+
14+
from hatchet_sdk.client import Client, new_client, new_client_raw
15+
from hatchet_sdk.clients.admin import AdminClient
16+
from hatchet_sdk.clients.dispatcher.dispatcher import DispatcherClient
17+
from hatchet_sdk.clients.events import EventClient
18+
from hatchet_sdk.clients.rest_client import RestApi
19+
from hatchet_sdk.clients.run_event_listener import RunEventListenerClient
20+
from hatchet_sdk.context.context import Context
21+
from hatchet_sdk.contracts.workflows_pb2 import DesiredWorkerLabels
22+
from hatchet_sdk.features.cron import CronClient
23+
from hatchet_sdk.features.scheduled import ScheduledClient
24+
from hatchet_sdk.labels import DesiredWorkerLabel
25+
from hatchet_sdk.loader import ClientConfig
26+
from hatchet_sdk.logger import logger
27+
from hatchet_sdk.rate_limit import RateLimit
28+
from hatchet_sdk.v2.workflows import (
29+
BaseWorkflowImpl,
30+
ConcurrencyExpression,
31+
EmptyModel,
32+
Step,
33+
StepType,
34+
StickyStrategy,
35+
TWorkflowInput,
36+
WorkflowConfig,
37+
WorkflowDeclaration,
38+
)
39+
40+
if TYPE_CHECKING:
41+
from hatchet_sdk.worker.worker import Worker
42+
43+
P = ParamSpec("P")
44+
R = TypeVar("R")
45+
46+
47+
def transform_desired_worker_label(d: DesiredWorkerLabel) -> DesiredWorkerLabels:
48+
value = d.value
49+
return DesiredWorkerLabels(
50+
strValue=value if not isinstance(value, int) else None,
51+
intValue=value if isinstance(value, int) else None,
52+
required=d.required,
53+
weight=d.weight,
54+
comparator=d.comparator, # type: ignore[arg-type]
55+
)
56+
57+
58+
class Hatchet:
59+
"""
60+
Main client for interacting with the Hatchet SDK.
61+
62+
This class provides access to various client interfaces and utility methods
63+
for working with Hatchet workers, workflows, and steps.
64+
65+
Attributes:
66+
cron (CronClient): Interface for cron trigger operations.
67+
68+
admin (AdminClient): Interface for administrative operations.
69+
dispatcher (DispatcherClient): Interface for dispatching operations.
70+
event (EventClient): Interface for event-related operations.
71+
rest (RestApi): Interface for REST API operations.
72+
"""
73+
74+
_client: Client
75+
cron: CronClient
76+
scheduled: ScheduledClient
77+
78+
@classmethod
79+
def from_environment(
80+
cls, defaults: ClientConfig = ClientConfig(), **kwargs: Any
81+
) -> "Hatchet":
82+
return cls(client=new_client(defaults), **kwargs)
83+
84+
@classmethod
85+
def from_config(cls, config: ClientConfig, **kwargs: Any) -> "Hatchet":
86+
return cls(client=new_client_raw(config), **kwargs)
87+
88+
def __init__(
89+
self,
90+
debug: bool = False,
91+
client: Optional[Client] = None,
92+
config: ClientConfig = ClientConfig(),
93+
):
94+
"""
95+
Initialize a new Hatchet instance.
96+
97+
Args:
98+
debug (bool, optional): Enable debug logging. Defaults to False.
99+
client (Optional[Client], optional): A pre-configured Client instance. Defaults to None.
100+
config (ClientConfig, optional): Configuration for creating a new Client. Defaults to ClientConfig().
101+
"""
102+
if client is not None:
103+
self._client = client
104+
else:
105+
self._client = new_client(config, debug)
106+
107+
if debug:
108+
logger.setLevel(logging.DEBUG)
109+
110+
self.cron = CronClient(self._client)
111+
self.scheduled = ScheduledClient(self._client)
112+
113+
@property
114+
def admin(self) -> AdminClient:
115+
return self._client.admin
116+
117+
@property
118+
def dispatcher(self) -> DispatcherClient:
119+
return self._client.dispatcher
120+
121+
@property
122+
def event(self) -> EventClient:
123+
return self._client.event
124+
125+
@property
126+
def rest(self) -> RestApi:
127+
return self._client.rest
128+
129+
@property
130+
def listener(self) -> RunEventListenerClient:
131+
return self._client.listener
132+
133+
@property
134+
def config(self) -> ClientConfig:
135+
return self._client.config
136+
137+
@property
138+
def tenant_id(self) -> str:
139+
return self._client.config.tenant_id
140+
141+
def step(
142+
self,
143+
name: str = "",
144+
timeout: str = "60m",
145+
parents: list[str] = [],
146+
retries: int = 0,
147+
rate_limits: list[RateLimit] = [],
148+
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
149+
backoff_factor: float | None = None,
150+
backoff_max_seconds: int | None = None,
151+
) -> Callable[[Callable[[Any, Context], Any]], Step[R]]:
152+
def inner(func: Callable[[Any, Context], R]) -> Step[R]:
153+
return Step(
154+
fn=func,
155+
type=StepType.DEFAULT,
156+
name=name.lower() or str(func.__name__).lower(),
157+
timeout=timeout,
158+
parents=parents,
159+
retries=retries,
160+
rate_limits=[r for rate_limit in rate_limits if (r := rate_limit._req)],
161+
desired_worker_labels={
162+
key: transform_desired_worker_label(d)
163+
for key, d in desired_worker_labels.items()
164+
},
165+
backoff_factor=backoff_factor,
166+
backoff_max_seconds=backoff_max_seconds,
167+
)
168+
169+
return inner
170+
171+
def on_failure_step(
172+
self,
173+
name: str = "",
174+
timeout: str = "60m",
175+
parents: list[str] = [],
176+
retries: int = 0,
177+
rate_limits: list[RateLimit] = [],
178+
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
179+
backoff_factor: float | None = None,
180+
backoff_max_seconds: int | None = None,
181+
) -> Callable[[Callable[[Any, Context], Any]], Step[R]]:
182+
def inner(func: Callable[[Any, Context], R]) -> Step[R]:
183+
return Step(
184+
fn=func,
185+
type=StepType.ON_FAILURE,
186+
name=name.lower() or str(func.__name__).lower(),
187+
timeout=timeout,
188+
parents=parents,
189+
retries=retries,
190+
rate_limits=[r for rate_limit in rate_limits if (r := rate_limit._req)],
191+
desired_worker_labels={
192+
key: transform_desired_worker_label(d)
193+
for key, d in desired_worker_labels.items()
194+
},
195+
backoff_factor=backoff_factor,
196+
backoff_max_seconds=backoff_max_seconds,
197+
)
198+
199+
return inner
200+
201+
def function(
202+
self,
203+
name: str = "",
204+
on_events: list[str] = [],
205+
on_crons: list[str] = [],
206+
version: str = "",
207+
timeout: str = "60m",
208+
schedule_timeout: str = "5m",
209+
sticky: StickyStrategy | None = None,
210+
default_priority: int = 1,
211+
concurrency: ConcurrencyExpression | None = None,
212+
input_validator: Type[TWorkflowInput] | None = None,
213+
) -> Callable[[Callable[[Context], R]], BaseWorkflowImpl]:
214+
declaration = WorkflowDeclaration[TWorkflowInput](
215+
WorkflowConfig(
216+
name=name,
217+
on_events=on_events,
218+
on_crons=on_crons,
219+
version=version,
220+
timeout=timeout,
221+
schedule_timeout=schedule_timeout,
222+
sticky=sticky,
223+
default_priority=default_priority,
224+
concurrency=concurrency,
225+
input_validator=input_validator
226+
or cast(Type[TWorkflowInput], EmptyModel),
227+
),
228+
self,
229+
)
230+
231+
def inner(func: Callable[[Context], R]) -> BaseWorkflowImpl:
232+
class Workflow(BaseWorkflowImpl):
233+
config = declaration.config
234+
235+
@self.step(
236+
name=name,
237+
timeout=timeout,
238+
retries=0,
239+
rate_limits=[],
240+
backoff_factor=None,
241+
backoff_max_seconds=None,
242+
)
243+
def fn(self, context: Context) -> R:
244+
return func(context)
245+
246+
return Workflow()
247+
248+
return inner
249+
250+
def worker(
251+
self, name: str, max_runs: int | None = None, labels: dict[str, str | int] = {}
252+
) -> "Worker":
253+
from hatchet_sdk.worker.worker import Worker
254+
255+
try:
256+
loop = asyncio.get_running_loop()
257+
except RuntimeError:
258+
loop = None
259+
260+
return Worker(
261+
name=name,
262+
max_runs=max_runs,
263+
labels=labels,
264+
config=self._client.config,
265+
debug=self._client.debug,
266+
owned_loop=loop is None,
267+
)
268+
269+
def declare_workflow(
270+
self,
271+
name: str = "",
272+
on_events: list[str] = [],
273+
on_crons: list[str] = [],
274+
version: str = "",
275+
timeout: str = "60m",
276+
schedule_timeout: str = "5m",
277+
sticky: StickyStrategy | None = None,
278+
default_priority: int = 1,
279+
concurrency: ConcurrencyExpression | None = None,
280+
input_validator: Type[TWorkflowInput] | None = None,
281+
) -> WorkflowDeclaration[TWorkflowInput]:
282+
return WorkflowDeclaration[TWorkflowInput](
283+
WorkflowConfig(
284+
name=name,
285+
on_events=on_events,
286+
on_crons=on_crons,
287+
version=version,
288+
timeout=timeout,
289+
schedule_timeout=schedule_timeout,
290+
sticky=sticky,
291+
default_priority=default_priority,
292+
concurrency=concurrency,
293+
input_validator=input_validator
294+
or cast(Type[TWorkflowInput], EmptyModel),
295+
),
296+
self,
297+
)

0 commit comments

Comments
 (0)