Skip to content
This repository was archived by the owner on Feb 20, 2025. It is now read-only.

Commit 3d78307

Browse files
authored
Fix Tenacity Retry Logic (hatchet-dev#298)
* fix: retry logic in tenacity utils * drive-by: ignore python version * fix: remaining error handling issues * chore: version * fix: catch addtl error type
1 parent 99f625f commit 3d78307

File tree

5 files changed

+100
-131
lines changed

5 files changed

+100
-131
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,3 +162,4 @@ cython_debug/
162162
#.idea/
163163

164164
openapitools.json
165+
.python-version

hatchet_sdk/clients/admin.py

Lines changed: 92 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,11 @@ async def run_workflow(
252252
workflow_listener=self.pooled_workflow_listener,
253253
workflow_run_event_listener=self.listener_client,
254254
)
255-
except grpc.RpcError as e:
255+
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
256256
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
257257
raise DedupeViolationErr(e.details())
258258

259-
raise ValueError(f"gRPC error: {e}")
259+
raise e
260260

261261
@tenacity_retry
262262
async def run_workflows(
@@ -266,56 +266,49 @@ async def run_workflows(
266266
) -> List[WorkflowRunRef]:
267267
if len(workflows) == 0:
268268
raise ValueError("No workflows to run")
269-
try:
270-
if not self.pooled_workflow_listener:
271-
self.pooled_workflow_listener = PooledWorkflowRunListener(self.config)
272269

273-
namespace = self.namespace
270+
if not self.pooled_workflow_listener:
271+
self.pooled_workflow_listener = PooledWorkflowRunListener(self.config)
274272

275-
if (
276-
options is not None
277-
and "namespace" in options
278-
and options["namespace"] is not None
279-
):
280-
namespace = options["namespace"]
281-
del options["namespace"]
273+
namespace = self.namespace
282274

283-
workflow_run_requests: TriggerWorkflowRequest = []
275+
if (
276+
options is not None
277+
and "namespace" in options
278+
and options["namespace"] is not None
279+
):
280+
namespace = options["namespace"]
281+
del options["namespace"]
284282

285-
for workflow in workflows:
286-
workflow_name = workflow["workflow_name"]
287-
input_data = workflow["input"]
288-
options = workflow["options"]
283+
workflow_run_requests: TriggerWorkflowRequest = []
289284

290-
if namespace != "" and not workflow_name.startswith(self.namespace):
291-
workflow_name = f"{namespace}{workflow_name}"
285+
for workflow in workflows:
286+
workflow_name = workflow["workflow_name"]
287+
input_data = workflow["input"]
288+
options = workflow["options"]
292289

293-
# Prepare and trigger workflow for each workflow name and input
294-
request = self._prepare_workflow_request(
295-
workflow_name, input_data, options
296-
)
297-
workflow_run_requests.append(request)
290+
if namespace != "" and not workflow_name.startswith(self.namespace):
291+
workflow_name = f"{namespace}{workflow_name}"
298292

299-
request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests)
293+
# Prepare and trigger workflow for each workflow name and input
294+
request = self._prepare_workflow_request(workflow_name, input_data, options)
295+
workflow_run_requests.append(request)
300296

301-
resp: BulkTriggerWorkflowResponse = (
302-
await self.aio_client.BulkTriggerWorkflow(
303-
request,
304-
metadata=get_metadata(self.token),
305-
)
306-
)
297+
request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests)
307298

308-
return [
309-
WorkflowRunRef(
310-
workflow_run_id=workflow_run_id,
311-
workflow_listener=self.pooled_workflow_listener,
312-
workflow_run_event_listener=self.listener_client,
313-
)
314-
for workflow_run_id in resp.workflow_run_ids
315-
]
299+
resp: BulkTriggerWorkflowResponse = await self.aio_client.BulkTriggerWorkflow(
300+
request,
301+
metadata=get_metadata(self.token),
302+
)
316303

317-
except grpc.RpcError as e:
318-
raise ValueError(f"gRPC error: {e}")
304+
return [
305+
WorkflowRunRef(
306+
workflow_run_id=workflow_run_id,
307+
workflow_listener=self.pooled_workflow_listener,
308+
workflow_run_event_listener=self.listener_client,
309+
)
310+
for workflow_run_id in resp.workflow_run_ids
311+
]
319312

320313
@tenacity_retry
321314
async def put_workflow(
@@ -324,15 +317,12 @@ async def put_workflow(
324317
workflow: CreateWorkflowVersionOpts | WorkflowMeta,
325318
overrides: CreateWorkflowVersionOpts | None = None,
326319
) -> WorkflowVersion:
327-
try:
328-
opts = self._prepare_put_workflow_request(name, workflow, overrides)
320+
opts = self._prepare_put_workflow_request(name, workflow, overrides)
329321

330-
return await self.aio_client.PutWorkflow(
331-
opts,
332-
metadata=get_metadata(self.token),
333-
)
334-
except grpc.RpcError as e:
335-
raise ValueError(f"Could not put workflow: {e}")
322+
return await self.aio_client.PutWorkflow(
323+
opts,
324+
metadata=get_metadata(self.token),
325+
)
336326

337327
@tenacity_retry
338328
async def put_rate_limit(
@@ -341,17 +331,14 @@ async def put_rate_limit(
341331
limit: int,
342332
duration: RateLimitDuration = RateLimitDuration.SECOND,
343333
):
344-
try:
345-
await self.aio_client.PutRateLimit(
346-
PutRateLimitRequest(
347-
key=key,
348-
limit=limit,
349-
duration=duration,
350-
),
351-
metadata=get_metadata(self.token),
352-
)
353-
except grpc.RpcError as e:
354-
raise ValueError(f"Could not put rate limit: {e}")
334+
await self.aio_client.PutRateLimit(
335+
PutRateLimitRequest(
336+
key=key,
337+
limit=limit,
338+
duration=duration,
339+
),
340+
metadata=get_metadata(self.token),
341+
)
355342

356343
@tenacity_retry
357344
async def schedule_workflow(
@@ -383,11 +370,11 @@ async def schedule_workflow(
383370
request,
384371
metadata=get_metadata(self.token),
385372
)
386-
except grpc.RpcError as e:
373+
except (grpc.aio.AioRpcError, grpc.RpcError) as e:
387374
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
388375
raise DedupeViolationErr(e.details())
389376

390-
raise ValueError(f"gRPC error: {e}")
377+
raise e
391378

392379

393380
class AdminClient(AdminClientBase):
@@ -408,17 +395,14 @@ def put_workflow(
408395
workflow: CreateWorkflowVersionOpts | WorkflowMeta,
409396
overrides: CreateWorkflowVersionOpts | None = None,
410397
) -> WorkflowVersion:
411-
try:
412-
opts = self._prepare_put_workflow_request(name, workflow, overrides)
398+
opts = self._prepare_put_workflow_request(name, workflow, overrides)
413399

414-
resp: WorkflowVersion = self.client.PutWorkflow(
415-
opts,
416-
metadata=get_metadata(self.token),
417-
)
400+
resp: WorkflowVersion = self.client.PutWorkflow(
401+
opts,
402+
metadata=get_metadata(self.token),
403+
)
418404

419-
return resp
420-
except grpc.RpcError as e:
421-
raise ValueError(f"Could not put workflow: {e}")
405+
return resp
422406

423407
@tenacity_retry
424408
def put_rate_limit(
@@ -427,17 +411,14 @@ def put_rate_limit(
427411
limit: int,
428412
duration: Union[RateLimitDuration.Value, str] = RateLimitDuration.SECOND,
429413
):
430-
try:
431-
self.client.PutRateLimit(
432-
PutRateLimitRequest(
433-
key=key,
434-
limit=limit,
435-
duration=duration,
436-
),
437-
metadata=get_metadata(self.token),
438-
)
439-
except grpc.RpcError as e:
440-
raise ValueError(f"Could not put rate limit: {e}")
414+
self.client.PutRateLimit(
415+
PutRateLimitRequest(
416+
key=key,
417+
limit=limit,
418+
duration=duration,
419+
),
420+
metadata=get_metadata(self.token),
421+
)
441422

442423
@tenacity_retry
443424
def schedule_workflow(
@@ -469,11 +450,11 @@ def schedule_workflow(
469450
request,
470451
metadata=get_metadata(self.token),
471452
)
472-
except grpc.RpcError as e:
453+
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
473454
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
474455
raise DedupeViolationErr(e.details())
475456

476-
raise ValueError(f"gRPC error: {e}")
457+
raise e
477458

478459
## TODO: `options` is treated as a dict (wrong type hint)
479460
## TODO: `any` type hint should come from `typing`
@@ -541,55 +522,49 @@ def run_workflow(
541522
workflow_listener=self.pooled_workflow_listener,
542523
workflow_run_event_listener=self.listener_client,
543524
)
544-
except grpc.RpcError as e:
525+
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
545526
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
546527
raise DedupeViolationErr(e.details())
547528

548-
raise ValueError(f"gRPC error: {e}")
529+
raise e
549530

550531
@tenacity_retry
551532
def run_workflows(
552533
self, workflows: List[WorkflowRunDict], options: TriggerWorkflowOptions = None
553534
) -> list[WorkflowRunRef]:
554535
workflow_run_requests: TriggerWorkflowRequest = []
555-
try:
556-
if not self.pooled_workflow_listener:
557-
self.pooled_workflow_listener = PooledWorkflowRunListener(self.config)
536+
if not self.pooled_workflow_listener:
537+
self.pooled_workflow_listener = PooledWorkflowRunListener(self.config)
558538

559-
for workflow in workflows:
560-
workflow_name = workflow["workflow_name"]
561-
input_data = workflow["input"]
562-
options = workflow["options"]
539+
for workflow in workflows:
540+
workflow_name = workflow["workflow_name"]
541+
input_data = workflow["input"]
542+
options = workflow["options"]
563543

564-
namespace = self.namespace
565-
566-
if (
567-
options is not None
568-
and "namespace" in options
569-
and options["namespace"] is not None
570-
):
571-
namespace = options["namespace"]
572-
del options["namespace"]
544+
namespace = self.namespace
573545

574-
if namespace != "" and not workflow_name.startswith(self.namespace):
575-
workflow_name = f"{namespace}{workflow_name}"
546+
if (
547+
options is not None
548+
and "namespace" in options
549+
and options["namespace"] is not None
550+
):
551+
namespace = options["namespace"]
552+
del options["namespace"]
576553

577-
# Prepare and trigger workflow for each workflow name and input
578-
request = self._prepare_workflow_request(
579-
workflow_name, input_data, options
580-
)
554+
if namespace != "" and not workflow_name.startswith(self.namespace):
555+
workflow_name = f"{namespace}{workflow_name}"
581556

582-
workflow_run_requests.append(request)
557+
# Prepare and trigger workflow for each workflow name and input
558+
request = self._prepare_workflow_request(workflow_name, input_data, options)
583559

584-
request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests)
560+
workflow_run_requests.append(request)
585561

586-
resp: BulkTriggerWorkflowResponse = self.client.BulkTriggerWorkflow(
587-
request,
588-
metadata=get_metadata(self.token),
589-
)
562+
request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests)
590563

591-
except grpc.RpcError as e:
592-
raise ValueError(f"gRPC error: {e}")
564+
resp: BulkTriggerWorkflowResponse = self.client.BulkTriggerWorkflow(
565+
request,
566+
metadata=get_metadata(self.token),
567+
)
593568

594569
return [
595570
WorkflowRunRef(

hatchet_sdk/clients/events.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,7 @@ def push(self, event_key, payload, options: PushEventOptions = None) -> Event:
125125

126126
span.add_event("Pushing event", attributes={"key": namespaced_event_key})
127127

128-
try:
129-
return self.client.Push(request, metadata=get_metadata(self.token))
130-
except grpc.RpcError as e:
131-
raise ValueError(f"gRPC error: {e}")
128+
return self.client.Push(request, metadata=get_metadata(self.token))
132129

133130
@tenacity_retry
134131
def bulk_push(
@@ -188,13 +185,9 @@ def bulk_push(
188185
bulk_request = BulkPushEventRequest(events=bulk_events)
189186

190187
span.add_event("Pushing bulk events")
191-
try:
192-
response = self.client.BulkPush(
193-
bulk_request, metadata=get_metadata(self.token)
194-
)
195-
return response.events
196-
except grpc.RpcError as e:
197-
raise ValueError(f"gRPC error: {e}")
188+
response = self.client.BulkPush(bulk_request, metadata=get_metadata(self.token))
189+
190+
return response.events
198191

199192
def log(self, message: str, step_run_id: str):
200193
try:

hatchet_sdk/clients/rest/tenacity_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ def tenacity_alert_retry(retry_state: tenacity.RetryCallState) -> None:
2828

2929

3030
def tenacity_should_retry(ex: Exception) -> bool:
31-
if isinstance(ex, grpc.aio.AioRpcError):
32-
if ex.code in [
31+
if isinstance(ex, (grpc.aio.AioRpcError, grpc.RpcError)):
32+
if ex.code() in [
3333
grpc.StatusCode.UNIMPLEMENTED,
3434
grpc.StatusCode.NOT_FOUND,
3535
]:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "hatchet-sdk"
3-
version = "0.43.2"
3+
version = "0.43.3"
44
description = ""
55
authors = ["Alexander Belanger <[email protected]>"]
66
readme = "README.md"

0 commit comments

Comments
 (0)