5 changes: 3 additions & 2 deletions dapr/aio/clients/grpc/client.py
@@ -1851,13 +1851,14 @@ async def get_metadata(self) -> GetMetadataResponse:
headers=await call.initial_metadata(),
)

- async def schedule_job_alpha1(self, job: Job) -> DaprResponse:
+ async def schedule_job_alpha1(self, job: Job, overwrite: bool = False) -> DaprResponse:
"""Schedules a job to be triggered at a specified time or interval.

This is an Alpha API and is subject to change.

Args:
job (Job): The job to schedule. Must have a name and either schedule or due_time.
+ overwrite (bool): If true, allows this job to overwrite an existing job with the same name.

Returns:
DaprResponse: Empty response indicating successful scheduling.
@@ -1879,7 +1880,7 @@ async def schedule_job_alpha1(self, job: Job) -> DaprResponse:

# Convert job to proto using the Job class private method
job_proto = job._get_proto()
- request = api_v1.ScheduleJobRequest(job=job_proto)
+ request = api_v1.ScheduleJobRequest(job=job_proto, overwrite=overwrite)

try:
call = self._stub.ScheduleJobAlpha1(request)
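A minimal usage sketch of the new async signature, assuming the public `DaprClient` export from `dapr.aio.clients`, a reachable sidecar with the Scheduler building block, and illustrative job names (this is an Alpha API, so the call shape may change):

```python
import asyncio

from dapr.aio.clients import DaprClient
from dapr.clients.grpc._jobs import Job


async def main() -> None:
    async with DaprClient() as client:
        job = Job(name='nightly-report', schedule='@every 24h')

        # First submission registers the job.
        await client.schedule_job_alpha1(job)

        # Re-submitting the same name with overwrite=True replaces the
        # existing definition rather than surfacing a conflict error.
        await client.schedule_job_alpha1(job, overwrite=True)


asyncio.run(main())
```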
6 changes: 1 addition & 5 deletions dapr/clients/grpc/_jobs.py
@@ -99,7 +99,6 @@ class Job:
when the job is triggered. If not provided, an empty Any proto will be used.
failure_policy (Optional[FailurePolicy]): The failure policy to apply when the job fails
to trigger. If not provided, the default behavior is determined by the Dapr runtime.
- overwrite (bool): If true, allows this job to overwrite an existing job with the same name.
"""

name: str
@@ -109,7 +108,6 @@
ttl: Optional[str] = None
data: Optional[GrpcAny] = None
failure_policy: Optional[FailurePolicy] = None
- overwrite: bool = False

def _get_proto(self):
"""Convert this Job instance to a Dapr Job proto message.
@@ -123,7 +121,7 @@ def _get_proto(self):
from google.protobuf.any_pb2 import Any as GrpcAny

# Build the job proto
- job_proto = api_v1.Job(name=self.name, overwrite=self.overwrite)
+ job_proto = api_v1.Job(name=self.name)

if self.schedule:
job_proto.schedule = self.schedule
@@ -133,7 +131,6 @@ def _get_proto(self):
job_proto.due_time = self.due_time
if self.ttl:
job_proto.ttl = self.ttl
- # overwrite is already set in the constructor above

# data field is required, set empty Any if not provided
if self.data:
@@ -184,5 +181,4 @@ def _from_proto(cls, job_proto):
ttl=job_proto.ttl if job_proto.HasField('ttl') else None,
data=job_proto.data if job_proto.HasField('data') and job_proto.data.value else None,
failure_policy=failure_policy,
- overwrite=job_proto.overwrite,
)
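The net effect on the data model: `overwrite` is no longer a property of the job itself, so it does not survive a proto round trip and is not reported back by `get_job_alpha1`. A short sketch with illustrative values:

```python
from dapr.clients.grpc._jobs import Job

job = Job(name='cleanup', schedule='@every 5m', ttl='1h')

# The Job proto no longer carries an overwrite field, so the round trip
# through the private helpers preserves only the job definition itself.
proto = job._get_proto()
restored = Job._from_proto(proto)
assert restored.name == 'cleanup' and restored.ttl == '1h'

# Overwrite intent is now supplied per request instead:
#     client.schedule_job_alpha1(job, overwrite=True)
```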
5 changes: 3 additions & 2 deletions dapr/clients/grpc/client.py
@@ -1778,13 +1778,14 @@ def converse_alpha1(
except RpcError as err:
raise DaprGrpcError(err) from err

- def schedule_job_alpha1(self, job: Job) -> DaprResponse:
+ def schedule_job_alpha1(self, job: Job, overwrite: bool = False) -> DaprResponse:
"""Schedules a job to be triggered at a specified time or interval.

This is an Alpha API and is subject to change.

Args:
job (Job): The job to schedule. Must have a name and either schedule or due_time.
+ overwrite (bool): If true, allows this job to overwrite an existing job with the same name.

Returns:
DaprResponse: Empty response indicating successful scheduling.
@@ -1806,7 +1807,7 @@ def schedule_job_alpha1(self, job: Job) -> DaprResponse:

# Convert job to proto using the Job class private method
job_proto = job._get_proto()
- request = api_v1.ScheduleJobRequest(job=job_proto)
+ request = api_v1.ScheduleJobRequest(job=job_proto, overwrite=overwrite)

try:
_, call = self.retry_policy.run_rpc(self._stub.ScheduleJobAlpha1.with_call, request)
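The synchronous client mirrors the async signature. A minimal sketch under the same assumptions (illustrative names, running sidecar):

```python
from dapr.clients import DaprClient
from dapr.clients.grpc._jobs import Job

with DaprClient() as client:
    # due_time schedules a one-shot trigger; a schedule would make it recurring.
    job = Job(name='retention-sweep', due_time='10s')
    client.schedule_job_alpha1(job=job, overwrite=True)
```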
651 changes: 324 additions & 327 deletions dapr/proto/runtime/v1/dapr_pb2.py

Large diffs are not rendered by default.

37 changes: 10 additions & 27 deletions dapr/proto/runtime/v1/dapr_pb2.pyi
@@ -23,6 +23,7 @@ import google.protobuf.descriptor
import google.protobuf.internal.containers
import google.protobuf.internal.enum_type_wrapper
import google.protobuf.message
+ import google.protobuf.struct_pb2
import google.protobuf.timestamp_pb2
import sys
import typing
@@ -3166,7 +3167,6 @@ class Job(google.protobuf.message.Message):
DUE_TIME_FIELD_NUMBER: builtins.int
TTL_FIELD_NUMBER: builtins.int
DATA_FIELD_NUMBER: builtins.int
- OVERWRITE_FIELD_NUMBER: builtins.int
FAILURE_POLICY_FIELD_NUMBER: builtins.int
name: builtins.str
"""The unique name for the job."""
@@ -3207,8 +3207,6 @@
"point in time" string in the format of RFC3339, Go duration string
(calculated from job creation time), or non-repeating ISO8601.
"""
- overwrite: builtins.bool
- """If true, allows this job to overwrite an existing job with the same name."""
@property
def data(self) -> google.protobuf.any_pb2.Any:
"""payload is the serialized job payload that will be sent to the recipient
@@ -3228,11 +3226,10 @@
due_time: builtins.str | None = ...,
ttl: builtins.str | None = ...,
data: google.protobuf.any_pb2.Any | None = ...,
- overwrite: builtins.bool = ...,
failure_policy: dapr.proto.common.v1.common_pb2.JobFailurePolicy | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["_due_time", b"_due_time", "_failure_policy", b"_failure_policy", "_repeats", b"_repeats", "_schedule", b"_schedule", "_ttl", b"_ttl", "data", b"data", "due_time", b"due_time", "failure_policy", b"failure_policy", "repeats", b"repeats", "schedule", b"schedule", "ttl", b"ttl"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["_due_time", b"_due_time", "_failure_policy", b"_failure_policy", "_repeats", b"_repeats", "_schedule", b"_schedule", "_ttl", b"_ttl", "data", b"data", "due_time", b"due_time", "failure_policy", b"failure_policy", "name", b"name", "overwrite", b"overwrite", "repeats", b"repeats", "schedule", b"schedule", "ttl", b"ttl"]) -> None: ...
def ClearField(self, field_name: typing.Literal["_due_time", b"_due_time", "_failure_policy", b"_failure_policy", "_repeats", b"_repeats", "_schedule", b"_schedule", "_ttl", b"_ttl", "data", b"data", "due_time", b"due_time", "failure_policy", b"failure_policy", "name", b"name", "repeats", b"repeats", "schedule", b"schedule", "ttl", b"ttl"]) -> None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing.Literal["_due_time", b"_due_time"]) -> typing.Literal["due_time"] | None: ...
@typing.overload
@@ -3253,6 +3250,9 @@ class ScheduleJobRequest(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

JOB_FIELD_NUMBER: builtins.int
+ OVERWRITE_FIELD_NUMBER: builtins.int
+ overwrite: builtins.bool
+ """If true, allows this job to overwrite an existing job with the same name."""
@property
def job(self) -> global___Job:
"""The job details."""
@@ -3261,9 +3261,10 @@
self,
*,
job: global___Job | None = ...,
+ overwrite: builtins.bool = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["job", b"job"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["job", b"job"]) -> None: ...
def ClearField(self, field_name: typing.Literal["job", b"job", "overwrite", b"overwrite"]) -> None: ...

global___ScheduleJobRequest = ScheduleJobRequest

@@ -4103,24 +4104,6 @@ class ConversationToolsFunction(google.protobuf.message.Message):

DESCRIPTOR: google.protobuf.descriptor.Descriptor

- @typing.final
- class ParametersEntry(google.protobuf.message.Message):
- DESCRIPTOR: google.protobuf.descriptor.Descriptor
-
- KEY_FIELD_NUMBER: builtins.int
- VALUE_FIELD_NUMBER: builtins.int
- key: builtins.str
- @property
- def value(self) -> google.protobuf.any_pb2.Any: ...
- def __init__(
- self,
- *,
- key: builtins.str = ...,
- value: google.protobuf.any_pb2.Any | None = ...,
- ) -> None: ...
- def HasField(self, field_name: typing.Literal["value", b"value"]) -> builtins.bool: ...
- def ClearField(self, field_name: typing.Literal["key", b"key", "value", b"value"]) -> None: ...
-
NAME_FIELD_NUMBER: builtins.int
DESCRIPTION_FIELD_NUMBER: builtins.int
PARAMETERS_FIELD_NUMBER: builtins.int
@@ -4131,7 +4114,7 @@ class ConversationToolsFunction(google.protobuf.message.Message):
used by the model to choose when and how to call the function.
"""
@property
- def parameters(self) -> google.protobuf.internal.containers.MessageMap[builtins.str, google.protobuf.any_pb2.Any]:
+ def parameters(self) -> google.protobuf.struct_pb2.Struct:
"""The parameters the functions accepts, described as a JSON Schema object.
See the [guide](https://platform.openai.com/docs/guides/function-calling) for examples,
and the [JSON Schema reference](https://json-schema.org/understanding-json-schema/) for documentation about the format.
@@ -4143,9 +4126,9 @@
*,
name: builtins.str = ...,
description: builtins.str | None = ...,
- parameters: collections.abc.Mapping[builtins.str, google.protobuf.any_pb2.Any] | None = ...,
+ parameters: google.protobuf.struct_pb2.Struct | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["_description", b"_description", "description", b"description"]) -> builtins.bool: ...
def HasField(self, field_name: typing.Literal["_description", b"_description", "description", b"description", "parameters", b"parameters"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["_description", b"_description", "description", b"description", "name", b"name", "parameters", b"parameters"]) -> None: ...
def WhichOneof(self, oneof_group: typing.Literal["_description", b"_description"]) -> typing.Literal["description"] | None: ...

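Since `parameters` is now a `google.protobuf.Struct` rather than a `map<string, Any>`, a JSON-Schema-shaped description can be passed as a plain dictionary through the Struct well-known type. A hedged sketch using the `api_v1` alias seen elsewhere in this PR (the tool name and schema are illustrative):

```python
from google.protobuf.struct_pb2 import Struct

from dapr.proto import api_v1

# Struct.update() converts a plain dict into protobuf Value messages.
params = Struct()
params.update(
    {
        'type': 'object',
        'properties': {'city': {'type': 'string'}},
        'required': ['city'],
    }
)

tool_fn = api_v1.ConversationToolsFunction(
    name='get_weather',
    description='Look up the current weather for a city.',
    parameters=params,
)
```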
13 changes: 5 additions & 8 deletions examples/jobs/job_management.py
@@ -16,10 +16,10 @@ def main():
with DaprClient() as client:
# Example 0: Simple job without data (works without protobuf)
print('0. Scheduling a simple job without data...', flush=True)
- simple_job = Job(name='simple-job', schedule='@every 30s', overwrite=True)
+ simple_job = Job(name='simple-job', schedule='@every 30s')

try:
- client.schedule_job_alpha1(simple_job)
+ client.schedule_job_alpha1(job=simple_job, overwrite=True)
print(f'✓ Simple job scheduled successfully', flush=True)
except Exception as e:
print(f'✗ Failed to schedule simple job: {e}', flush=True)
@@ -33,11 +33,10 @@ def main():
schedule='@every 30s',
data=job_data,
ttl='5m',
- overwrite=True,
)

try:
- client.schedule_job_alpha1(recurring_job)
+ client.schedule_job_alpha1(job=recurring_job, overwrite=True)
print(f'✓ Recurring job scheduled successfully', flush=True)
except Exception as e:
print(f'✗ Failed to schedule recurring job: {e}', flush=True)
@@ -68,11 +67,10 @@ def main():
schedule='@every 45s',
data=create_job_data('Job with drop failure policy'),
failure_policy=DropFailurePolicy(),
- overwrite=True,
)

try:
- client.schedule_job_alpha1(drop_policy_job)
+ client.schedule_job_alpha1(job=drop_policy_job, overwrite=True)
print(f'✓ Job with drop failure policy scheduled successfully', flush=True)
except Exception as e:
print(f'✗ Failed to schedule job with drop policy: {e}', flush=True)
@@ -83,11 +81,10 @@ def main():
schedule='@every 60s',
data=create_job_data('Job with constant retry policy'),
failure_policy=ConstantFailurePolicy(max_retries=3, interval_seconds=10),
- overwrite=True,
)

try:
- client.schedule_job_alpha1(constant_policy_job)
+ client.schedule_job_alpha1(job=constant_policy_job, overwrite=True)
print(f'✓ Job with constant retry policy scheduled successfully', flush=True)
except Exception as e:
print(f'✗ Failed to schedule job with retry policy: {e}', flush=True)
2 changes: 2 additions & 0 deletions tests/clients/fake_dapr_server.py
@@ -55,6 +55,7 @@ def __init__(self, grpc_port: int = 50001, http_port: int = 8080):
self.workflow_options: Dict[str, str] = {}
self.metadata: Dict[str, str] = {}
self.jobs: Dict[str, api_v1.Job] = {}
+ self.job_overwrites: Dict[str, bool] = {}
self._next_exception = None

def start(self):
Expand Down Expand Up @@ -550,6 +551,7 @@ def ScheduleJobAlpha1(self, request, context):

# Store the job
self.jobs[request.job.name] = request.job
+ self.job_overwrites[request.job.name] = request.overwrite

return empty_pb2.Empty()

29 changes: 25 additions & 4 deletions tests/clients/test_dapr_grpc_client.py
@@ -1253,9 +1253,31 @@ def test_schedule_job_alpha1_success(self):
# Verify job was stored in fake server
self.assertIn('test-job', self._fake_dapr_server.jobs)
stored_job = self._fake_dapr_server.jobs['test-job']
+ stored_job_overwrite = self._fake_dapr_server.job_overwrites['test-job']
self.assertEqual(stored_job.name, 'test-job')
self.assertEqual(stored_job.schedule, '@every 1m')
- self.assertEqual(stored_job.overwrite, False)
+ self.assertEqual(stored_job_overwrite, False)
# Verify data field is always set (even if empty)
self.assertTrue(stored_job.HasField('data'))

+ def test_schedule_job_alpha1_success_with_overwrite(self):
+ """Test successful job scheduling."""
+ dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}')
+ job = Job(name='test-job', schedule='@every 1m')
+
+ # Schedule the job
+ response = dapr.schedule_job_alpha1(job=job, overwrite=True)
+
+ # Verify response type
+ self.assertIsInstance(response, DaprResponse)
+
+ # Verify job was stored in fake server
+ self.assertIn('test-job', self._fake_dapr_server.jobs)
+ stored_job = self._fake_dapr_server.jobs['test-job']
+ stored_job_overwrite = self._fake_dapr_server.job_overwrites['test-job']
+ self.assertEqual(stored_job.name, 'test-job')
+ self.assertEqual(stored_job.schedule, '@every 1m')
+ self.assertEqual(stored_job_overwrite, True)
+ # Verify data field is always set (even if empty)
+ self.assertTrue(stored_job.HasField('data'))
+
@@ -1280,12 +1302,12 @@ def test_schedule_job_alpha1_success_with_data(self):
# Verify job was stored in fake server with all data
self.assertIn('test-job-with-data', self._fake_dapr_server.jobs)
stored_job = self._fake_dapr_server.jobs['test-job-with-data']
+ stored_job_overwrite = self._fake_dapr_server.job_overwrites['test-job-with-data']
self.assertEqual(stored_job.name, 'test-job-with-data')
self.assertEqual(stored_job.schedule, '@every 2m')
self.assertEqual(stored_job.repeats, 3)
self.assertEqual(stored_job.ttl, '10m')
- self.assertEqual(stored_job.overwrite, False)
-
+ self.assertEqual(stored_job_overwrite, False)
# Verify data field contains the payload
self.assertTrue(stored_job.HasField('data'))
self.assertEqual(
@@ -1323,7 +1345,6 @@ def test_get_job_alpha1_success(self):
self.assertEqual(retrieved_job.schedule, '@every 1m')
self.assertEqual(retrieved_job.repeats, 5)
self.assertEqual(retrieved_job.ttl, '1h')
- self.assertEqual(retrieved_job.overwrite, False)

def test_get_job_alpha1_validation_error(self):
"""Test validation error in job retrieval."""
34 changes: 28 additions & 6 deletions tests/clients/test_dapr_grpc_client_async.py
@@ -1184,9 +1184,33 @@ async def test_schedule_job_alpha1_success(self):
# Verify job was stored in fake server
self.assertIn('async-test-job', self._fake_dapr_server.jobs)
stored_job = self._fake_dapr_server.jobs['async-test-job']
+ stored_job_overwrite = self._fake_dapr_server.job_overwrites['async-test-job']
self.assertEqual(stored_job.name, 'async-test-job')
self.assertEqual(stored_job.schedule, '@every 1m')
- self.assertEqual(stored_job.overwrite, False)
+ self.assertEqual(stored_job_overwrite, False)
# Verify data field is always set (even if empty)
self.assertTrue(stored_job.HasField('data'))

await dapr.close()

+ async def test_schedule_job_alpha1_success_with_overwrite(self):
+ """Test successful async job scheduling."""
+ dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
+ job = Job(name='async-test-job', schedule='@every 1m')
+
+ # Schedule the job with overwrite
+ response = await dapr.schedule_job_alpha1(job=job, overwrite=True)
+
+ # Verify response type
+ self.assertIsInstance(response, DaprResponse)
+
+ # Verify job was stored in fake server
+ self.assertIn('async-test-job', self._fake_dapr_server.jobs)
+ stored_job = self._fake_dapr_server.jobs['async-test-job']
+ stored_job_overwrite = self._fake_dapr_server.job_overwrites['async-test-job']
+ self.assertEqual(stored_job.name, 'async-test-job')
+ self.assertEqual(stored_job.schedule, '@every 1m')
+ self.assertEqual(stored_job_overwrite, True)
+ # Verify data field is always set (even if empty)
+ self.assertTrue(stored_job.HasField('data'))

@@ -1215,11 +1239,12 @@ async def test_schedule_job_alpha1_success_with_data(self):
# Verify job was stored in fake server with all data
self.assertIn('async-test-job-with-data', self._fake_dapr_server.jobs)
stored_job = self._fake_dapr_server.jobs['async-test-job-with-data']
+ stored_job_overwrite = self._fake_dapr_server.job_overwrites['async-test-job-with-data']
self.assertEqual(stored_job.name, 'async-test-job-with-data')
self.assertEqual(stored_job.schedule, '@every 2m')
self.assertEqual(stored_job.repeats, 3)
self.assertEqual(stored_job.ttl, '10m')
- self.assertEqual(stored_job.overwrite, False)
+ self.assertEqual(stored_job_overwrite, False)

# Verify data field contains the payload
self.assertTrue(stored_job.HasField('data'))
@@ -1279,7 +1304,6 @@ async def test_get_job_alpha1_success(self):
self.assertEqual(retrieved_job.schedule, '@every 1m')
self.assertEqual(retrieved_job.repeats, 5)
self.assertEqual(retrieved_job.ttl, '1h')
- self.assertEqual(retrieved_job.overwrite, False)

await dapr.close()

@@ -1353,11 +1377,10 @@ async def test_job_lifecycle(self):
data=data,
repeats=10,
ttl='30m',
- overwrite=True,
)

# 1. Schedule the job
- schedule_response = await dapr.schedule_job_alpha1(job)
+ schedule_response = await dapr.schedule_job_alpha1(job=job, overwrite=True)
self.assertIsInstance(schedule_response, DaprResponse)

# 2. Get the job and verify all fields
Expand All @@ -1366,7 +1389,6 @@ async def test_job_lifecycle(self):
self.assertEqual(retrieved_job.schedule, '@every 5m')
self.assertEqual(retrieved_job.repeats, 10)
self.assertEqual(retrieved_job.ttl, '30m')
- self.assertTrue(retrieved_job.overwrite)
self.assertEqual(retrieved_job.data.value, b'{"lifecycle": "test"}')

# 3. Delete the job