Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit e2187a0

Browse files
authored
fix: sync iter (#225)
* fix: sync iter * release: 0.38.3 * fix: ensure we have a loop
1 parent b748bf6 commit e2187a0

File tree

3 files changed

+74
-1
lines changed

3 files changed

+74
-1
lines changed

examples/fanout/sync_stream.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import asyncio
2+
import base64
3+
import json
4+
import os
5+
import random
6+
7+
from dotenv import load_dotenv
8+
9+
from hatchet_sdk import new_client
10+
from hatchet_sdk.clients.admin import TriggerWorkflowOptions
11+
from hatchet_sdk.clients.run_event_listener import StepRunEventType
12+
from hatchet_sdk.v2.hatchet import Hatchet
13+
14+
15+
def main():
16+
load_dotenv()
17+
hatchet = Hatchet()
18+
19+
# Generate a random stream key to use to track all
20+
# stream events for this workflow run.
21+
22+
streamKey = "streamKey"
23+
streamVal = f"sk-{random.randint(1, 100)}"
24+
25+
# Specify the stream key as additional metadata
26+
# when running the workflow.
27+
28+
# This key gets propagated to all child workflows
29+
# and can have an arbitrary property name.
30+
31+
workflowRun = hatchet.admin.run_workflow(
32+
"Parent",
33+
{"n": 2},
34+
options={"additional_metadata": {streamKey: streamVal}},
35+
)
36+
37+
# Stream all events for the additional meta key value
38+
listener = hatchet.listener.stream_by_additional_metadata(streamKey, streamVal)
39+
40+
for event in listener:
41+
print(event.type, event.payload)
42+
43+
print("DONE.")
44+
45+
46+
if __name__ == "__main__":
47+
main()

hatchet_sdk/clients/run_event_listener.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,31 @@ def abort(self):
9696
def __aiter__(self):
9797
return self._generator()
9898

99+
async def __anext__(self):
100+
return await self._generator().__anext__()
101+
102+
def __iter__(self):
103+
try:
104+
loop = asyncio.get_event_loop()
105+
except RuntimeError as e:
106+
if str(e).startswith("There is no current event loop in thread"):
107+
loop = asyncio.new_event_loop()
108+
asyncio.set_event_loop(loop)
109+
else:
110+
raise e
111+
112+
async_iter = self.__aiter__()
113+
114+
while True:
115+
try:
116+
future = asyncio.ensure_future(async_iter.__anext__())
117+
yield loop.run_until_complete(future)
118+
except StopAsyncIteration:
119+
break
120+
except Exception as e:
121+
print(f"Error in synchronous iterator: {e}")
122+
break
123+
99124
async def _generator(self) -> AsyncGenerator[StepRunEvent, None]:
100125
while True:
101126
if self.stop_signal:
@@ -167,6 +192,7 @@ async def _generator(self) -> AsyncGenerator[StepRunEvent, None]:
167192
# Unknown error, report and break
168193
# logger.error(f"Failed to receive message: {e}")
169194
break
195+
# Raise StopAsyncIteration to properly end the generator
170196

171197
async def retry_subscribe(self):
172198
retries = 0

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "hatchet-sdk"
3-
version = "0.38.2"
3+
version = "0.38.3"
44
description = ""
55
authors = ["Alexander Belanger <[email protected]>"]
66
readme = "README.md"

0 commit comments

Comments
 (0)