Skip to content

Commit 3f25833

Browse files
committed
Fix usage of overwrite, from job to the schedule request.
Signed-off-by: Albert Callarisa <[email protected]>
1 parent 1cbeef8 commit 3f25833

File tree

8 files changed

+68
-36
lines changed

8 files changed

+68
-36
lines changed

dapr/aio/clients/grpc/client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1851,13 +1851,14 @@ async def get_metadata(self) -> GetMetadataResponse:
18511851
headers=await call.initial_metadata(),
18521852
)
18531853

1854-
async def schedule_job_alpha1(self, job: Job) -> DaprResponse:
1854+
async def schedule_job_alpha1(self, job: Job, overwrite: bool = False) -> DaprResponse:
18551855
"""Schedules a job to be triggered at a specified time or interval.
18561856
18571857
This is an Alpha API and is subject to change.
18581858
18591859
Args:
18601860
job (Job): The job to schedule. Must have a name and either schedule or due_time.
1861+
overwrite (bool): If true, allows this job to overwrite an existing job with the same name.
18611862
18621863
Returns:
18631864
DaprResponse: Empty response indicating successful scheduling.
@@ -1879,7 +1880,7 @@ async def schedule_job_alpha1(self, job: Job) -> DaprResponse:
18791880

18801881
# Convert job to proto using the Job class private method
18811882
job_proto = job._get_proto()
1882-
request = api_v1.ScheduleJobRequest(job=job_proto)
1883+
request = api_v1.ScheduleJobRequest(job=job_proto, overwrite=overwrite)
18831884

18841885
try:
18851886
call = self._stub.ScheduleJobAlpha1(request)

dapr/clients/grpc/_jobs.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ class Job:
9999
when the job is triggered. If not provided, an empty Any proto will be used.
100100
failure_policy (Optional[FailurePolicy]): The failure policy to apply when the job fails
101101
to trigger. If not provided, the default behavior is determined by the Dapr runtime.
102-
overwrite (bool): If true, allows this job to overwrite an existing job with the same name.
103102
"""
104103

105104
name: str
@@ -109,7 +108,6 @@ class Job:
109108
ttl: Optional[str] = None
110109
data: Optional[GrpcAny] = None
111110
failure_policy: Optional[FailurePolicy] = None
112-
overwrite: bool = False
113111

114112
def _get_proto(self):
115113
"""Convert this Job instance to a Dapr Job proto message.
@@ -123,7 +121,7 @@ def _get_proto(self):
123121
from google.protobuf.any_pb2 import Any as GrpcAny
124122

125123
# Build the job proto
126-
job_proto = api_v1.Job(name=self.name, overwrite=self.overwrite)
124+
job_proto = api_v1.Job(name=self.name)
127125

128126
if self.schedule:
129127
job_proto.schedule = self.schedule
@@ -133,7 +131,6 @@ def _get_proto(self):
133131
job_proto.due_time = self.due_time
134132
if self.ttl:
135133
job_proto.ttl = self.ttl
136-
# overwrite is already set in the constructor above
137134

138135
# data field is required, set empty Any if not provided
139136
if self.data:
@@ -184,5 +181,4 @@ def _from_proto(cls, job_proto):
184181
ttl=job_proto.ttl if job_proto.HasField('ttl') else None,
185182
data=job_proto.data if job_proto.HasField('data') and job_proto.data.value else None,
186183
failure_policy=failure_policy,
187-
overwrite=job_proto.overwrite,
188184
)

dapr/clients/grpc/client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1778,13 +1778,14 @@ def converse_alpha1(
17781778
except RpcError as err:
17791779
raise DaprGrpcError(err) from err
17801780

1781-
def schedule_job_alpha1(self, job: Job) -> DaprResponse:
1781+
def schedule_job_alpha1(self, job: Job, overwrite: bool = False) -> DaprResponse:
17821782
"""Schedules a job to be triggered at a specified time or interval.
17831783
17841784
This is an Alpha API and is subject to change.
17851785
17861786
Args:
17871787
job (Job): The job to schedule. Must have a name and either schedule or due_time.
1788+
overwrite (bool): If true, allows this job to overwrite an existing job with the same name.
17881789
17891790
Returns:
17901791
DaprResponse: Empty response indicating successful scheduling.
@@ -1806,7 +1807,7 @@ def schedule_job_alpha1(self, job: Job) -> DaprResponse:
18061807

18071808
# Convert job to proto using the Job class private method
18081809
job_proto = job._get_proto()
1809-
request = api_v1.ScheduleJobRequest(job=job_proto)
1810+
request = api_v1.ScheduleJobRequest(job=job_proto, overwrite=overwrite)
18101811

18111812
try:
18121813
_, call = self.retry_policy.run_rpc(self._stub.ScheduleJobAlpha1.with_call, request)

examples/jobs/job_management.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ def main():
1616
with DaprClient() as client:
1717
# Example 0: Simple job without data (works without protobuf)
1818
print('0. Scheduling a simple job without data...', flush=True)
19-
simple_job = Job(name='simple-job', schedule='@every 30s', overwrite=True)
19+
simple_job = Job(name='simple-job', schedule='@every 30s')
2020

2121
try:
22-
client.schedule_job_alpha1(simple_job)
22+
client.schedule_job_alpha1(job=simple_job, overwrite=True)
2323
print(f'✓ Simple job scheduled successfully', flush=True)
2424
except Exception as e:
2525
print(f'✗ Failed to schedule simple job: {e}', flush=True)
@@ -33,11 +33,10 @@ def main():
3333
schedule='@every 30s',
3434
data=job_data,
3535
ttl='5m',
36-
overwrite=True,
3736
)
3837

3938
try:
40-
client.schedule_job_alpha1(recurring_job)
39+
client.schedule_job_alpha1(job=recurring_job, overwrite=True)
4140
print(f'✓ Recurring job scheduled successfully', flush=True)
4241
except Exception as e:
4342
print(f'✗ Failed to schedule recurring job: {e}', flush=True)
@@ -68,11 +67,10 @@ def main():
6867
schedule='@every 45s',
6968
data=create_job_data('Job with drop failure policy'),
7069
failure_policy=DropFailurePolicy(),
71-
overwrite=True,
7270
)
7371

7472
try:
75-
client.schedule_job_alpha1(drop_policy_job)
73+
client.schedule_job_alpha1(job=drop_policy_job, overwrite=True)
7674
print(f'✓ Job with drop failure policy scheduled successfully', flush=True)
7775
except Exception as e:
7876
print(f'✗ Failed to schedule job with drop policy: {e}', flush=True)
@@ -83,11 +81,10 @@ def main():
8381
schedule='@every 60s',
8482
data=create_job_data('Job with constant retry policy'),
8583
failure_policy=ConstantFailurePolicy(max_retries=3, interval_seconds=10),
86-
overwrite=True,
8784
)
8885

8986
try:
90-
client.schedule_job_alpha1(constant_policy_job)
87+
client.schedule_job_alpha1(job=constant_policy_job, overwrite=True)
9188
print(f'✓ Job with constant retry policy scheduled successfully', flush=True)
9289
except Exception as e:
9390
print(f'✗ Failed to schedule job with retry policy: {e}', flush=True)

tests/clients/fake_dapr_server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def __init__(self, grpc_port: int = 50001, http_port: int = 8080):
5555
self.workflow_options: Dict[str, str] = {}
5656
self.metadata: Dict[str, str] = {}
5757
self.jobs: Dict[str, api_v1.Job] = {}
58+
self.job_overwrites: Dict[str, bool] = {}
5859
self._next_exception = None
5960

6061
def start(self):
@@ -550,6 +551,7 @@ def ScheduleJobAlpha1(self, request, context):
550551

551552
# Store the job
552553
self.jobs[request.job.name] = request.job
554+
self.job_overwrites[request.job.name] = request.overwrite
553555

554556
return empty_pb2.Empty()
555557

tests/clients/test_dapr_grpc_client.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,9 +1253,31 @@ def test_schedule_job_alpha1_success(self):
12531253
# Verify job was stored in fake server
12541254
self.assertIn('test-job', self._fake_dapr_server.jobs)
12551255
stored_job = self._fake_dapr_server.jobs['test-job']
1256+
stored_job_overwrite = self._fake_dapr_server.job_overwrites['test-job']
12561257
self.assertEqual(stored_job.name, 'test-job')
12571258
self.assertEqual(stored_job.schedule, '@every 1m')
1258-
self.assertEqual(stored_job.overwrite, False)
1259+
self.assertEqual(stored_job_overwrite, False)
1260+
# Verify data field is always set (even if empty)
1261+
self.assertTrue(stored_job.HasField('data'))
1262+
1263+
def test_schedule_job_alpha1_success_with_overwrite(self):
1264+
"""Test successful job scheduling."""
1265+
dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}')
1266+
job = Job(name='test-job', schedule='@every 1m')
1267+
1268+
# Schedule the job
1269+
response = dapr.schedule_job_alpha1(job=job, overwrite=True)
1270+
1271+
# Verify response type
1272+
self.assertIsInstance(response, DaprResponse)
1273+
1274+
# Verify job was stored in fake server
1275+
self.assertIn('test-job', self._fake_dapr_server.jobs)
1276+
stored_job = self._fake_dapr_server.jobs['test-job']
1277+
stored_job_overwrite = self._fake_dapr_server.job_overwrites['test-job']
1278+
self.assertEqual(stored_job.name, 'test-job')
1279+
self.assertEqual(stored_job.schedule, '@every 1m')
1280+
self.assertEqual(stored_job_overwrite, True)
12591281
# Verify data field is always set (even if empty)
12601282
self.assertTrue(stored_job.HasField('data'))
12611283

@@ -1280,12 +1302,12 @@ def test_schedule_job_alpha1_success_with_data(self):
12801302
# Verify job was stored in fake server with all data
12811303
self.assertIn('test-job-with-data', self._fake_dapr_server.jobs)
12821304
stored_job = self._fake_dapr_server.jobs['test-job-with-data']
1305+
stored_job_overwrite = self._fake_dapr_server.job_overwrites['test-job-with-data']
12831306
self.assertEqual(stored_job.name, 'test-job-with-data')
12841307
self.assertEqual(stored_job.schedule, '@every 2m')
12851308
self.assertEqual(stored_job.repeats, 3)
12861309
self.assertEqual(stored_job.ttl, '10m')
1287-
self.assertEqual(stored_job.overwrite, False)
1288-
1310+
self.assertEqual(stored_job_overwrite, False)
12891311
# Verify data field contains the payload
12901312
self.assertTrue(stored_job.HasField('data'))
12911313
self.assertEqual(
@@ -1323,7 +1345,6 @@ def test_get_job_alpha1_success(self):
13231345
self.assertEqual(retrieved_job.schedule, '@every 1m')
13241346
self.assertEqual(retrieved_job.repeats, 5)
13251347
self.assertEqual(retrieved_job.ttl, '1h')
1326-
self.assertEqual(retrieved_job.overwrite, False)
13271348

13281349
def test_get_job_alpha1_validation_error(self):
13291350
"""Test validation error in job retrieval."""

tests/clients/test_dapr_grpc_client_async.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1184,9 +1184,33 @@ async def test_schedule_job_alpha1_success(self):
11841184
# Verify job was stored in fake server
11851185
self.assertIn('async-test-job', self._fake_dapr_server.jobs)
11861186
stored_job = self._fake_dapr_server.jobs['async-test-job']
1187+
stored_job_overwrite = self._fake_dapr_server.job_overwrites['async-test-job']
11871188
self.assertEqual(stored_job.name, 'async-test-job')
11881189
self.assertEqual(stored_job.schedule, '@every 1m')
1189-
self.assertEqual(stored_job.overwrite, False)
1190+
self.assertEqual(stored_job_overwrite, False)
1191+
# Verify data field is always set (even if empty)
1192+
self.assertTrue(stored_job.HasField('data'))
1193+
1194+
await dapr.close()
1195+
1196+
async def test_schedule_job_alpha1_success_with_overwrite(self):
1197+
"""Test successful async job scheduling."""
1198+
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
1199+
job = Job(name='async-test-job', schedule='@every 1m')
1200+
1201+
# Schedule the job with overwrite
1202+
response = await dapr.schedule_job_alpha1(job=job, overwrite=True)
1203+
1204+
# Verify response type
1205+
self.assertIsInstance(response, DaprResponse)
1206+
1207+
# Verify job was stored in fake server
1208+
self.assertIn('async-test-job', self._fake_dapr_server.jobs)
1209+
stored_job = self._fake_dapr_server.jobs['async-test-job']
1210+
stored_job_overwrite = self._fake_dapr_server.job_overwrites['async-test-job']
1211+
self.assertEqual(stored_job.name, 'async-test-job')
1212+
self.assertEqual(stored_job.schedule, '@every 1m')
1213+
self.assertEqual(stored_job_overwrite, True)
11901214
# Verify data field is always set (even if empty)
11911215
self.assertTrue(stored_job.HasField('data'))
11921216

@@ -1215,11 +1239,12 @@ async def test_schedule_job_alpha1_success_with_data(self):
12151239
# Verify job was stored in fake server with all data
12161240
self.assertIn('async-test-job-with-data', self._fake_dapr_server.jobs)
12171241
stored_job = self._fake_dapr_server.jobs['async-test-job-with-data']
1242+
stored_job_overwrite = self._fake_dapr_server.job_overwrites['async-test-job-with-data']
12181243
self.assertEqual(stored_job.name, 'async-test-job-with-data')
12191244
self.assertEqual(stored_job.schedule, '@every 2m')
12201245
self.assertEqual(stored_job.repeats, 3)
12211246
self.assertEqual(stored_job.ttl, '10m')
1222-
self.assertEqual(stored_job.overwrite, False)
1247+
self.assertEqual(stored_job_overwrite, False)
12231248

12241249
# Verify data field contains the payload
12251250
self.assertTrue(stored_job.HasField('data'))
@@ -1279,7 +1304,6 @@ async def test_get_job_alpha1_success(self):
12791304
self.assertEqual(retrieved_job.schedule, '@every 1m')
12801305
self.assertEqual(retrieved_job.repeats, 5)
12811306
self.assertEqual(retrieved_job.ttl, '1h')
1282-
self.assertEqual(retrieved_job.overwrite, False)
12831307

12841308
await dapr.close()
12851309

@@ -1353,11 +1377,10 @@ async def test_job_lifecycle(self):
13531377
data=data,
13541378
repeats=10,
13551379
ttl='30m',
1356-
overwrite=True,
13571380
)
13581381

13591382
# 1. Schedule the job
1360-
schedule_response = await dapr.schedule_job_alpha1(job)
1383+
schedule_response = await dapr.schedule_job_alpha1(job=job, overwrite=True)
13611384
self.assertIsInstance(schedule_response, DaprResponse)
13621385

13631386
# 2. Get the job and verify all fields
@@ -1366,7 +1389,6 @@ async def test_job_lifecycle(self):
13661389
self.assertEqual(retrieved_job.schedule, '@every 5m')
13671390
self.assertEqual(retrieved_job.repeats, 10)
13681391
self.assertEqual(retrieved_job.ttl, '30m')
1369-
self.assertTrue(retrieved_job.overwrite)
13701392
self.assertEqual(retrieved_job.data.value, b'{"lifecycle": "test"}')
13711393

13721394
# 3. Delete the job

tests/clients/test_jobs.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ def test_job_creation(self):
2424
self.assertIsNone(job.due_time)
2525
self.assertIsNone(job.ttl)
2626
self.assertIsNone(job.data)
27-
self.assertEqual(job.overwrite, False)
2827

2928
# Test job with all fields
3029
data = GrpcAny()
@@ -37,15 +36,13 @@ def test_job_creation(self):
3736
due_time='2024-01-01T00:00:00Z',
3837
ttl='1h',
3938
data=data,
40-
overwrite=True,
4139
)
4240
self.assertEqual(job_full.name, 'full-job')
4341
self.assertEqual(job_full.schedule, '0 0 * * *')
4442
self.assertEqual(job_full.repeats, 5)
4543
self.assertEqual(job_full.due_time, '2024-01-01T00:00:00Z')
4644
self.assertEqual(job_full.ttl, '1h')
4745
self.assertEqual(job_full.data, data)
48-
self.assertEqual(job_full.overwrite, True)
4946

5047
def test_job_get_proto_full(self):
5148
"""Test _get_proto() method with all fields."""
@@ -59,7 +56,6 @@ def test_job_get_proto_full(self):
5956
due_time='2024-01-01T00:00:00Z',
6057
ttl='1h',
6158
data=data,
62-
overwrite=True,
6359
)
6460
job_proto = job._get_proto()
6561

@@ -70,7 +66,6 @@ def test_job_get_proto_full(self):
7066
self.assertEqual(job_proto.repeats, 5)
7167
self.assertEqual(job_proto.due_time, '2024-01-01T00:00:00Z')
7268
self.assertEqual(job_proto.ttl, '1h')
73-
self.assertTrue(job_proto.overwrite)
7469

7570
# Verify data field
7671
self.assertTrue(job_proto.HasField('data'))
@@ -88,7 +83,7 @@ def test_job_get_proto_no_data(self):
8883
def test_job_from_proto_no_data(self):
8984
"""Test _from_proto() method with minimal proto."""
9085
# Create minimal proto
91-
job_proto = api_v1.Job(name='test-job', overwrite=False)
86+
job_proto = api_v1.Job(name='test-job')
9287
job_proto.data.CopyFrom(GrpcAny()) # Empty data
9388

9489
# Convert to Job
@@ -101,7 +96,6 @@ def test_job_from_proto_no_data(self):
10196
self.assertIsNone(job.due_time)
10297
self.assertIsNone(job.ttl)
10398
self.assertIsNone(job.data) # Empty data becomes None
104-
self.assertEqual(job.overwrite, False)
10599

106100
def test_job_from_proto_full(self):
107101
"""Test _from_proto() method with all fields."""
@@ -115,7 +109,6 @@ def test_job_from_proto_full(self):
115109
repeats=5,
116110
due_time='2024-01-01T00:00:00Z',
117111
ttl='1h',
118-
overwrite=True,
119112
)
120113
job_proto.data.CopyFrom(data)
121114

@@ -129,7 +122,6 @@ def test_job_from_proto_full(self):
129122
self.assertEqual(job.due_time, '2024-01-01T00:00:00Z')
130123
self.assertEqual(job.ttl, '1h')
131124
self.assertEqual(job.data.value, b'{"message": "test"}')
132-
self.assertTrue(job.overwrite)
133125

134126
def test_job_with_drop_failure_policy(self):
135127
"""Test Job with DropFailurePolicy."""

0 commit comments

Comments
 (0)