Skip to content

Commit bca79b7

Browse files
Adds async client
Signed-off-by: Elena Kolevska <[email protected]>
1 parent 1471f7c commit bca79b7

File tree

4 files changed

+319
-1
lines changed

4 files changed

+319
-1
lines changed

dapr/aio/clients/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from dapr.clients.base import DaprActorClientBase
1919
from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_UNKNOWN
2020
from dapr.aio.clients.grpc.client import DaprGrpcClientAsync, MetadataTuple, InvokeMethodResponse
21+
from dapr.clients.grpc._jobs import Job
2122
from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient
2223
from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient
2324
from dapr.conf import settings
@@ -29,6 +30,7 @@
2930
'DaprActorHttpClient',
3031
'DaprInternalError',
3132
'ERROR_CODE_UNKNOWN',
33+
'Job',
3234
]
3335

3436
from grpc.aio import ( # type: ignore

dapr/aio/clients/grpc/client.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
TransactionalStateOperation,
7979
ConversationInput,
8080
)
81+
from dapr.clients.grpc._jobs import Job
8182
from dapr.clients.grpc._response import (
8283
BindingResponse,
8384
ConversationResponse,
@@ -1847,6 +1848,107 @@ async def get_metadata(self) -> GetMetadataResponse:
18471848
headers=await call.initial_metadata(),
18481849
)
18491850

1851+
async def schedule_job_alpha1(self, job: Job) -> DaprResponse:
1852+
"""Schedules a job to be triggered at a specified time or interval.
1853+
1854+
This is an Alpha API and is subject to change.
1855+
1856+
Args:
1857+
job (Job): The job to schedule. Must have a name and either schedule or due_time.
1858+
1859+
Returns:
1860+
DaprResponse: Empty response indicating successful scheduling.
1861+
1862+
Raises:
1863+
ValueError: If job name is empty or both schedule and due_time are missing.
1864+
DaprGrpcError: If the Dapr runtime returns an error.
1865+
"""
1866+
# Warnings and input validation
1867+
warn(
1868+
'The Jobs API is an Alpha version and is subject to change.',
1869+
UserWarning,
1870+
stacklevel=2,
1871+
)
1872+
validateNotBlankString(job_name=job.name)
1873+
1874+
if not job.schedule and not job.due_time:
1875+
raise ValueError('Job must have either schedule or due_time specified')
1876+
1877+
# Convert job to proto using the Job class private method
1878+
job_proto = job._get_proto()
1879+
request = api_v1.ScheduleJobRequest(job=job_proto)
1880+
1881+
try:
1882+
call = self._stub.ScheduleJobAlpha1(request)
1883+
await call
1884+
return DaprResponse(headers=await call.initial_metadata())
1885+
except grpc.aio.AioRpcError as err:
1886+
raise DaprGrpcError(err) from err
1887+
1888+
async def get_job_alpha1(self, name: str) -> Job:
1889+
"""Gets a scheduled job by name.
1890+
1891+
This is an Alpha API and is subject to change.
1892+
1893+
Args:
1894+
name (str): The name of the job to retrieve.
1895+
1896+
Returns:
1897+
Job: The job details retrieved from the scheduler.
1898+
1899+
Raises:
1900+
ValueError: If job name is empty.
1901+
DaprGrpcError: If the Dapr runtime returns an error.
1902+
"""
1903+
# Warnings and input validation
1904+
warn(
1905+
'The Jobs API is an Alpha version and is subject to change.',
1906+
UserWarning,
1907+
stacklevel=2,
1908+
)
1909+
validateNotBlankString(job_name=name)
1910+
1911+
request = api_v1.GetJobRequest(name=name)
1912+
1913+
try:
1914+
call = self._stub.GetJobAlpha1(request)
1915+
response = await call
1916+
return Job._from_proto(response.job)
1917+
except grpc.aio.AioRpcError as err:
1918+
raise DaprGrpcError(err) from err
1919+
1920+
async def delete_job_alpha1(self, name: str) -> DaprResponse:
1921+
"""Deletes a scheduled job by name.
1922+
1923+
This is an Alpha API and is subject to change.
1924+
1925+
Args:
1926+
name (str): The name of the job to delete.
1927+
1928+
Returns:
1929+
DaprResponse: Empty response indicating successful deletion.
1930+
1931+
Raises:
1932+
ValueError: If job name is empty.
1933+
DaprGrpcError: If the Dapr runtime returns an error.
1934+
"""
1935+
# Warnings and input validation
1936+
warn(
1937+
'The Jobs API is an Alpha version and is subject to change.',
1938+
UserWarning,
1939+
stacklevel=2,
1940+
)
1941+
validateNotBlankString(job_name=name)
1942+
1943+
request = api_v1.DeleteJobRequest(name=name)
1944+
1945+
try:
1946+
call = self._stub.DeleteJobAlpha1(request)
1947+
await call
1948+
return DaprResponse(headers=await call.initial_metadata())
1949+
except grpc.aio.AioRpcError as err:
1950+
raise DaprGrpcError(err) from err
1951+
18501952
async def set_metadata(self, attributeName: str, attributeValue: str) -> DaprResponse:
18511953
"""Adds a custom (extended) metadata attribute to the Dapr sidecar
18521954
information stored by the Metadata endpoint.

examples/jobs/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Example - Jobs API
22

3-
This example demonstrates the [Jobs API](https://docs.dapr.io/developing-applications/building-blocks/scheduler/) in Dapr.
3+
This example demonstrates the [Jobs API](https://docs.dapr.io/developing-applications/building-blocks/jobs/) in Dapr.
44
It demonstrates the following APIs:
55
- **schedule_job_alpha1**: Schedule a job to run at specified times
66
- **get_job_alpha1**: Retrieve details about a scheduled job

tests/clients/test_dapr_grpc_client_async.py

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@
3030
from dapr.conf import settings
3131
from dapr.clients.grpc._helpers import to_bytes
3232
from dapr.clients.grpc._request import TransactionalStateOperation, ConversationInput
33+
from dapr.clients.grpc._jobs import Job
3334
from dapr.clients.grpc._state import StateOptions, Consistency, Concurrency, StateItem
3435
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
3536
from dapr.clients.grpc._response import (
3637
ConfigurationItem,
3738
ConfigurationWatcher,
3839
ConfigurationResponse,
40+
DaprResponse,
3941
UnlockResponseStatus,
4042
)
4143

@@ -1164,6 +1166,218 @@ async def test_converse_alpha1_error_handling(self):
11641166
self.assertTrue('Invalid argument' in str(context.exception))
11651167
await dapr.close()
11661168

1169+
#
1170+
# Tests for Jobs API (Alpha) - Async
1171+
#
1172+
1173+
async def test_schedule_job_alpha1_success(self):
1174+
"""Test successful async job scheduling."""
1175+
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
1176+
job = Job(name='async-test-job', schedule='@every 1m')
1177+
1178+
# Schedule the job
1179+
response = await dapr.schedule_job_alpha1(job)
1180+
1181+
# Verify response type
1182+
self.assertIsInstance(response, DaprResponse)
1183+
1184+
# Verify job was stored in fake server
1185+
self.assertIn('async-test-job', self._fake_dapr_server.jobs)
1186+
stored_job = self._fake_dapr_server.jobs['async-test-job']
1187+
self.assertEqual(stored_job.name, 'async-test-job')
1188+
self.assertEqual(stored_job.schedule, '@every 1m')
1189+
self.assertEqual(stored_job.overwrite, False)
1190+
# Verify data field is always set (even if empty)
1191+
self.assertTrue(stored_job.HasField('data'))
1192+
1193+
await dapr.close()
1194+
1195+
async def test_schedule_job_alpha1_success_with_data(self):
1196+
"""Test successful async job scheduling with data payload."""
1197+
from google.protobuf.any_pb2 import Any as GrpcAny
1198+
1199+
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
1200+
1201+
# Create job data
1202+
data = GrpcAny()
1203+
data.value = b'{"message": "Hello from async job!", "priority": "high"}'
1204+
1205+
job = Job(
1206+
name='async-test-job-with-data', schedule='@every 2m', data=data, repeats=3, ttl='10m'
1207+
)
1208+
1209+
# Schedule the job
1210+
response = await dapr.schedule_job_alpha1(job)
1211+
1212+
# Verify response type
1213+
self.assertIsInstance(response, DaprResponse)
1214+
1215+
# Verify job was stored in fake server with all data
1216+
self.assertIn('async-test-job-with-data', self._fake_dapr_server.jobs)
1217+
stored_job = self._fake_dapr_server.jobs['async-test-job-with-data']
1218+
self.assertEqual(stored_job.name, 'async-test-job-with-data')
1219+
self.assertEqual(stored_job.schedule, '@every 2m')
1220+
self.assertEqual(stored_job.repeats, 3)
1221+
self.assertEqual(stored_job.ttl, '10m')
1222+
self.assertEqual(stored_job.overwrite, False)
1223+
1224+
# Verify data field contains the payload
1225+
self.assertTrue(stored_job.HasField('data'))
1226+
self.assertEqual(
1227+
stored_job.data.value, b'{"message": "Hello from async job!", "priority": "high"}'
1228+
)
1229+
1230+
await dapr.close()
1231+
1232+
async def test_schedule_job_alpha1_validation_error(self):
1233+
"""Test async validation error in job scheduling."""
1234+
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
1235+
1236+
# Test empty job name - this should be caught by client validation
1237+
with self.assertRaises(ValueError):
1238+
job = Job(name='', schedule='@every 1m')
1239+
await dapr.schedule_job_alpha1(job)
1240+
1241+
# Test missing schedule and due_time - this should be caught by client validation
1242+
with self.assertRaises(ValueError):
1243+
job = Job(name='async-test-job')
1244+
await dapr.schedule_job_alpha1(job)
1245+
1246+
await dapr.close()
1247+
1248+
async def test_schedule_jobs_error_handling(self):
1249+
"""Test async error handling for Jobs API using fake server's exception mechanism."""
1250+
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
1251+
1252+
# Set up fake server to raise an exception on next call
1253+
error_status = status_pb2.Status(
1254+
code=code_pb2.INTERNAL, message='Simulated async server error'
1255+
)
1256+
self._fake_dapr_server.raise_exception_on_next_call(error_status)
1257+
1258+
# Try to schedule a job - should raise DaprGrpcError
1259+
job = Job(name='async-error-test', schedule='@every 1m')
1260+
with self.assertRaises(DaprGrpcError):
1261+
await dapr.schedule_job_alpha1(job)
1262+
1263+
await dapr.close()
1264+
1265+
async def test_get_job_alpha1_success(self):
1266+
"""Test successful async job retrieval."""
1267+
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
1268+
1269+
# First schedule a job
1270+
original_job = Job(name='async-get-test-job', schedule='@every 1m', repeats=5, ttl='1h')
1271+
await dapr.schedule_job_alpha1(original_job)
1272+
1273+
# Now retrieve it
1274+
retrieved_job = await dapr.get_job_alpha1('async-get-test-job')
1275+
1276+
# Verify response
1277+
self.assertIsInstance(retrieved_job, Job)
1278+
self.assertEqual(retrieved_job.name, 'async-get-test-job')
1279+
self.assertEqual(retrieved_job.schedule, '@every 1m')
1280+
self.assertEqual(retrieved_job.repeats, 5)
1281+
self.assertEqual(retrieved_job.ttl, '1h')
1282+
self.assertEqual(retrieved_job.overwrite, False)
1283+
1284+
await dapr.close()
1285+
1286+
async def test_get_job_alpha1_validation_error(self):
1287+
"""Test async validation error in job retrieval."""
1288+
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
1289+
1290+
with self.assertRaises(ValueError):
1291+
await dapr.get_job_alpha1('')
1292+
1293+
await dapr.close()
1294+
1295+
async def test_get_job_alpha1_not_found(self):
1296+
"""Test async getting a job that doesn't exist."""
1297+
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
1298+
1299+
# Setup server to raise an exception
1300+
self._fake_dapr_server.raise_exception_on_next_call(
1301+
status_pb2.Status(code=code_pb2.NOT_FOUND, message='Job not found')
1302+
)
1303+
1304+
with self.assertRaises(DaprGrpcError):
1305+
await dapr.get_job_alpha1('async-non-existent-job')
1306+
1307+
await dapr.close()
1308+
1309+
async def test_delete_job_alpha1_success(self):
1310+
"""Test successful async job deletion."""
1311+
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
1312+
1313+
# First schedule a job
1314+
job = Job(name='async-delete-test-job', schedule='@every 1m')
1315+
await dapr.schedule_job_alpha1(job)
1316+
1317+
# Verify job exists
1318+
self.assertIn('async-delete-test-job', self._fake_dapr_server.jobs)
1319+
1320+
# Delete the job
1321+
response = await dapr.delete_job_alpha1('async-delete-test-job')
1322+
1323+
# Verify response
1324+
self.assertIsInstance(response, DaprResponse)
1325+
1326+
# Verify job was removed from fake server
1327+
self.assertNotIn('async-delete-test-job', self._fake_dapr_server.jobs)
1328+
1329+
await dapr.close()
1330+
1331+
async def test_delete_job_alpha1_validation_error(self):
1332+
"""Test async validation error in job deletion."""
1333+
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
1334+
1335+
with self.assertRaises(ValueError):
1336+
await dapr.delete_job_alpha1('')
1337+
1338+
await dapr.close()
1339+
1340+
async def test_job_lifecycle(self):
1341+
"""Test complete async job lifecycle: schedule → get → delete."""
1342+
from google.protobuf.any_pb2 import Any as GrpcAny
1343+
1344+
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
1345+
1346+
# Create job with data
1347+
data = GrpcAny()
1348+
data.value = b'{"lifecycle": "test"}'
1349+
1350+
job = Job(
1351+
name='async-lifecycle-job',
1352+
schedule='@every 5m',
1353+
data=data,
1354+
repeats=10,
1355+
ttl='30m',
1356+
overwrite=True,
1357+
)
1358+
1359+
# 1. Schedule the job
1360+
schedule_response = await dapr.schedule_job_alpha1(job)
1361+
self.assertIsInstance(schedule_response, DaprResponse)
1362+
1363+
# 2. Get the job and verify all fields
1364+
retrieved_job = await dapr.get_job_alpha1('async-lifecycle-job')
1365+
self.assertEqual(retrieved_job.name, 'async-lifecycle-job')
1366+
self.assertEqual(retrieved_job.schedule, '@every 5m')
1367+
self.assertEqual(retrieved_job.repeats, 10)
1368+
self.assertEqual(retrieved_job.ttl, '30m')
1369+
self.assertTrue(retrieved_job.overwrite)
1370+
self.assertEqual(retrieved_job.data.value, b'{"lifecycle": "test"}')
1371+
1372+
# 3. Delete the job
1373+
delete_response = await dapr.delete_job_alpha1('async-lifecycle-job')
1374+
self.assertIsInstance(delete_response, DaprResponse)
1375+
1376+
# 4. Verify job is gone
1377+
self.assertNotIn('async-lifecycle-job', self._fake_dapr_server.jobs)
1378+
1379+
await dapr.close()
1380+
11671381

11681382
if __name__ == '__main__':
11691383
unittest.main()

0 commit comments

Comments
 (0)