Skip to content

Commit bd04980

Browse files
First commit. Sync client. Updated protos
Signed-off-by: Elena Kolevska <[email protected]>
1 parent ea45284 commit bd04980

File tree

7 files changed

+582
-154
lines changed

7 files changed

+582
-154
lines changed

dapr/clients/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from dapr.clients.base import DaprActorClientBase
2020
from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_UNKNOWN
2121
from dapr.clients.grpc.client import DaprGrpcClient, MetadataTuple, InvokeMethodResponse
22+
from dapr.clients.grpc._jobs import Job
2223
from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient
2324
from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient
2425
from dapr.clients.retry import RetryPolicy
@@ -32,6 +33,7 @@
3233
'DaprActorHttpClient',
3334
'DaprInternalError',
3435
'ERROR_CODE_UNKNOWN',
36+
'Job',
3537
]
3638

3739

dapr/clients/grpc/_jobs.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# -*- coding: utf-8 -*-
2+
3+
# Copyright 2025 The Dapr Authors
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
"""
15+
This module contains the Job class and related utilities for the Dapr Jobs API.
16+
"""
17+
18+
from dataclasses import dataclass
19+
from typing import Optional
20+
21+
from google.protobuf.any_pb2 import Any as GrpcAny
22+
23+
24+
@dataclass
25+
class Job:
26+
"""Represents a Dapr job for scheduling.
27+
28+
At least one of schedule or due_time must be provided but can also be provided together.
29+
30+
Attributes:
31+
name (str): The unique name for the job.
32+
schedule (Optional[str]): Schedule at which the job is to be run.
33+
Accepts both systemd timer style cron expressions, as well as human
34+
readable '@' prefixed period strings.
35+
repeats (Optional[int]): The optional number of times in which the job should be
36+
triggered. If not set, the job will run indefinitely or until expiration.
37+
due_time (Optional[str]): The optional time at which the job should be active, or the
38+
"one shot" time if other scheduling type fields are not provided. Accepts
39+
a "point in time" string in the format of RFC3339, Go duration string
40+
(calculated from job creation time), or non-repeating ISO8601.
41+
ttl (Optional[str]): The optional time to live or expiration of the job. Accepts a
42+
"point in time" string in the format of RFC3339, Go duration string
43+
(calculated from job creation time), or non-repeating ISO8601.
44+
data (Optional[GrpcAny]): The serialized job payload that will be sent to the recipient
45+
when the job is triggered. If not provided, an empty Any proto will be used.
46+
overwrite (bool): If true, allows this job to overwrite an existing job with the same name.
47+
"""
48+
49+
name: str
50+
schedule: Optional[str] = None
51+
repeats: Optional[int] = None
52+
due_time: Optional[str] = None
53+
ttl: Optional[str] = None
54+
data: Optional[GrpcAny] = None
55+
overwrite: bool = False
56+
57+
def _get_proto(self):
58+
"""Convert this Job instance to a Dapr Job proto message.
59+
60+
This is an internal method for SDK use only. Not part of the public API.
61+
62+
Returns:
63+
api_v1.Job: The proto representation of this job.
64+
"""
65+
from dapr.proto.runtime.v1 import dapr_pb2 as api_v1
66+
from google.protobuf.any_pb2 import Any as GrpcAny
67+
68+
# Build the job proto
69+
job_proto = api_v1.Job(name=self.name, overwrite=self.overwrite)
70+
71+
if self.schedule:
72+
job_proto.schedule = self.schedule
73+
if self.repeats is not None:
74+
job_proto.repeats = self.repeats
75+
if self.due_time:
76+
job_proto.due_time = self.due_time
77+
if self.ttl:
78+
job_proto.ttl = self.ttl
79+
# overwrite is already set in the constructor above
80+
81+
# data field is required, set empty Any if not provided
82+
if self.data:
83+
job_proto.data.CopyFrom(self.data)
84+
else:
85+
# Set empty Any proto
86+
job_proto.data.CopyFrom(GrpcAny())
87+
88+
return job_proto
89+
90+
@classmethod
91+
def _from_proto(cls, job_proto):
92+
"""Create a Job instance from a Dapr Job proto message.
93+
94+
This is an internal method for SDK use only. Not part of the public API.
95+
96+
Args:
97+
job_proto (api_v1.Job): The proto message to convert.
98+
99+
Returns:
100+
Job: A new Job instance.
101+
"""
102+
return cls(
103+
name=job_proto.name,
104+
schedule=job_proto.schedule if job_proto.HasField('schedule') else None,
105+
repeats=job_proto.repeats if job_proto.HasField('repeats') else None,
106+
due_time=job_proto.due_time if job_proto.HasField('due_time') else None,
107+
ttl=job_proto.ttl if job_proto.HasField('ttl') else None,
108+
data=job_proto.data if job_proto.HasField('data') else None,
109+
overwrite=job_proto.overwrite,
110+
)

dapr/clients/grpc/client.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
DecryptRequestIterator,
6868
ConversationInput,
6969
)
70+
from dapr.clients.grpc._jobs import Job
7071
from dapr.clients.grpc._response import (
7172
BindingResponse,
7273
DaprResponse,
@@ -1773,6 +1774,101 @@ def converse_alpha1(
17731774
except RpcError as err:
17741775
raise DaprGrpcError(err) from err
17751776

1777+
def schedule_job_alpha1(self, job: Job) -> DaprResponse:
1778+
"""Schedules a job to be triggered at a specified time or interval.
1779+
1780+
This is an Alpha API and is subject to change.
1781+
1782+
Args:
1783+
job (Job): The job to schedule. Must have a name and either schedule or due_time.
1784+
1785+
Returns:
1786+
DaprResponse: Empty response indicating successful scheduling.
1787+
1788+
Raises:
1789+
ValueError: If job name is empty or both schedule and due_time are missing.
1790+
DaprGrpcError: If the Dapr runtime returns an error.
1791+
"""
1792+
# Warnings and input validation
1793+
warn(
1794+
'The Jobs API is an Alpha version and is subject to change.',
1795+
UserWarning,
1796+
stacklevel=2,
1797+
)
1798+
validateNotBlankString(job_name=job.name)
1799+
1800+
# Convert job to proto using the Job class private method
1801+
job_proto = job._get_proto()
1802+
request = api_v1.ScheduleJobRequest(job=job_proto)
1803+
1804+
try:
1805+
_, call = self.retry_policy.run_rpc(self._stub.ScheduleJobAlpha1.with_call, request)
1806+
return DaprResponse(headers=call.initial_metadata())
1807+
except RpcError as err:
1808+
raise DaprGrpcError(err) from err
1809+
1810+
def get_job_alpha1(self, name: str) -> Job:
1811+
"""Gets a scheduled job by name.
1812+
1813+
This is an Alpha API and is subject to change.
1814+
1815+
Args:
1816+
name (str): The name of the job to retrieve.
1817+
1818+
Returns:
1819+
Job: The job details retrieved from the scheduler.
1820+
1821+
Raises:
1822+
ValueError: If job name is empty.
1823+
DaprGrpcError: If the Dapr runtime returns an error.
1824+
"""
1825+
# Warnings and input validation
1826+
warn(
1827+
'The Jobs API is an Alpha version and is subject to change.',
1828+
UserWarning,
1829+
stacklevel=2,
1830+
)
1831+
validateNotBlankString(job_name=name)
1832+
1833+
request = api_v1.GetJobRequest(name=name)
1834+
1835+
try:
1836+
response, call = self.retry_policy.run_rpc(self._stub.GetJobAlpha1.with_call, request)
1837+
return Job._from_proto(response.job)
1838+
except RpcError as err:
1839+
raise DaprGrpcError(err) from err
1840+
1841+
def delete_job_alpha1(self, name: str) -> DaprResponse:
1842+
"""Deletes a scheduled job by name.
1843+
1844+
This is an Alpha API and is subject to change.
1845+
1846+
Args:
1847+
name (str): The name of the job to delete.
1848+
1849+
Returns:
1850+
DaprResponse: Empty response indicating successful deletion.
1851+
1852+
Raises:
1853+
ValueError: If job name is empty.
1854+
DaprGrpcError: If the Dapr runtime returns an error.
1855+
"""
1856+
# Warnings and input validation
1857+
warn(
1858+
'The Jobs API is an Alpha version and is subject to change.',
1859+
UserWarning,
1860+
stacklevel=2,
1861+
)
1862+
validateNotBlankString(job_name=name)
1863+
1864+
request = api_v1.DeleteJobRequest(name=name)
1865+
1866+
try:
1867+
_, call = self.retry_policy.run_rpc(self._stub.DeleteJobAlpha1.with_call, request)
1868+
return DaprResponse(headers=call.initial_metadata())
1869+
except RpcError as err:
1870+
raise DaprGrpcError(err) from err
1871+
17761872
def wait(self, timeout_s: float):
17771873
"""Waits for sidecar to be available within the timeout.
17781874

dapr/proto/runtime/v1/dapr_pb2.py

Lines changed: 150 additions & 148 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dapr/proto/runtime/v1/dapr_pb2.pyi

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,6 +1613,7 @@ class GetMetadataResponse(google.protobuf.message.Message):
16131613
RUNTIME_VERSION_FIELD_NUMBER: builtins.int
16141614
ENABLED_FEATURES_FIELD_NUMBER: builtins.int
16151615
ACTOR_RUNTIME_FIELD_NUMBER: builtins.int
1616+
SCHEDULER_FIELD_NUMBER: builtins.int
16161617
id: builtins.str
16171618
runtime_version: builtins.str
16181619
@property
@@ -1632,9 +1633,9 @@ class GetMetadataResponse(google.protobuf.message.Message):
16321633
@property
16331634
def enabled_features(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
16341635
@property
1635-
def actor_runtime(self) -> global___ActorRuntime:
1636-
"""TODO: Cassie: probably add scheduler runtime status"""
1637-
1636+
def actor_runtime(self) -> global___ActorRuntime: ...
1637+
@property
1638+
def scheduler(self) -> global___MetadataScheduler: ...
16381639
def __init__(
16391640
self,
16401641
*,
@@ -1648,12 +1649,36 @@ class GetMetadataResponse(google.protobuf.message.Message):
16481649
runtime_version: builtins.str = ...,
16491650
enabled_features: collections.abc.Iterable[builtins.str] | None = ...,
16501651
actor_runtime: global___ActorRuntime | None = ...,
1652+
scheduler: global___MetadataScheduler | None = ...,
16511653
) -> None: ...
1652-
def HasField(self, field_name: typing.Literal["actor_runtime", b"actor_runtime", "app_connection_properties", b"app_connection_properties"]) -> builtins.bool: ...
1653-
def ClearField(self, field_name: typing.Literal["active_actors_count", b"active_actors_count", "actor_runtime", b"actor_runtime", "app_connection_properties", b"app_connection_properties", "enabled_features", b"enabled_features", "extended_metadata", b"extended_metadata", "http_endpoints", b"http_endpoints", "id", b"id", "registered_components", b"registered_components", "runtime_version", b"runtime_version", "subscriptions", b"subscriptions"]) -> None: ...
1654+
def HasField(self, field_name: typing.Literal["_scheduler", b"_scheduler", "actor_runtime", b"actor_runtime", "app_connection_properties", b"app_connection_properties", "scheduler", b"scheduler"]) -> builtins.bool: ...
1655+
def ClearField(self, field_name: typing.Literal["_scheduler", b"_scheduler", "active_actors_count", b"active_actors_count", "actor_runtime", b"actor_runtime", "app_connection_properties", b"app_connection_properties", "enabled_features", b"enabled_features", "extended_metadata", b"extended_metadata", "http_endpoints", b"http_endpoints", "id", b"id", "registered_components", b"registered_components", "runtime_version", b"runtime_version", "scheduler", b"scheduler", "subscriptions", b"subscriptions"]) -> None: ...
1656+
def WhichOneof(self, oneof_group: typing.Literal["_scheduler", b"_scheduler"]) -> typing.Literal["scheduler"] | None: ...
16541657

16551658
global___GetMetadataResponse = GetMetadataResponse
16561659

1660+
@typing.final
1661+
class MetadataScheduler(google.protobuf.message.Message):
1662+
"""MetadataScheduler is a message that contains the list of addresses of the
1663+
scheduler connections.
1664+
"""
1665+
1666+
DESCRIPTOR: google.protobuf.descriptor.Descriptor
1667+
1668+
CONNECTED_ADDRESSES_FIELD_NUMBER: builtins.int
1669+
@property
1670+
def connected_addresses(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
1671+
"""connected_addresses the list of addresses of the scheduler connections."""
1672+
1673+
def __init__(
1674+
self,
1675+
*,
1676+
connected_addresses: collections.abc.Iterable[builtins.str] | None = ...,
1677+
) -> None: ...
1678+
def ClearField(self, field_name: typing.Literal["connected_addresses", b"connected_addresses"]) -> None: ...
1679+
1680+
global___MetadataScheduler = MetadataScheduler
1681+
16571682
@typing.final
16581683
class ActorRuntime(google.protobuf.message.Message):
16591684
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -3141,6 +3166,7 @@ class Job(google.protobuf.message.Message):
31413166
DUE_TIME_FIELD_NUMBER: builtins.int
31423167
TTL_FIELD_NUMBER: builtins.int
31433168
DATA_FIELD_NUMBER: builtins.int
3169+
OVERWRITE_FIELD_NUMBER: builtins.int
31443170
name: builtins.str
31453171
"""The unique name for the job."""
31463172
schedule: builtins.str
@@ -3180,6 +3206,8 @@ class Job(google.protobuf.message.Message):
31803206
"point in time" string in the format of RFC3339, Go duration string
31813207
(calculated from job creation time), or non-repeating ISO8601.
31823208
"""
3209+
overwrite: builtins.bool
3210+
"""If true, allows this job to overwrite an existing job with the same name."""
31833211
@property
31843212
def data(self) -> google.protobuf.any_pb2.Any:
31853213
"""payload is the serialized job payload that will be sent to the recipient
@@ -3195,9 +3223,10 @@ class Job(google.protobuf.message.Message):
31953223
due_time: builtins.str | None = ...,
31963224
ttl: builtins.str | None = ...,
31973225
data: google.protobuf.any_pb2.Any | None = ...,
3226+
overwrite: builtins.bool = ...,
31983227
) -> None: ...
31993228
def HasField(self, field_name: typing.Literal["_due_time", b"_due_time", "_repeats", b"_repeats", "_schedule", b"_schedule", "_ttl", b"_ttl", "data", b"data", "due_time", b"due_time", "repeats", b"repeats", "schedule", b"schedule", "ttl", b"ttl"]) -> builtins.bool: ...
3200-
def ClearField(self, field_name: typing.Literal["_due_time", b"_due_time", "_repeats", b"_repeats", "_schedule", b"_schedule", "_ttl", b"_ttl", "data", b"data", "due_time", b"due_time", "name", b"name", "repeats", b"repeats", "schedule", b"schedule", "ttl", b"ttl"]) -> None: ...
3229+
def ClearField(self, field_name: typing.Literal["_due_time", b"_due_time", "_repeats", b"_repeats", "_schedule", b"_schedule", "_ttl", b"_ttl", "data", b"data", "due_time", b"due_time", "name", b"name", "overwrite", b"overwrite", "repeats", b"repeats", "schedule", b"schedule", "ttl", b"ttl"]) -> None: ...
32013230
@typing.overload
32023231
def WhichOneof(self, oneof_group: typing.Literal["_due_time", b"_due_time"]) -> typing.Literal["due_time"] | None: ...
32033232
@typing.overload

tests/clients/fake_dapr_server.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def __init__(self, grpc_port: int = 50001, http_port: int = 8080):
5454
self.workflow_status = {}
5555
self.workflow_options: Dict[str, str] = {}
5656
self.metadata: Dict[str, str] = {}
57+
self.jobs: Dict[str, api_v1.Job] = {}
5758
self._next_exception = None
5859

5960
def start(self):
@@ -536,6 +537,44 @@ def ConverseAlpha1(self, request, context):
536537

537538
return api_v1.ConversationResponse(contextID=request.contextID, outputs=outputs)
538539

540+
def ScheduleJobAlpha1(self, request, context):
541+
self.check_for_exception(context)
542+
543+
# Validate job name
544+
if not request.job.name:
545+
raise ValueError('Job name is required')
546+
547+
# Store the job
548+
self.jobs[request.job.name] = request.job
549+
550+
return empty_pb2.Empty()
551+
552+
def GetJobAlpha1(self, request, context):
553+
self.check_for_exception(context)
554+
555+
# Validate job name
556+
if not request.name:
557+
raise ValueError('Job name is required')
558+
559+
# Check if job exists
560+
if request.name not in self.jobs:
561+
raise Exception(f'Job "{request.name}" not found')
562+
563+
return api_v1.GetJobResponse(job=self.jobs[request.name])
564+
565+
def DeleteJobAlpha1(self, request, context):
566+
self.check_for_exception(context)
567+
568+
# Validate job name
569+
if not request.name:
570+
raise ValueError('Job name is required')
571+
572+
# Check if job exists (optional - some implementations might not error)
573+
if request.name in self.jobs:
574+
del self.jobs[request.name]
575+
576+
return empty_pb2.Empty()
577+
539578
def SetMetadata(self, request: SetMetadataRequest, context):
540579
self.metadata[request.key] = request.value
541580
return empty_pb2.Empty()

0 commit comments

Comments
 (0)