Skip to content
This repository was archived by the owner on Feb 20, 2025. It is now read-only.

Commit 3c9d634

Browse files
committed
feat: registration for standalone functions
1 parent 23ad4e0 commit 3c9d634

File tree

3 files changed

+114
-41
lines changed

3 files changed

+114
-41
lines changed

examples/simple/worker.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@
88
hatchet = Hatchet(debug=True)
99

1010

11-
@hatchet.function(timeout="11s")
11+
@hatchet.function()
1212
def step1(context: Context) -> dict[str, str]:
13-
print("executed step1")
14-
return {
15-
"step1": "step1",
16-
}
13+
message = "Hello from Hatchet!"
14+
15+
context.log(message)
16+
17+
return {"message": message}
1718

1819

1920
def main() -> None:

hatchet_sdk/v2/hatchet.py

Lines changed: 80 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
TYPE_CHECKING,
55
Any,
66
Callable,
7+
Generic,
78
Optional,
89
ParamSpec,
910
Type,
1011
TypeVar,
12+
Union,
1113
cast,
1214
)
1315

@@ -55,6 +57,56 @@ def transform_desired_worker_label(d: DesiredWorkerLabel) -> DesiredWorkerLabels
5557
)
5658

5759

60+
class Function(Generic[R, TWorkflowInput]):
61+
def __init__(
62+
self,
63+
fn: Callable[[Context], R],
64+
hatchet: "Hatchet",
65+
name: str = "",
66+
on_events: list[str] = [],
67+
on_crons: list[str] = [],
68+
version: str = "",
69+
timeout: str = "60m",
70+
schedule_timeout: str = "5m",
71+
sticky: StickyStrategy | None = None,
72+
retries: int = 0,
73+
rate_limits: list[RateLimit] = [],
74+
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
75+
concurrency: ConcurrencyExpression | None = None,
76+
on_failure: Union["Function[R]", None] = None,
77+
default_priority: int = 1,
78+
input_validator: Type[TWorkflowInput] | None = None,
79+
backoff_factor: float | None = None,
80+
backoff_max_seconds: int | None = None,
81+
) -> None:
82+
def func(_: Any, context: Context) -> R:
83+
return fn(context)
84+
85+
self.hatchet = hatchet
86+
self.step: Step[R] = hatchet.step(
87+
name=name or fn.__name__,
88+
timeout=timeout,
89+
retries=retries,
90+
rate_limits=rate_limits,
91+
desired_worker_labels=desired_worker_labels,
92+
backoff_factor=backoff_factor,
93+
backoff_max_seconds=backoff_max_seconds,
94+
)(func)
95+
self.on_failure_step = on_failure
96+
self.workflow_config = WorkflowConfig(
97+
name=name or fn.__name__,
98+
on_events=on_events,
99+
on_crons=on_crons,
100+
version=version,
101+
timeout=timeout,
102+
schedule_timeout=schedule_timeout,
103+
sticky=sticky,
104+
default_priority=default_priority,
105+
concurrency=concurrency,
106+
input_validator=input_validator or cast(Type[TWorkflowInput], EmptyModel),
107+
)
108+
109+
58110
class Hatchet:
59111
"""
60112
Main client for interacting with the Hatchet SDK.
@@ -207,44 +259,38 @@ def function(
207259
timeout: str = "60m",
208260
schedule_timeout: str = "5m",
209261
sticky: StickyStrategy | None = None,
210-
default_priority: int = 1,
262+
retries: int = 0,
263+
rate_limits: list[RateLimit] = [],
264+
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
211265
concurrency: ConcurrencyExpression | None = None,
266+
on_failure: Union["Function[Any]", None] = None,
267+
default_priority: int = 1,
212268
input_validator: Type[TWorkflowInput] | None = None,
213-
) -> Callable[[Callable[[Context], R]], BaseWorkflowImpl]:
214-
def inner(func: Callable[[Context], R]) -> BaseWorkflowImpl:
215-
declaration = WorkflowDeclaration[TWorkflowInput](
216-
WorkflowConfig(
217-
name=name or func.__name__,
218-
on_events=on_events,
219-
on_crons=on_crons,
220-
version=version,
221-
timeout=timeout,
222-
schedule_timeout=schedule_timeout,
223-
sticky=sticky,
224-
default_priority=default_priority,
225-
concurrency=concurrency,
226-
input_validator=input_validator
227-
or cast(Type[TWorkflowInput], EmptyModel),
228-
),
229-
self,
269+
backoff_factor: float | None = None,
270+
backoff_max_seconds: int | None = None,
271+
) -> Callable[[Callable[[Context], R]], Function[R, TWorkflowInput]]:
272+
def inner(func: Callable[[Context], R]) -> Function[R, TWorkflowInput]:
273+
return Function[R, TWorkflowInput](
274+
func,
275+
hatchet=self,
276+
name=name,
277+
on_events=on_events,
278+
on_crons=on_crons,
279+
version=version,
280+
timeout=timeout,
281+
schedule_timeout=schedule_timeout,
282+
sticky=sticky,
283+
retries=retries,
284+
rate_limits=rate_limits,
285+
desired_worker_labels=desired_worker_labels,
286+
concurrency=concurrency,
287+
on_failure=on_failure,
288+
default_priority=default_priority,
289+
input_validator=input_validator,
290+
backoff_factor=backoff_factor,
291+
backoff_max_seconds=backoff_max_seconds,
230292
)
231293

232-
class Workflow(BaseWorkflowImpl):
233-
config = declaration.config
234-
235-
@self.step(
236-
name=declaration.config.name,
237-
timeout=timeout,
238-
retries=0,
239-
rate_limits=[],
240-
backoff_factor=None,
241-
backoff_max_seconds=None,
242-
)
243-
def fn(self, context: Context) -> R:
244-
return func(context)
245-
246-
return Workflow()
247-
248294
return inner
249295

250296
def worker(

hatchet_sdk/worker/worker.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
if TYPE_CHECKING:
3939
from hatchet_sdk.v2 import BaseWorkflowImpl
40+
from hatchet_sdk.v2.hatchet import Function
4041

4142
T = TypeVar("T")
4243

@@ -110,8 +111,31 @@ def register_workflow_from_opts(
110111
logger.error(e)
111112
sys.exit(1)
112113

113-
def register_function(self, function: "BaseWorkflowImpl") -> None:
114-
self.register_workflow(function)
114+
def register_function(self, function: "Function[Any]") -> None:
115+
from hatchet_sdk.v2.workflows import BaseWorkflowImpl
116+
117+
declaration = function.hatchet.declare_workflow(
118+
**function.workflow_config.model_dump()
119+
)
120+
121+
class Workflow(BaseWorkflowImpl):
122+
config = declaration.config
123+
124+
@property
125+
def default_steps(self) -> list[Step[Any]]:
126+
return [function.step]
127+
128+
@property
129+
def on_failure_steps(self) -> list[Step[Any]]:
130+
if not function.on_failure_step:
131+
return []
132+
133+
step = function.on_failure_step.step
134+
step.type = StepType.ON_FAILURE
135+
136+
return [step]
137+
138+
self.register_workflow(Workflow())
115139

116140
def register_workflow(self, workflow: Union["BaseWorkflowImpl", Any]) -> None:
117141
namespace = self.client.config.namespace
@@ -125,7 +149,9 @@ def register_workflow(self, workflow: Union["BaseWorkflowImpl", Any]) -> None:
125149
logger.error(e)
126150
sys.exit(1)
127151

152+
print(workflow.steps)
128153
for step in workflow.steps:
154+
print(step)
129155
action_name = workflow.create_action_name(namespace, step)
130156
self.action_registry[action_name] = step
131157
return_type = get_type_hints(step.fn).get("return")

0 commit comments

Comments
 (0)