Skip to content

Commit 69faf28

Browse files
Adds support for failure policies
Signed-off-by: Elena Kolevska <[email protected]>
1 parent 3fcef31 commit 69faf28

File tree

12 files changed

+391
-73
lines changed

12 files changed

+391
-73
lines changed

dapr/aio/clients/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from dapr.clients.base import DaprActorClientBase
1919
from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_UNKNOWN
2020
from dapr.aio.clients.grpc.client import DaprGrpcClientAsync, MetadataTuple, InvokeMethodResponse
21-
from dapr.clients.grpc._jobs import Job
21+
from dapr.clients.grpc._jobs import Job, FailurePolicy, DropFailurePolicy, ConstantFailurePolicy
2222
from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient
2323
from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient
2424
from dapr.conf import settings
@@ -31,6 +31,9 @@
3131
'DaprInternalError',
3232
'ERROR_CODE_UNKNOWN',
3333
'Job',
34+
'FailurePolicy',
35+
'DropFailurePolicy',
36+
'ConstantFailurePolicy',
3437
]
3538

3639
from grpc.aio import ( # type: ignore

dapr/clients/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from dapr.clients.base import DaprActorClientBase
2020
from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_UNKNOWN
2121
from dapr.clients.grpc.client import DaprGrpcClient, MetadataTuple, InvokeMethodResponse
22-
from dapr.clients.grpc._jobs import Job
22+
from dapr.clients.grpc._jobs import Job, FailurePolicy, DropFailurePolicy, ConstantFailurePolicy
2323
from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient
2424
from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient
2525
from dapr.clients.retry import RetryPolicy
@@ -34,6 +34,9 @@
3434
'DaprInternalError',
3535
'ERROR_CODE_UNKNOWN',
3636
'Job',
37+
'FailurePolicy',
38+
'DropFailurePolicy',
39+
'ConstantFailurePolicy',
3740
]
3841

3942

dapr/clients/grpc/_jobs.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,64 @@
1515
This module contains the Job class and related utilities for the Dapr Jobs API.
1616
"""
1717

18+
from abc import ABC, abstractmethod
1819
from dataclasses import dataclass
1920
from typing import Optional
2021

2122
from google.protobuf.any_pb2 import Any as GrpcAny
23+
from google.protobuf.duration_pb2 import Duration as GrpcDuration
24+
25+
26+
class FailurePolicy(ABC):
27+
"""Abstract base class for job failure policies."""
28+
29+
@abstractmethod
30+
def _to_proto(self):
31+
"""Convert this failure policy to its protobuf representation."""
32+
pass
33+
34+
35+
class DropFailurePolicy(FailurePolicy):
36+
"""A failure policy that drops the job when it fails to trigger.
37+
38+
When a job fails to trigger, it will be dropped and not retried.
39+
"""
40+
41+
def _to_proto(self):
42+
"""Convert to protobuf JobFailurePolicy with drop policy."""
43+
from dapr.proto.common.v1 import common_pb2
44+
45+
return common_pb2.JobFailurePolicy(drop=common_pb2.JobFailurePolicyDrop())
46+
47+
48+
class ConstantFailurePolicy(FailurePolicy):
49+
"""A failure policy that retries the job at constant intervals.
50+
51+
When a job fails to trigger, it will be retried after a constant interval,
52+
up to a maximum number of retries (if specified).
53+
54+
Args:
55+
max_retries (Optional[int]): Maximum number of retries. If None, retries indefinitely.
56+
interval_seconds (Optional[int]): Interval between retries in seconds. Defaults to 30.
57+
"""
58+
59+
def __init__(self, max_retries: Optional[int] = None, interval_seconds: Optional[int] = 30):
60+
self.max_retries = max_retries
61+
self.interval_seconds = interval_seconds
62+
63+
def _to_proto(self):
64+
"""Convert to protobuf JobFailurePolicy with constant policy."""
65+
from dapr.proto.common.v1 import common_pb2
66+
67+
constant_policy = common_pb2.JobFailurePolicyConstant()
68+
69+
if self.interval_seconds is not None:
70+
constant_policy.interval.CopyFrom(GrpcDuration(seconds=self.interval_seconds))
71+
72+
if self.max_retries is not None:
73+
constant_policy.max_retries = self.max_retries
74+
75+
return common_pb2.JobFailurePolicy(constant=constant_policy)
2276

2377

2478
@dataclass
@@ -43,6 +97,8 @@ class Job:
4397
(calculated from job creation time), or non-repeating ISO8601.
4498
data (Optional[GrpcAny]): The serialized job payload that will be sent to the recipient
4599
when the job is triggered. If not provided, an empty Any proto will be used.
100+
failure_policy (Optional[FailurePolicy]): The failure policy to apply when the job fails
101+
to trigger. If not provided, the default behavior is determined by the Dapr runtime.
46102
overwrite (bool): If true, allows this job to overwrite an existing job with the same name.
47103
"""
48104

@@ -52,6 +108,7 @@ class Job:
52108
due_time: Optional[str] = None
53109
ttl: Optional[str] = None
54110
data: Optional[GrpcAny] = None
111+
failure_policy: Optional[FailurePolicy] = None
55112
overwrite: bool = False
56113

57114
def _get_proto(self):
@@ -85,6 +142,10 @@ def _get_proto(self):
85142
# Set empty Any proto
86143
job_proto.data.CopyFrom(GrpcAny())
87144

145+
# Set failure policy if provided
146+
if self.failure_policy:
147+
job_proto.failure_policy.CopyFrom(self.failure_policy._to_proto())
148+
88149
return job_proto
89150

90151
@classmethod
@@ -99,12 +160,29 @@ def _from_proto(cls, job_proto):
99160
Returns:
100161
Job: A new Job instance.
101162
"""
163+
# Parse failure policy if present
164+
failure_policy: Optional[FailurePolicy] = None
165+
if job_proto.HasField('failure_policy'):
166+
policy = job_proto.failure_policy
167+
if policy.HasField('drop'):
168+
failure_policy = DropFailurePolicy()
169+
elif policy.HasField('constant'):
170+
constant = policy.constant
171+
max_retries = constant.max_retries if constant.HasField('max_retries') else None
172+
interval_seconds = None
173+
if constant.HasField('interval'):
174+
interval_seconds = constant.interval.seconds
175+
failure_policy = ConstantFailurePolicy(
176+
max_retries=max_retries, interval_seconds=interval_seconds
177+
)
178+
102179
return cls(
103180
name=job_proto.name,
104181
schedule=job_proto.schedule if job_proto.HasField('schedule') else None,
105182
repeats=job_proto.repeats if job_proto.HasField('repeats') else None,
106183
due_time=job_proto.due_time if job_proto.HasField('due_time') else None,
107184
ttl=job_proto.ttl if job_proto.HasField('ttl') else None,
108185
data=job_proto.data if job_proto.HasField('data') and job_proto.data.value else None,
186+
failure_policy=failure_policy,
109187
overwrite=job_proto.overwrite,
110188
)

dapr/proto/common/v1/common_pb2.py

Lines changed: 34 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dapr/proto/common/v1/common_pb2.pyi

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import builtins
1818
import collections.abc
1919
import google.protobuf.any_pb2
2020
import google.protobuf.descriptor
21+
import google.protobuf.duration_pb2
2122
import google.protobuf.internal.containers
2223
import google.protobuf.internal.enum_type_wrapper
2324
import google.protobuf.message
@@ -373,3 +374,67 @@ class ConfigurationItem(google.protobuf.message.Message):
373374
def ClearField(self, field_name: typing.Literal["metadata", b"metadata", "value", b"value", "version", b"version"]) -> None: ...
374375

375376
global___ConfigurationItem = ConfigurationItem
377+
378+
@typing.final
379+
class JobFailurePolicy(google.protobuf.message.Message):
380+
"""JobFailurePolicy defines the policy to apply when a job fails to trigger."""
381+
382+
DESCRIPTOR: google.protobuf.descriptor.Descriptor
383+
384+
DROP_FIELD_NUMBER: builtins.int
385+
CONSTANT_FIELD_NUMBER: builtins.int
386+
@property
387+
def drop(self) -> global___JobFailurePolicyDrop: ...
388+
@property
389+
def constant(self) -> global___JobFailurePolicyConstant: ...
390+
def __init__(
391+
self,
392+
*,
393+
drop: global___JobFailurePolicyDrop | None = ...,
394+
constant: global___JobFailurePolicyConstant | None = ...,
395+
) -> None: ...
396+
def HasField(self, field_name: typing.Literal["constant", b"constant", "drop", b"drop", "policy", b"policy"]) -> builtins.bool: ...
397+
def ClearField(self, field_name: typing.Literal["constant", b"constant", "drop", b"drop", "policy", b"policy"]) -> None: ...
398+
def WhichOneof(self, oneof_group: typing.Literal["policy", b"policy"]) -> typing.Literal["drop", "constant"] | None: ...
399+
400+
global___JobFailurePolicy = JobFailurePolicy
401+
402+
@typing.final
403+
class JobFailurePolicyDrop(google.protobuf.message.Message):
404+
"""JobFailurePolicyDrop is a policy which drops the job tick when the job fails to trigger."""
405+
406+
DESCRIPTOR: google.protobuf.descriptor.Descriptor
407+
408+
def __init__(
409+
self,
410+
) -> None: ...
411+
412+
global___JobFailurePolicyDrop = JobFailurePolicyDrop
413+
414+
@typing.final
415+
class JobFailurePolicyConstant(google.protobuf.message.Message):
416+
"""JobFailurePolicyConstant is a policy which retries the job at a consistent interval when the job fails to trigger."""
417+
418+
DESCRIPTOR: google.protobuf.descriptor.Descriptor
419+
420+
INTERVAL_FIELD_NUMBER: builtins.int
421+
MAX_RETRIES_FIELD_NUMBER: builtins.int
422+
max_retries: builtins.int
423+
"""max_retries is the optional maximum number of retries to attempt before giving up.
424+
If unset, the Job will be retried indefinitely.
425+
"""
426+
@property
427+
def interval(self) -> google.protobuf.duration_pb2.Duration:
428+
"""interval is the constant delay to wait before retrying the job."""
429+
430+
def __init__(
431+
self,
432+
*,
433+
interval: google.protobuf.duration_pb2.Duration | None = ...,
434+
max_retries: builtins.int | None = ...,
435+
) -> None: ...
436+
def HasField(self, field_name: typing.Literal["_max_retries", b"_max_retries", "interval", b"interval", "max_retries", b"max_retries"]) -> builtins.bool: ...
437+
def ClearField(self, field_name: typing.Literal["_max_retries", b"_max_retries", "interval", b"interval", "max_retries", b"max_retries"]) -> None: ...
438+
def WhichOneof(self, oneof_group: typing.Literal["_max_retries", b"_max_retries"]) -> typing.Literal["max_retries"] | None: ...
439+
440+
global___JobFailurePolicyConstant = JobFailurePolicyConstant

dapr/proto/runtime/v1/dapr_pb2.py

Lines changed: 30 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)