Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit d983ee1

Browse files
macwilksrhinos
andauthored
Improve Tenacity Verbosity and Limit Retries to Specific Errors (#167)
* Tenacity Patches * Ensure We Reraise Exceptions * Bump Version * Create Common Tenacity Function That Reuses Similar Params --------- Co-authored-by: srhinos <[email protected]>
1 parent 4657229 commit d983ee1

File tree

6 files changed

+50
-47
lines changed

6 files changed

+50
-47
lines changed

hatchet_sdk/clients/admin.py

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
import grpc
66
from google.protobuf import timestamp_pb2
7-
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
87

8+
from hatchet_sdk.clients.rest.tenacity_utils import tenacity_retry
99
from hatchet_sdk.clients.run_event_listener import new_listener
1010
from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener
1111
from hatchet_sdk.connection import new_conn
@@ -169,10 +169,7 @@ async def run(
169169
wrr.workflow_run_id, wrr.workflow_listener, wrr.workflow_run_event_listener
170170
)
171171

172-
@retry(
173-
wait=wait_exponential_jitter(),
174-
stop=stop_after_attempt(5),
175-
)
172+
@tenacity_retry
176173
async def run_workflow(
177174
self, workflow_name: str, input: any, options: TriggerWorkflowOptions = None
178175
) -> WorkflowRunRef:
@@ -200,10 +197,7 @@ async def run_workflow(
200197

201198
raise ValueError(f"gRPC error: {e}")
202199

203-
@retry(
204-
wait=wait_exponential_jitter(),
205-
stop=stop_after_attempt(5),
206-
)
200+
@tenacity_retry
207201
async def put_workflow(
208202
self,
209203
name: str,
@@ -220,10 +214,7 @@ async def put_workflow(
220214
except grpc.RpcError as e:
221215
raise ValueError(f"Could not put workflow: {e}")
222216

223-
@retry(
224-
wait=wait_exponential_jitter(),
225-
stop=stop_after_attempt(5),
226-
)
217+
@tenacity_retry
227218
async def put_rate_limit(
228219
self,
229220
key: str,
@@ -242,10 +233,7 @@ async def put_rate_limit(
242233
except grpc.RpcError as e:
243234
raise ValueError(f"Could not put rate limit: {e}")
244235

245-
@retry(
246-
wait=wait_exponential_jitter(),
247-
stop=stop_after_attempt(5),
248-
)
236+
@tenacity_retry
249237
async def schedule_workflow(
250238
self,
251239
name: str,
@@ -279,10 +267,7 @@ def __init__(self, config: ClientConfig):
279267
self.listener_client = new_listener(config)
280268
self.namespace = config.namespace
281269

282-
@retry(
283-
wait=wait_exponential_jitter(),
284-
stop=stop_after_attempt(5),
285-
)
270+
@tenacity_retry
286271
def put_workflow(
287272
self,
288273
name: str,
@@ -301,10 +286,7 @@ def put_workflow(
301286
except grpc.RpcError as e:
302287
raise ValueError(f"Could not put workflow: {e}")
303288

304-
@retry(
305-
wait=wait_exponential_jitter(),
306-
stop=stop_after_attempt(5),
307-
)
289+
@tenacity_retry
308290
def put_rate_limit(
309291
self,
310292
key: str,
@@ -323,10 +305,7 @@ def put_rate_limit(
323305
except grpc.RpcError as e:
324306
raise ValueError(f"Could not put rate limit: {e}")
325307

326-
@retry(
327-
wait=wait_exponential_jitter(),
328-
stop=stop_after_attempt(5),
329-
)
308+
@tenacity_retry
330309
def schedule_workflow(
331310
self,
332311
name: str,
@@ -349,10 +328,7 @@ def schedule_workflow(
349328

350329
raise ValueError(f"gRPC error: {e}")
351330

352-
@retry(
353-
wait=wait_exponential_jitter(),
354-
stop=stop_after_attempt(5),
355-
)
331+
@tenacity_retry
356332
def run_workflow(
357333
self, workflow_name: str, input: any, options: TriggerWorkflowOptions = None
358334
) -> WorkflowRunRef:

hatchet_sdk/clients/events.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
import grpc
66
from google.protobuf import timestamp_pb2
7-
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
87

8+
from hatchet_sdk.clients.rest.tenacity_utils import tenacity_retry
99
from hatchet_sdk.contracts.events_pb2 import (
1010
Event,
1111
PushEventRequest,
@@ -43,10 +43,7 @@ def __init__(self, client: EventsServiceStub, config: ClientConfig):
4343
self.token = config.token
4444
self.namespace = config.namespace
4545

46-
@retry(
47-
wait=wait_exponential_jitter(),
48-
stop=stop_after_attempt(5),
49-
)
46+
@tenacity_retry
5047
def push(self, event_key, payload, options: PushEventOptions = None) -> Event:
5148

5249
namespaced_event_key = self.namespace + event_key
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import grpc
2+
import tenacity
3+
4+
from hatchet_sdk.logger import logger
5+
6+
7+
def tenacity_retry(func):
8+
return tenacity.retry(
9+
reraise=True,
10+
wait=tenacity.wait_exponential_jitter(),
11+
stop=tenacity.stop_after_attempt(5),
12+
before_sleep=tenacity_alert_retry,
13+
retry=tenacity.retry_if_exception(tenacity_should_retry),
14+
)(func)
15+
16+
17+
def tenacity_alert_retry(retry_state: tenacity.RetryCallState) -> None:
18+
"""Called between tenacity retries."""
19+
logger.debug(
20+
f"Retrying {retry_state.fn}: attempt "
21+
f"{retry_state.attempt_number} ended with: {retry_state.outcome}",
22+
)
23+
24+
25+
def tenacity_should_retry(ex: Exception) -> bool:
26+
if isinstance(ex, grpc.aio.AioRpcError):
27+
if ex.code in [
28+
grpc.StatusCode.UNIMPLEMENTED,
29+
grpc.StatusCode.NOT_FOUND,
30+
]:
31+
return False
32+
return True
33+
else:
34+
return False

hatchet_sdk/context/context.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
import traceback
44
from concurrent.futures import Future, ThreadPoolExecutor
55

6-
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
7-
86
from hatchet_sdk.clients.events import EventClient
7+
from hatchet_sdk.clients.rest.tenacity_utils import tenacity_retry
98
from hatchet_sdk.clients.rest_client import RestApi
109
from hatchet_sdk.clients.run_event_listener import RunEventListenerClient
1110
from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener
@@ -85,10 +84,7 @@ def __init__(
8584
self.spawn_index = -1
8685
self.worker = worker
8786

88-
@retry(
89-
wait=wait_exponential_jitter(),
90-
stop=stop_after_attempt(5),
91-
)
87+
@tenacity_retry
9288
async def spawn_workflow(
9389
self,
9490
workflow_name: str,

poetry.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "hatchet-sdk"
3-
version = "0.36.12"
3+
version = "0.36.13"
44
description = ""
55
authors = ["Alexander Belanger <[email protected]>"]
66
readme = "README.md"
@@ -20,7 +20,7 @@ aiostream = "^0.5.2"
2020
nest-asyncio = "^1.6.0"
2121
aiohttp = "^3.10.5"
2222
aiohttp-retry = "^2.8.3"
23-
tenacity = ">=8.0.0"
23+
tenacity = ">=8.4.1"
2424

2525
[tool.poetry.group.dev.dependencies]
2626
pytest = "^8.2.2"

0 commit comments

Comments
 (0)