Skip to content
This repository was archived by the owner on Feb 20, 2025. It is now read-only.

Commit d079700

Browse files
authored
Merge pull request #5 from hatchet-dev/feat-v2-workflows-and-steps
Feat: V2 Workflows and Steps
2 parents f0ac9f4 + 6e06b57 commit d079700

File tree

12 files changed

+723
-56
lines changed

12 files changed

+723
-56
lines changed

examples/simple/worker.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,27 @@
1-
import time
2-
31
from dotenv import load_dotenv
42

5-
from hatchet_sdk import Context, Hatchet
3+
from hatchet_sdk import Context
4+
from hatchet_sdk.v2 import BaseWorkflow, Hatchet
65

76
load_dotenv()
87

98
hatchet = Hatchet(debug=True)
109

1110

12-
@hatchet.workflow(on_events=["user:create"])
13-
class MyWorkflow:
11+
class MyWorkflow(BaseWorkflow):
1412
@hatchet.step(timeout="11s", retries=3)
1513
def step1(self, context: Context) -> dict[str, str]:
1614
print("executed step1")
17-
time.sleep(10)
18-
# raise Exception("test")
1915
return {
2016
"step1": "step1",
2117
}
2218

2319

2420
def main() -> None:
25-
workflow = MyWorkflow()
21+
wf = MyWorkflow()
22+
2623
worker = hatchet.worker("test-worker", max_runs=1)
27-
worker.register_workflow(workflow)
24+
worker.register_workflow(wf)
2825
worker.start()
2926

3027

examples/v2/trigger.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from examples.v2.workflows import ExampleWorkflowInput, example_workflow
2+
3+
4+
def main() -> None:
5+
example_workflow.run(
6+
input=ExampleWorkflowInput(message="Hello, world!"),
7+
)
8+
9+
10+
if __name__ == "__main__":
11+
main()

examples/v2/worker.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from examples.v2.workflows import example_workflow, hatchet
2+
from hatchet_sdk import Context
3+
from hatchet_sdk.v2 import BaseWorkflow
4+
5+
6+
class ExampleV2Workflow(BaseWorkflow):
7+
config = example_workflow.config
8+
9+
@hatchet.step(timeout="11s", retries=3)
10+
def step1(self, context: Context) -> None:
11+
input = example_workflow.workflow_input(context)
12+
13+
print(input.message)
14+
15+
return None
16+
17+
18+
def main() -> None:
19+
worker = hatchet.worker("test-worker", max_runs=1)
20+
worker.register_workflow(ExampleV2Workflow())
21+
worker.start()
22+
23+
24+
if __name__ == "__main__":
25+
main()

examples/v2/workflows.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from dotenv import load_dotenv
2+
from pydantic import BaseModel
3+
4+
from hatchet_sdk.v2 import Hatchet
5+
6+
load_dotenv()
7+
8+
hatchet = Hatchet(debug=True)
9+
10+
11+
class ExampleWorkflowInput(BaseModel):
12+
message: str
13+
14+
15+
example_workflow = hatchet.declare_workflow(
16+
name="example-workflow",
17+
on_events=["example-event"],
18+
timeout="10m",
19+
input_validator=ExampleWorkflowInput,
20+
)

hatchet_sdk/clients/admin.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ def _prepare_workflow_request(
9191
payload_data = json.dumps(input)
9292
_options = options.model_dump()
9393

94+
_options.pop("namespace")
95+
9496
try:
9597
_options["additional_metadata"] = json.dumps(
9698
options.additional_metadata

hatchet_sdk/v2/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .hatchet import Hatchet as Hatchet
2+
from .workflows import BaseWorkflow as BaseWorkflow
3+
from .workflows import WorkflowConfig as WorkflowConfig

hatchet_sdk/v2/hatchet.py

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
import asyncio
2+
import logging
3+
from typing import TYPE_CHECKING, Any, Callable, Optional, Type, TypeVar, cast
4+
5+
from hatchet_sdk.client import Client, new_client, new_client_raw
6+
from hatchet_sdk.clients.admin import AdminClient
7+
from hatchet_sdk.clients.dispatcher.dispatcher import DispatcherClient
8+
from hatchet_sdk.clients.events import EventClient
9+
from hatchet_sdk.clients.rest_client import RestApi
10+
from hatchet_sdk.clients.run_event_listener import RunEventListenerClient
11+
from hatchet_sdk.context.context import Context
12+
from hatchet_sdk.contracts.workflows_pb2 import DesiredWorkerLabels
13+
from hatchet_sdk.features.cron import CronClient
14+
from hatchet_sdk.features.scheduled import ScheduledClient
15+
from hatchet_sdk.labels import DesiredWorkerLabel
16+
from hatchet_sdk.loader import ClientConfig
17+
from hatchet_sdk.logger import logger
18+
from hatchet_sdk.rate_limit import RateLimit
19+
from hatchet_sdk.v2.workflows import (
20+
ConcurrencyExpression,
21+
EmptyModel,
22+
Step,
23+
StepType,
24+
StickyStrategy,
25+
TWorkflowInput,
26+
WorkflowConfig,
27+
WorkflowDeclaration,
28+
)
29+
30+
if TYPE_CHECKING:
31+
from hatchet_sdk.worker.worker import Worker
32+
33+
R = TypeVar("R")
34+
35+
36+
def transform_desired_worker_label(d: DesiredWorkerLabel) -> DesiredWorkerLabels:
37+
value = d.value
38+
return DesiredWorkerLabels(
39+
strValue=value if not isinstance(value, int) else None,
40+
intValue=value if isinstance(value, int) else None,
41+
required=d.required,
42+
weight=d.weight,
43+
comparator=d.comparator, # type: ignore[arg-type]
44+
)
45+
46+
47+
class Hatchet:
48+
"""
49+
Main client for interacting with the Hatchet SDK.
50+
51+
This class provides access to various client interfaces and utility methods
52+
for working with Hatchet workers, workflows, and steps.
53+
54+
Attributes:
55+
cron (CronClient): Interface for cron trigger operations.
56+
57+
admin (AdminClient): Interface for administrative operations.
58+
dispatcher (DispatcherClient): Interface for dispatching operations.
59+
event (EventClient): Interface for event-related operations.
60+
rest (RestApi): Interface for REST API operations.
61+
"""
62+
63+
_client: Client
64+
cron: CronClient
65+
scheduled: ScheduledClient
66+
67+
@classmethod
68+
def from_environment(
69+
cls, defaults: ClientConfig = ClientConfig(), **kwargs: Any
70+
) -> "Hatchet":
71+
return cls(client=new_client(defaults), **kwargs)
72+
73+
@classmethod
74+
def from_config(cls, config: ClientConfig, **kwargs: Any) -> "Hatchet":
75+
return cls(client=new_client_raw(config), **kwargs)
76+
77+
def __init__(
78+
self,
79+
debug: bool = False,
80+
client: Optional[Client] = None,
81+
config: ClientConfig = ClientConfig(),
82+
):
83+
"""
84+
Initialize a new Hatchet instance.
85+
86+
Args:
87+
debug (bool, optional): Enable debug logging. Defaults to False.
88+
client (Optional[Client], optional): A pre-configured Client instance. Defaults to None.
89+
config (ClientConfig, optional): Configuration for creating a new Client. Defaults to ClientConfig().
90+
"""
91+
if client is not None:
92+
self._client = client
93+
else:
94+
self._client = new_client(config, debug)
95+
96+
if debug:
97+
logger.setLevel(logging.DEBUG)
98+
99+
self.cron = CronClient(self._client)
100+
self.scheduled = ScheduledClient(self._client)
101+
102+
@property
103+
def admin(self) -> AdminClient:
104+
return self._client.admin
105+
106+
@property
107+
def dispatcher(self) -> DispatcherClient:
108+
return self._client.dispatcher
109+
110+
@property
111+
def event(self) -> EventClient:
112+
return self._client.event
113+
114+
@property
115+
def rest(self) -> RestApi:
116+
return self._client.rest
117+
118+
@property
119+
def listener(self) -> RunEventListenerClient:
120+
return self._client.listener
121+
122+
@property
123+
def config(self) -> ClientConfig:
124+
return self._client.config
125+
126+
@property
127+
def tenant_id(self) -> str:
128+
return self._client.config.tenant_id
129+
130+
def step(
131+
self,
132+
name: str = "",
133+
timeout: str = "60m",
134+
parents: list[str] = [],
135+
retries: int = 0,
136+
rate_limits: list[RateLimit] = [],
137+
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
138+
backoff_factor: float | None = None,
139+
backoff_max_seconds: int | None = None,
140+
) -> Callable[[Callable[[Any, Context], Any]], Step[R]]:
141+
def inner(func: Callable[[Any, Context], R]) -> Step[R]:
142+
return Step(
143+
fn=func,
144+
type=StepType.DEFAULT,
145+
name=name.lower() or str(func.__name__).lower(),
146+
timeout=timeout,
147+
parents=parents,
148+
retries=retries,
149+
rate_limits=[r for rate_limit in rate_limits if (r := rate_limit._req)],
150+
desired_worker_labels={
151+
key: transform_desired_worker_label(d)
152+
for key, d in desired_worker_labels.items()
153+
},
154+
backoff_factor=backoff_factor,
155+
backoff_max_seconds=backoff_max_seconds,
156+
)
157+
158+
return inner
159+
160+
def on_failure_step(
161+
self,
162+
name: str = "",
163+
timeout: str = "60m",
164+
parents: list[str] = [],
165+
retries: int = 0,
166+
rate_limits: list[RateLimit] = [],
167+
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
168+
backoff_factor: float | None = None,
169+
backoff_max_seconds: int | None = None,
170+
) -> Callable[[Callable[[Any, Context], Any]], Step[R]]:
171+
def inner(func: Callable[[Any, Context], R]) -> Step[R]:
172+
return Step(
173+
fn=func,
174+
type=StepType.ON_FAILURE,
175+
name=name.lower() or str(func.__name__).lower(),
176+
timeout=timeout,
177+
parents=parents,
178+
retries=retries,
179+
rate_limits=[r for rate_limit in rate_limits if (r := rate_limit._req)],
180+
desired_worker_labels={
181+
key: transform_desired_worker_label(d)
182+
for key, d in desired_worker_labels.items()
183+
},
184+
backoff_factor=backoff_factor,
185+
backoff_max_seconds=backoff_max_seconds,
186+
)
187+
188+
return inner
189+
190+
def worker(
191+
self, name: str, max_runs: int | None = None, labels: dict[str, str | int] = {}
192+
) -> "Worker":
193+
from hatchet_sdk.worker.worker import Worker
194+
195+
try:
196+
loop = asyncio.get_running_loop()
197+
except RuntimeError:
198+
loop = None
199+
200+
return Worker(
201+
name=name,
202+
max_runs=max_runs,
203+
labels=labels,
204+
config=self._client.config,
205+
debug=self._client.debug,
206+
owned_loop=loop is None,
207+
)
208+
209+
def declare_workflow(
210+
self,
211+
name: str = "",
212+
on_events: list[str] = [],
213+
on_crons: list[str] = [],
214+
version: str = "",
215+
timeout: str = "60m",
216+
schedule_timeout: str = "5m",
217+
sticky: StickyStrategy | None = None,
218+
default_priority: int = 1,
219+
concurrency: ConcurrencyExpression | None = None,
220+
input_validator: Type[TWorkflowInput] | None = None,
221+
) -> WorkflowDeclaration[TWorkflowInput]:
222+
return WorkflowDeclaration[TWorkflowInput](
223+
WorkflowConfig(
224+
name=name,
225+
on_events=on_events,
226+
on_crons=on_crons,
227+
version=version,
228+
timeout=timeout,
229+
schedule_timeout=schedule_timeout,
230+
sticky=sticky,
231+
default_priority=default_priority,
232+
concurrency=concurrency,
233+
input_validator=input_validator
234+
or cast(Type[TWorkflowInput], EmptyModel),
235+
),
236+
self,
237+
)

0 commit comments

Comments
 (0)