Skip to content

Commit e542624

Browse files
Adds support for the Jobs API (#804)
* First commit. Sync client. Updated protos Signed-off-by: Elena Kolevska <[email protected]> * Adds examples Signed-off-by: Elena Kolevska <[email protected]> * Fixes type linter errors Signed-off-by: Elena Kolevska <[email protected]> * Fixes ruff Signed-off-by: Elena Kolevska <[email protected]> * Fixes broken link Signed-off-by: Elena Kolevska <[email protected]> * Adds unit tests for Job class Signed-off-by: Elena Kolevska <[email protected]> * Adds async client Signed-off-by: Elena Kolevska <[email protected]> * Adds Jobs API callback support Signed-off-by: Elena Kolevska <[email protected]> * Revert "Fixes type linter errors" Signed-off-by: Elena Kolevska <[email protected]> * Adds support for failure policies Signed-off-by: Elena Kolevska <[email protected]> --------- Signed-off-by: Elena Kolevska <[email protected]>
1 parent b2f2988 commit e542624

File tree

21 files changed

+1940
-186
lines changed

21 files changed

+1940
-186
lines changed

dapr/aio/clients/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from dapr.clients.base import DaprActorClientBase
1919
from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_UNKNOWN
2020
from dapr.aio.clients.grpc.client import DaprGrpcClientAsync, MetadataTuple, InvokeMethodResponse
21+
from dapr.clients.grpc._jobs import Job, FailurePolicy, DropFailurePolicy, ConstantFailurePolicy
2122
from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient
2223
from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient
2324
from dapr.conf import settings
@@ -29,6 +30,10 @@
2930
'DaprActorHttpClient',
3031
'DaprInternalError',
3132
'ERROR_CODE_UNKNOWN',
33+
'Job',
34+
'FailurePolicy',
35+
'DropFailurePolicy',
36+
'ConstantFailurePolicy',
3237
]
3338

3439
from grpc.aio import ( # type: ignore

dapr/aio/clients/grpc/client.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
TransactionalStateOperation,
7979
ConversationInput,
8080
)
81+
from dapr.clients.grpc._jobs import Job
8182
from dapr.clients.grpc._response import (
8283
BindingResponse,
8384
ConversationResponse,
@@ -1847,6 +1848,107 @@ async def get_metadata(self) -> GetMetadataResponse:
18471848
headers=await call.initial_metadata(),
18481849
)
18491850

1851+
async def schedule_job_alpha1(self, job: Job) -> DaprResponse:
1852+
"""Schedules a job to be triggered at a specified time or interval.
1853+
1854+
This is an Alpha API and is subject to change.
1855+
1856+
Args:
1857+
job (Job): The job to schedule. Must have a name and either schedule or due_time.
1858+
1859+
Returns:
1860+
DaprResponse: Empty response indicating successful scheduling.
1861+
1862+
Raises:
1863+
ValueError: If job name is empty or both schedule and due_time are missing.
1864+
DaprGrpcError: If the Dapr runtime returns an error.
1865+
"""
1866+
# Warnings and input validation
1867+
warn(
1868+
'The Jobs API is an Alpha version and is subject to change.',
1869+
UserWarning,
1870+
stacklevel=2,
1871+
)
1872+
validateNotBlankString(job_name=job.name)
1873+
1874+
if not job.schedule and not job.due_time:
1875+
raise ValueError('Job must have either schedule or due_time specified')
1876+
1877+
# Convert job to proto using the Job class private method
1878+
job_proto = job._get_proto()
1879+
request = api_v1.ScheduleJobRequest(job=job_proto)
1880+
1881+
try:
1882+
call = self._stub.ScheduleJobAlpha1(request)
1883+
await call
1884+
return DaprResponse(headers=await call.initial_metadata())
1885+
except grpc.aio.AioRpcError as err:
1886+
raise DaprGrpcError(err) from err
1887+
1888+
async def get_job_alpha1(self, name: str) -> Job:
1889+
"""Gets a scheduled job by name.
1890+
1891+
This is an Alpha API and is subject to change.
1892+
1893+
Args:
1894+
name (str): The name of the job to retrieve.
1895+
1896+
Returns:
1897+
Job: The job details retrieved from the scheduler.
1898+
1899+
Raises:
1900+
ValueError: If job name is empty.
1901+
DaprGrpcError: If the Dapr runtime returns an error.
1902+
"""
1903+
# Warnings and input validation
1904+
warn(
1905+
'The Jobs API is an Alpha version and is subject to change.',
1906+
UserWarning,
1907+
stacklevel=2,
1908+
)
1909+
validateNotBlankString(job_name=name)
1910+
1911+
request = api_v1.GetJobRequest(name=name)
1912+
1913+
try:
1914+
call = self._stub.GetJobAlpha1(request)
1915+
response = await call
1916+
return Job._from_proto(response.job)
1917+
except grpc.aio.AioRpcError as err:
1918+
raise DaprGrpcError(err) from err
1919+
1920+
async def delete_job_alpha1(self, name: str) -> DaprResponse:
1921+
"""Deletes a scheduled job by name.
1922+
1923+
This is an Alpha API and is subject to change.
1924+
1925+
Args:
1926+
name (str): The name of the job to delete.
1927+
1928+
Returns:
1929+
DaprResponse: Empty response indicating successful deletion.
1930+
1931+
Raises:
1932+
ValueError: If job name is empty.
1933+
DaprGrpcError: If the Dapr runtime returns an error.
1934+
"""
1935+
# Warnings and input validation
1936+
warn(
1937+
'The Jobs API is an Alpha version and is subject to change.',
1938+
UserWarning,
1939+
stacklevel=2,
1940+
)
1941+
validateNotBlankString(job_name=name)
1942+
1943+
request = api_v1.DeleteJobRequest(name=name)
1944+
1945+
try:
1946+
call = self._stub.DeleteJobAlpha1(request)
1947+
await call
1948+
return DaprResponse(headers=await call.initial_metadata())
1949+
except grpc.aio.AioRpcError as err:
1950+
raise DaprGrpcError(err) from err
1951+
18501952
async def set_metadata(self, attributeName: str, attributeValue: str) -> DaprResponse:
18511953
"""Adds a custom (extended) metadata attribute to the Dapr sidecar
18521954
information stored by the Metadata endpoint.

dapr/clients/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from dapr.clients.base import DaprActorClientBase
2020
from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_UNKNOWN
2121
from dapr.clients.grpc.client import DaprGrpcClient, MetadataTuple, InvokeMethodResponse
22+
from dapr.clients.grpc._jobs import Job, FailurePolicy, DropFailurePolicy, ConstantFailurePolicy
2223
from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient
2324
from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient
2425
from dapr.clients.retry import RetryPolicy
@@ -32,6 +33,10 @@
3233
'DaprActorHttpClient',
3334
'DaprInternalError',
3435
'ERROR_CODE_UNKNOWN',
36+
'Job',
37+
'FailurePolicy',
38+
'DropFailurePolicy',
39+
'ConstantFailurePolicy',
3540
]
3641

3742

dapr/clients/grpc/_jobs.py

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
# -*- coding: utf-8 -*-
2+
3+
# Copyright 2025 The Dapr Authors
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
"""
15+
This module contains the Job class and related utilities for the Dapr Jobs API.
16+
"""
17+
18+
from abc import ABC, abstractmethod
19+
from dataclasses import dataclass
20+
from typing import Optional
21+
22+
from google.protobuf.any_pb2 import Any as GrpcAny
23+
from google.protobuf.duration_pb2 import Duration as GrpcDuration
24+
25+
26+
class FailurePolicy(ABC):
27+
"""Abstract base class for job failure policies."""
28+
29+
@abstractmethod
30+
def _to_proto(self):
31+
"""Convert this failure policy to its protobuf representation."""
32+
pass
33+
34+
35+
class DropFailurePolicy(FailurePolicy):
36+
"""A failure policy that drops the job when it fails to trigger.
37+
38+
When a job fails to trigger, it will be dropped and not retried.
39+
"""
40+
41+
def _to_proto(self):
42+
"""Convert to protobuf JobFailurePolicy with drop policy."""
43+
from dapr.proto.common.v1 import common_pb2
44+
45+
return common_pb2.JobFailurePolicy(drop=common_pb2.JobFailurePolicyDrop())
46+
47+
48+
class ConstantFailurePolicy(FailurePolicy):
49+
"""A failure policy that retries the job at constant intervals.
50+
51+
When a job fails to trigger, it will be retried after a constant interval,
52+
up to a maximum number of retries (if specified).
53+
54+
Args:
55+
max_retries (Optional[int]): Maximum number of retries. If None, retries indefinitely.
56+
interval_seconds (Optional[int]): Interval between retries in seconds. Defaults to 30.
57+
"""
58+
59+
def __init__(self, max_retries: Optional[int] = None, interval_seconds: Optional[int] = 30):
60+
self.max_retries = max_retries
61+
self.interval_seconds = interval_seconds
62+
63+
def _to_proto(self):
64+
"""Convert to protobuf JobFailurePolicy with constant policy."""
65+
from dapr.proto.common.v1 import common_pb2
66+
67+
constant_policy = common_pb2.JobFailurePolicyConstant()
68+
69+
if self.interval_seconds is not None:
70+
constant_policy.interval.CopyFrom(GrpcDuration(seconds=self.interval_seconds))
71+
72+
if self.max_retries is not None:
73+
constant_policy.max_retries = self.max_retries
74+
75+
return common_pb2.JobFailurePolicy(constant=constant_policy)
76+
77+
78+
@dataclass
79+
class Job:
80+
"""Represents a Dapr job for scheduling.
81+
82+
At least one of schedule or due_time must be provided but can also be provided together.
83+
84+
Attributes:
85+
name (str): The unique name for the job.
86+
schedule (Optional[str]): Schedule at which the job is to be run.
87+
Accepts both systemd timer style cron expressions, as well as human
88+
readable '@' prefixed period strings.
89+
repeats (Optional[int]): The optional number of times in which the job should be
90+
triggered. If not set, the job will run indefinitely or until expiration.
91+
due_time (Optional[str]): The optional time at which the job should be active, or the
92+
"one shot" time if other scheduling type fields are not provided. Accepts
93+
a "point in time" string in the format of RFC3339, Go duration string
94+
(calculated from job creation time), or non-repeating ISO8601.
95+
ttl (Optional[str]): The optional time to live or expiration of the job. Accepts a
96+
"point in time" string in the format of RFC3339, Go duration string
97+
(calculated from job creation time), or non-repeating ISO8601.
98+
data (Optional[GrpcAny]): The serialized job payload that will be sent to the recipient
99+
when the job is triggered. If not provided, an empty Any proto will be used.
100+
failure_policy (Optional[FailurePolicy]): The failure policy to apply when the job fails
101+
to trigger. If not provided, the default behavior is determined by the Dapr runtime.
102+
overwrite (bool): If true, allows this job to overwrite an existing job with the same name.
103+
"""
104+
105+
name: str
106+
schedule: Optional[str] = None
107+
repeats: Optional[int] = None
108+
due_time: Optional[str] = None
109+
ttl: Optional[str] = None
110+
data: Optional[GrpcAny] = None
111+
failure_policy: Optional[FailurePolicy] = None
112+
overwrite: bool = False
113+
114+
def _get_proto(self):
115+
"""Convert this Job instance to a Dapr Job proto message.
116+
117+
This is an internal method for SDK use only. Not part of the public API.
118+
119+
Returns:
120+
api_v1.Job: The proto representation of this job.
121+
"""
122+
from dapr.proto.runtime.v1 import dapr_pb2 as api_v1
123+
from google.protobuf.any_pb2 import Any as GrpcAny
124+
125+
# Build the job proto
126+
job_proto = api_v1.Job(name=self.name, overwrite=self.overwrite)
127+
128+
if self.schedule:
129+
job_proto.schedule = self.schedule
130+
if self.repeats is not None:
131+
job_proto.repeats = self.repeats
132+
if self.due_time:
133+
job_proto.due_time = self.due_time
134+
if self.ttl:
135+
job_proto.ttl = self.ttl
136+
# overwrite is already set in the constructor above
137+
138+
# data field is required, set empty Any if not provided
139+
if self.data:
140+
job_proto.data.CopyFrom(self.data)
141+
else:
142+
# Set empty Any proto
143+
job_proto.data.CopyFrom(GrpcAny())
144+
145+
# Set failure policy if provided
146+
if self.failure_policy:
147+
job_proto.failure_policy.CopyFrom(self.failure_policy._to_proto())
148+
149+
return job_proto
150+
151+
@classmethod
152+
def _from_proto(cls, job_proto):
153+
"""Create a Job instance from a Dapr Job proto message.
154+
155+
This is an internal method for SDK use only. Not part of the public API.
156+
157+
Args:
158+
job_proto (api_v1.Job): The proto message to convert.
159+
160+
Returns:
161+
Job: A new Job instance.
162+
"""
163+
# Parse failure policy if present
164+
failure_policy: Optional[FailurePolicy] = None
165+
if job_proto.HasField('failure_policy'):
166+
policy = job_proto.failure_policy
167+
if policy.HasField('drop'):
168+
failure_policy = DropFailurePolicy()
169+
elif policy.HasField('constant'):
170+
constant = policy.constant
171+
max_retries = constant.max_retries if constant.HasField('max_retries') else None
172+
interval_seconds = None
173+
if constant.HasField('interval'):
174+
interval_seconds = constant.interval.seconds
175+
failure_policy = ConstantFailurePolicy(
176+
max_retries=max_retries, interval_seconds=interval_seconds
177+
)
178+
179+
return cls(
180+
name=job_proto.name,
181+
schedule=job_proto.schedule if job_proto.HasField('schedule') else None,
182+
repeats=job_proto.repeats if job_proto.HasField('repeats') else None,
183+
due_time=job_proto.due_time if job_proto.HasField('due_time') else None,
184+
ttl=job_proto.ttl if job_proto.HasField('ttl') else None,
185+
data=job_proto.data if job_proto.HasField('data') and job_proto.data.value else None,
186+
failure_policy=failure_policy,
187+
overwrite=job_proto.overwrite,
188+
)

dapr/clients/grpc/_request.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,3 +435,30 @@ class ConversationInput:
435435
content: str
436436
role: Optional[str] = None
437437
scrub_pii: Optional[bool] = None
438+
439+
440+
class JobEvent:
441+
"""Represents a job event received from Dapr runtime.
442+
443+
This matches the Go SDK's common.JobEvent structure and represents
444+
a job that is currently being executed, not a job definition.
445+
446+
Args:
447+
name (str): The name/type of the job being executed.
448+
data (bytes): The raw job data payload.
449+
"""
450+
451+
def __init__(self, name: str, data: bytes = b''):
452+
self.name = name
453+
self.data = data
454+
455+
def get_data_as_string(self, encoding: str = 'utf-8') -> str:
456+
"""Get the job data as a string.
457+
458+
Args:
459+
encoding (str): The encoding to use for decoding bytes. Defaults to 'utf-8'.
460+
461+
Returns:
462+
str: The job data as a string, or empty string if no data.
463+
"""
464+
return self.data.decode(encoding) if self.data else ''

0 commit comments

Comments
 (0)