Skip to content
This repository was archived by the owner on Feb 20, 2025. It is now read-only.

Commit 9da49fd

Browse files
committed
feat: workflow implementation, declaration, and config
1 parent 0d5310c commit 9da49fd

File tree

5 files changed

+92
-12
lines changed

5 files changed

+92
-12
lines changed

examples/simple/worker.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
1-
import time
2-
31
from dotenv import load_dotenv
42

53
from hatchet_sdk import Context
6-
from hatchet_sdk.v2 import Hatchet, Workflow, WorkflowConfig
4+
from hatchet_sdk.v2 import BaseWorkflowImpl, Hatchet
75

86
load_dotenv()
97

108
hatchet = Hatchet(debug=True)
119

1210

13-
class MyWorkflow(Workflow):
14-
config = WorkflowConfig(name="foobar", on_events=["user:create"])
11+
class MyWorkflow(BaseWorkflowImpl):
12+
declaration = hatchet.declare_workflow(
13+
name="foobar",
14+
on_events=["user:create"],
15+
)
1516

1617
@hatchet.step(timeout="11s", retries=3)
1718
def step1(self, context: Context) -> dict[str, str]:

hatchet_sdk/v2/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
from .hatchet import Hatchet as Hatchet
2-
from .workflows import Workflow as Workflow
2+
from .workflows import BaseWorkflowImpl as BaseWorkflowImpl
33
from .workflows import WorkflowConfig as WorkflowConfig

hatchet_sdk/v2/hatchet.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,21 @@
11
import asyncio
22
import logging
3-
from typing import Any, Callable, Optional, TypeVar
3+
from enum import Enum
4+
from typing import (
5+
Any,
6+
Awaitable,
7+
Callable,
8+
Generic,
9+
Optional,
10+
ParamSpec,
11+
Type,
12+
TypeGuard,
13+
TypeVar,
14+
Union,
15+
cast,
16+
)
17+
18+
from pydantic import BaseModel, ConfigDict
419

520
from hatchet_sdk.client import Client, new_client, new_client_raw
621
from hatchet_sdk.clients.admin import AdminClient
@@ -16,7 +31,16 @@
1631
from hatchet_sdk.loader import ClientConfig
1732
from hatchet_sdk.logger import logger
1833
from hatchet_sdk.rate_limit import RateLimit
19-
from hatchet_sdk.v2.workflows import Step, StepType
34+
from hatchet_sdk.v2.workflows import (
35+
ConcurrencyExpression,
36+
EmptyModel,
37+
Step,
38+
StepType,
39+
StickyStrategy,
40+
TWorkflowInput,
41+
WorkflowConfig,
42+
WorkflowDeclaration,
43+
)
2044
from hatchet_sdk.worker.worker import Worker
2145

2246
R = TypeVar("R")
@@ -192,3 +216,33 @@ def worker(
192216
debug=self._client.debug,
193217
owned_loop=loop is None,
194218
)
219+
220+
def declare_workflow(
221+
self,
222+
name: str = "",
223+
on_events: list[str] = [],
224+
on_crons: list[str] = [],
225+
version: str = "",
226+
timeout: str = "60m",
227+
schedule_timeout: str = "5m",
228+
sticky: StickyStrategy | None = None,
229+
default_priority: int = 1,
230+
concurrency: ConcurrencyExpression | None = None,
231+
input_validator: Type[TWorkflowInput] | None = None,
232+
) -> WorkflowDeclaration[TWorkflowInput]:
233+
return WorkflowDeclaration[TWorkflowInput](
234+
WorkflowConfig(
235+
name=name,
236+
on_events=on_events,
237+
on_crons=on_crons,
238+
version=version,
239+
timeout=timeout,
240+
schedule_timeout=schedule_timeout,
241+
sticky=sticky,
242+
default_priority=default_priority,
243+
concurrency=concurrency,
244+
input_validator=input_validator
245+
or cast(Type[TWorkflowInput], EmptyModel),
246+
),
247+
self,
248+
)

hatchet_sdk/v2/workflows.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
from enum import Enum
33
from typing import (
4+
TYPE_CHECKING,
45
Any,
56
Awaitable,
67
Callable,
@@ -30,6 +31,9 @@
3031
from hatchet_sdk.contracts.workflows_pb2 import WorkflowConcurrencyOpts, WorkflowKind
3132
from hatchet_sdk.logger import logger
3233

34+
if TYPE_CHECKING:
35+
from hatchet_sdk.v2 import Hatchet
36+
3337
R = TypeVar("R")
3438
P = ParamSpec("P")
3539

@@ -69,6 +73,9 @@ class ConcurrencyExpression(BaseModel):
6973
limit_strategy: ConcurrencyLimitStrategy
7074

7175

76+
TWorkflowInput = TypeVar("TWorkflowInput", bound=BaseModel, default=EmptyModel)
77+
78+
7279
class WorkflowConfig(BaseModel):
7380
model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
7481

@@ -158,8 +165,25 @@ async def acall(self, ctx: Context) -> R:
158165
raise TypeError(f"{self.name} is not an async function. Use `call` instead.")
159166

160167

161-
class Workflow:
162-
config: WorkflowConfig = WorkflowConfig()
168+
class WorkflowDeclaration(Generic[TWorkflowInput]):
169+
def __init__(self, config: WorkflowConfig, hatchet: "Hatchet"):
170+
self.config = config
171+
self.hatchet = hatchet
172+
173+
def run(self, input: TWorkflowInput | None = None) -> Any:
174+
return self.hatchet.admin.run_workflow(
175+
workflow_name=self.config.name, input=input.model_dump() if input else {}
176+
)
177+
178+
179+
class BaseWorkflowImpl:
180+
"""
181+
A Hatchet workflow implementation base. This class should be inherited by all workflow implementations.
182+
183+
Configuration is passed to the workflow implementation via the `config` attribute.
184+
"""
185+
186+
declaration: WorkflowDeclaration
163187

164188
def get_service_name(self, namespace: str) -> str:
165189
return f"{namespace}{self.config.name.lower()}"
@@ -191,6 +215,7 @@ def create_action_name(self, namespace: str, step: Step[Any]) -> str:
191215
return self.get_service_name(namespace) + ":" + step.name
192216

193217
def __init__(self) -> None:
218+
self.config = self.declaration.config
194219
self.config.name = self.config.name or str(self.__class__.__name__)
195220

196221
def get_name(self, namespace: str) -> str:

hatchet_sdk/worker/worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
)
3737

3838
if TYPE_CHECKING:
39-
from hatchet_sdk.v2 import Workflow
39+
from hatchet_sdk.v2 import BaseWorkflowImpl
4040

4141
T = TypeVar("T")
4242

@@ -110,7 +110,7 @@ def register_workflow_from_opts(
110110
logger.error(e)
111111
sys.exit(1)
112112

113-
def register_workflow(self, workflow: Union["Workflow", Any]) -> None:
113+
def register_workflow(self, workflow: Union["BaseWorkflowImpl", Any]) -> None:
114114
namespace = self.client.config.namespace
115115

116116
try:

0 commit comments

Comments
 (0)