Skip to content
This repository was archived by the owner on Feb 20, 2025. It is now read-only.

Commit b664ce5

Browse files
authored
Merge pull request #2 from hatchet-dev/feat--typed-dicts
Remove typed dicts in favor of Pydantic
2 parents 0a208d2 + 292fd4d commit b664ce5

File tree

28 files changed

+379
-391
lines changed

28 files changed

+379
-391
lines changed

examples/affinity-workers/event.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from dotenv import load_dotenv
22

3+
from hatchet_sdk.clients.events import PushEventOptions
34
from hatchet_sdk.hatchet import Hatchet
45

56
load_dotenv()
@@ -9,5 +10,5 @@
910
hatchet.event.push(
1011
"affinity:run",
1112
{"test": "test"},
12-
options={"additional_metadata": {"hello": "moon"}},
13+
options=PushEventOptions(additional_metadata={"hello": "moon"}),
1314
)

examples/affinity-workers/worker.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from dotenv import load_dotenv
22

33
from hatchet_sdk import Context, Hatchet, WorkerLabelComparator
4+
from hatchet_sdk.labels import DesiredWorkerLabel
45

56
load_dotenv()
67

@@ -11,12 +12,12 @@
1112
class AffinityWorkflow:
1213
@hatchet.step(
1314
desired_worker_labels={
14-
"model": {"value": "fancy-ai-model-v2", "weight": 10},
15-
"memory": {
16-
"value": 256,
17-
"required": True,
18-
"comparator": WorkerLabelComparator.LESS_THAN,
19-
},
15+
"model": DesiredWorkerLabel(value="fancy-ai-model-v2", weight=10),
16+
"memory": DesiredWorkerLabel(
17+
value=256,
18+
required=True,
19+
comparator=WorkerLabelComparator.LESS_THAN,
20+
),
2021
},
2122
)
2223
async def step(self, context: Context) -> dict[str, str | None]:

examples/bulk_fanout/stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async def main() -> None:
3131
workflowRun = hatchet.admin.run_workflow(
3232
"Parent",
3333
{"n": 2},
34-
options={"additional_metadata": {streamKey: streamVal}},
34+
options=TriggerWorkflowOptions(additional_metadata={streamKey: streamVal}),
3535
)
3636

3737
# Stream all events for the additional meta key value

examples/bulk_fanout/worker.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from dotenv import load_dotenv
55

66
from hatchet_sdk import Context, Hatchet
7-
from hatchet_sdk.clients.admin import ChildWorkflowRunDict
7+
from hatchet_sdk.clients.admin import ChildTriggerWorkflowOptions, ChildWorkflowRunDict
88

99
load_dotenv()
1010

@@ -22,18 +22,17 @@ async def spawn(self, context: Context) -> dict[str, list[Any]]:
2222

2323
n = context.workflow_input().get("n", 100)
2424

25-
child_workflow_runs: list[ChildWorkflowRunDict] = []
26-
27-
for i in range(n):
28-
29-
child_workflow_runs.append(
30-
{
31-
"workflow_name": "BulkChild",
32-
"input": {"a": str(i)},
33-
"key": f"child{i}",
34-
"options": {"additional_metadata": {"hello": "earth"}},
35-
}
25+
child_workflow_runs = [
26+
ChildWorkflowRunDict(
27+
workflow_name="BulkChild",
28+
input={"a": str(i)},
29+
key=f"child{i}",
30+
options=ChildTriggerWorkflowOptions(
31+
additional_metadata={"hello": "earth"}
32+
),
3633
)
34+
for i in range(n)
35+
]
3736

3837
if len(child_workflow_runs) == 0:
3938
return {}

examples/dedupe/worker.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from dotenv import load_dotenv
66

7-
from hatchet_sdk import Context, Hatchet
7+
from hatchet_sdk import ChildTriggerWorkflowOptions, Context, Hatchet
88
from hatchet_sdk.clients.admin import DedupeViolationErr
99
from hatchet_sdk.loader import ClientConfig
1010

@@ -29,7 +29,9 @@ async def spawn(self, context: Context) -> dict[str, list[Any]]:
2929
"DedupeChild",
3030
{"a": str(i)},
3131
key=f"child{i}",
32-
options={"additional_metadata": {"dedupe": "test"}},
32+
options=ChildTriggerWorkflowOptions(
33+
additional_metadata={"dedupe": "test"}
34+
),
3335
)
3436
).result()
3537
)

examples/durable_sticky_with_affinity/worker.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,13 @@
33

44
from dotenv import load_dotenv
55

6-
from hatchet_sdk import Context, StickyStrategy, WorkerLabelComparator
6+
from hatchet_sdk import (
7+
ChildTriggerWorkflowOptions,
8+
Context,
9+
StickyStrategy,
10+
WorkerLabelComparator,
11+
)
12+
from hatchet_sdk.labels import DesiredWorkerLabel
713
from hatchet_sdk.v2.callable import DurableContext
814
from hatchet_sdk.v2.hatchet import Hatchet
915

@@ -15,17 +21,17 @@
1521
@hatchet.durable(
1622
sticky=StickyStrategy.HARD,
1723
desired_worker_labels={
18-
"running_workflow": {
19-
"value": "True",
20-
"required": True,
21-
"comparator": WorkerLabelComparator.NOT_EQUAL,
22-
},
24+
"running_workflow": DesiredWorkerLabel(
25+
value="True",
26+
required=True,
27+
comparator=WorkerLabelComparator.NOT_EQUAL,
28+
),
2329
},
2430
)
2531
async def my_durable_func(context: DurableContext) -> dict[str, Any]:
2632
try:
2733
ref = await context.aio.spawn_workflow(
28-
"StickyChildWorkflow", {}, options={"sticky": True}
34+
"StickyChildWorkflow", {}, options=ChildTriggerWorkflowOptions(sticky=True)
2935
)
3036
result = await ref.result()
3137
except Exception as e:
@@ -39,11 +45,11 @@ async def my_durable_func(context: DurableContext) -> dict[str, Any]:
3945
class StickyChildWorkflow:
4046
@hatchet.step(
4147
desired_worker_labels={
42-
"running_workflow": {
43-
"value": "True",
44-
"required": True,
45-
"comparator": WorkerLabelComparator.NOT_EQUAL,
46-
},
48+
"running_workflow": DesiredWorkerLabel(
49+
value="True",
50+
required=True,
51+
comparator=WorkerLabelComparator.NOT_EQUAL,
52+
),
4753
},
4854
)
4955
async def child(self, context: Context) -> dict[str, str | None]:

examples/events/test_event.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,34 +24,34 @@ async def test_async_event_push(aiohatchet: Hatchet) -> None:
2424
@pytest.mark.asyncio(scope="session")
2525
async def test_async_event_bulk_push(aiohatchet: Hatchet) -> None:
2626

27-
events: List[BulkPushEventWithMetadata] = [
28-
{
29-
"key": "event1",
30-
"payload": {"message": "This is event 1"},
31-
"additional_metadata": {"source": "test", "user_id": "user123"},
32-
},
33-
{
34-
"key": "event2",
35-
"payload": {"message": "This is event 2"},
36-
"additional_metadata": {"source": "test", "user_id": "user456"},
37-
},
38-
{
39-
"key": "event3",
40-
"payload": {"message": "This is event 3"},
41-
"additional_metadata": {"source": "test", "user_id": "user789"},
42-
},
27+
events = [
28+
BulkPushEventWithMetadata(
29+
key="event1",
30+
payload={"message": "This is event 1"},
31+
additional_metadata={"source": "test", "user_id": "user123"},
32+
),
33+
BulkPushEventWithMetadata(
34+
key="event2",
35+
payload={"message": "This is event 2"},
36+
additional_metadata={"source": "test", "user_id": "user456"},
37+
),
38+
BulkPushEventWithMetadata(
39+
key="event3",
40+
payload={"message": "This is event 3"},
41+
additional_metadata={"source": "test", "user_id": "user789"},
42+
),
4343
]
44-
opts: BulkPushEventOptions = {"namespace": "bulk-test"}
44+
opts = BulkPushEventOptions(namespace="bulk-test")
4545

4646
e = await aiohatchet.event.async_bulk_push(events, opts)
4747

4848
assert len(e) == 3
4949

5050
# Sort both lists of events by their key to ensure comparison order
51-
sorted_events = sorted(events, key=lambda x: x["key"])
51+
sorted_events = sorted(events, key=lambda x: x.key)
5252
sorted_returned_events = sorted(e, key=lambda x: x.key)
5353
namespace = "bulk-test"
5454

5555
# Check that the returned events match the original events
5656
for original_event, returned_event in zip(sorted_events, sorted_returned_events):
57-
assert returned_event.key == namespace + original_event["key"]
57+
assert returned_event.key == namespace + original_event.key

examples/fanout/stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async def main() -> None:
3131
workflowRun = hatchet.admin.run_workflow(
3232
"Parent",
3333
{"n": 2},
34-
options={"additional_metadata": {streamKey: streamVal}},
34+
options=TriggerWorkflowOptions(additional_metadata={streamKey: streamVal}),
3535
)
3636

3737
# Stream all events for the additional meta key value

examples/fanout/sync_stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def main() -> None:
3131
workflowRun = hatchet.admin.run_workflow(
3232
"Parent",
3333
{"n": 2},
34-
options={"additional_metadata": {streamKey: streamVal}},
34+
options=TriggerWorkflowOptions(additional_metadata={streamKey: streamVal}),
3535
)
3636

3737
# Stream all events for the additional meta key value

examples/fanout/worker.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from dotenv import load_dotenv
55

6-
from hatchet_sdk import Context, Hatchet
6+
from hatchet_sdk import ChildTriggerWorkflowOptions, Context, Hatchet
77

88
load_dotenv()
99

@@ -28,7 +28,9 @@ async def spawn(self, context: Context) -> dict[str, Any]:
2828
"Child",
2929
{"a": str(i)},
3030
key=f"child{i}",
31-
options={"additional_metadata": {"hello": "earth"}},
31+
options=ChildTriggerWorkflowOptions(
32+
additional_metadata={"hello": "earth"}
33+
),
3234
)
3335
).result()
3436
)

0 commit comments

Comments
 (0)