Commit 963a673

Adds Jobs API callback support
Signed-off-by: Elena Kolevska <[email protected]>
1 parent bca79b7 commit 963a673

7 files changed, +347 −6 lines changed

dapr/clients/grpc/_request.py

27 additions, 0 deletions

```diff
@@ -428,3 +428,30 @@ class ConversationInput:
     content: str
     role: Optional[str] = None
     scrub_pii: Optional[bool] = None
+
+
+class JobEvent:
+    """Represents a job event received from Dapr runtime.
+
+    This matches the Go SDK's common.JobEvent structure and represents
+    a job that is currently being executed, not a job definition.
+
+    Args:
+        name (str): The name/type of the job being executed.
+        data (bytes): The raw job data payload.
+    """
+
+    def __init__(self, name: str, data: bytes = b''):
+        self.name = name
+        self.data = data
+
+    def get_data_as_string(self, encoding: str = 'utf-8') -> str:
+        """Get the job data as a string.
+
+        Args:
+            encoding (str): The encoding to use for decoding bytes. Defaults to 'utf-8'.
+
+        Returns:
+            str: The job data as a string, or empty string if no data.
+        """
+        return self.data.decode(encoding) if self.data else ''
```
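As a quick check of the decode-or-empty contract, the new class can be exercised standalone; this sketch copies the class body from the diff above, so no Dapr installation is needed:

```python
class JobEvent:
    """Minimal stand-in for the JobEvent class added in this commit."""

    def __init__(self, name: str, data: bytes = b''):
        self.name = name
        self.data = data

    def get_data_as_string(self, encoding: str = 'utf-8') -> str:
        # An empty payload decodes to '' instead of raising.
        return self.data.decode(encoding) if self.data else ''


evt = JobEvent('hello-job', b'{"items": 42}')
print(evt.get_data_as_string())  # {"items": 42}
print(JobEvent('no-data').get_data_as_string() == '')  # True
```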

examples/jobs/README.md

101 additions, 2 deletions

````diff
@@ -6,7 +6,10 @@ It demonstrates the following APIs:
 - **get_job_alpha1**: Retrieve details about a scheduled job
 - **delete_job_alpha1**: Delete a scheduled job
 
-It creates a client using `DaprClient` and calls all the Jobs API methods available as example.
+It includes two examples that showcase different aspects of the Jobs API:
+
+1. **`job_management.py`** - Focuses on job scheduling patterns and management operations
+2. **`job_processing.py`** - Shows the complete workflow including job event handling
 
 > **Note:** The Jobs API is currently in Alpha and subject to change. Make sure to use the latest proto bindings.
 
@@ -47,11 +50,84 @@ timeout_seconds: 10
 -->
 
 ```bash
-dapr run --app-id jobs-example -- python3 simple_job.py
+dapr run --app-id jobs-example -- python3 job_management.py
+```
+
+<!-- END_STEP -->
+
+## Example 2: Complete Workflow with Job Event Handling
+
+This example (`job_processing.py`) demonstrates the complete Jobs API workflow in a single application that both schedules jobs and handles job events when they trigger. This shows:
+
+- How to register job event handlers using `@app.job_event()` decorators
+- How to schedule jobs from the same application that handles them
+- How to process job events with structured data
+- Complete end-to-end job lifecycle (schedule → trigger → handle)
+
+Run the following command in a terminal/command-prompt:
+
+<!-- STEP
+name: Run complete workflow example
+expected_stdout_lines:
+  - "== APP == Dapr Jobs Example"
+  - "== APP == Starting gRPC server on port 50051..."
+  - "== APP == Scheduling jobs..."
+  - "== APP == ✓ hello-job scheduled"
+  - "== APP == ✓ data-job scheduled"
+  - "== APP == Jobs scheduled! Waiting for execution..."
+  - "== APP == Job event received: hello-job"
+  - "== APP == Job data: None"
+  - "== APP == Hello job processing completed!"
+  - "== APP == Data job event received: data-job"
+  - "== APP == Processing data_processing task with priority high"
+  - "== APP == Processing 42 items..."
+  - "== APP == Data job processing completed!"
+background: true
+sleep: 15
+-->
+
+```bash
+# Start the complete workflow example (schedules jobs and handles job events)
+dapr run --app-id jobs-workflow --app-protocol grpc --app-port 50051 python3 job_processing.py
 ```
 
 <!-- END_STEP -->
 
+## Cleanup
+
+<!-- STEP
+expected_stdout_lines:
+  - '✅ app stopped successfully: jobs-workflow'
+name: Shutdown dapr
+-->
+
+```bash
+dapr stop --app-id jobs-workflow
+```
+
+<!-- END_STEP -->
+
+## Example Comparison
+
+| Feature | `job_management.py` | `job_processing.py` |
+|---------|---------------------|---------------------|
+| **Purpose** | Job scheduling and management | Complete workflow with event handling |
+| **Job Scheduling** | ✅ Multiple patterns | ✅ Simple patterns |
+| **Job Event Handling** | ❌ No | ✅ Yes |
+| **Job Data Processing** | ❌ No | ✅ Yes |
+| **Use Case** | Learning job scheduling | Production job processing |
+| **Complexity** | Simple | Moderate |
+
+**Use `job_management.py` when:**
+- Learning how to schedule different types of jobs
+- Testing job scheduling patterns
+- Managing jobs without processing them
+
+**Use `job_processing.py` when:**
+- Building applications that process job events
+- Need complete end-to-end job workflow
+- Want to see job event handling in action
+
 The output should be as follows:
 
 ```
@@ -101,6 +177,29 @@ The Jobs API supports multiple schedule formats:
 - **data**: Payload data to send when the job is triggered (optional, empty Any proto used if not provided)
 - **overwrite**: If true, allows this job to overwrite an existing job with the same name (default: false)
 
+## Handling Job Events
+
+To handle job events when they're triggered, create a callback service using the gRPC extension:
+
+```python
+from dapr.ext.grpc import App, JobEvent
+
+app = App()
+
+@app.job_event('my-job')
+def handle_my_job(job_event: JobEvent) -> None:
+    print(f"Job {job_event.name} triggered!")
+    data_str = job_event.get_data_as_string()
+    print(f"Job data: {data_str}")
+    # Process the job...
+
+app.run(50051)
+```
+
+The callback service must:
+- Use the `@app.job_event('job-name')` decorator to register handlers
+- Accept a `JobEvent` object parameter containing job execution data
+- Run on a gRPC port (default 50051) that Dapr can reach
 
 ## Additional Information
 
````
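The examples in this README pass Go-style duration strings such as `'1s'` for `due_time`. A small helper (hypothetical, not part of this commit) for building an absolute RFC 3339 timestamp instead, assuming the runtime also accepts that form for due times as the Jobs documentation describes:

```python
from datetime import datetime, timedelta, timezone


def due_at(delay_seconds: int) -> str:
    """Build an RFC 3339 UTC timestamp `delay_seconds` from now."""
    due = datetime.now(timezone.utc) + timedelta(seconds=delay_seconds)
    return due.strftime('%Y-%m-%dT%H:%M:%SZ')


# e.g. Job(name='hello-job', due_time=due_at(30)) instead of due_time='30s'
print(due_at(30))
```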

File renamed without changes.

examples/jobs/job_processing.py

133 additions, 0 deletions

```python
# ------------------------------------------------------------
# Copyright 2024 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ------------------------------------------------------------

import json
import threading
import time
from datetime import datetime, timedelta
from dapr.ext.grpc import App, JobEvent
from dapr.clients import DaprClient, Job

try:
    from google.protobuf.any_pb2 import Any as GrpcAny

    PROTOBUF_AVAILABLE = True
except ImportError:
    PROTOBUF_AVAILABLE = False
    print(
        'Warning: protobuf not available, jobs with data will be scheduled without data', flush=True
    )

app = App()


def create_job_data(data_dict):
    """Create job data from a dictionary."""
    if not PROTOBUF_AVAILABLE:
        return None

    data = GrpcAny()
    data.value = json.dumps(data_dict).encode('utf-8')
    return data


# Job event handlers
@app.job_event('hello-job')
def handle_hello_job(job_event: JobEvent) -> None:
    """Handle the 'hello-job' job event."""
    print(f'Job event received: {job_event.name}', flush=True)

    if job_event.data:
        data_str = job_event.get_data_as_string()
        print(f'Job data: {data_str}', flush=True)
    else:
        print('Job data: None', flush=True)

    print('Hello job processing completed!', flush=True)


@app.job_event('data-job')
def handle_data_job(job_event: JobEvent) -> None:
    """Handle the 'data-job' job event with structured data."""
    print(f'Data job event received: {job_event.name}', flush=True)

    if job_event.data:
        try:
            data_str = job_event.get_data_as_string()
            job_data = json.loads(data_str)

            task_type = job_data.get('task_type', 'unknown')
            priority = job_data.get('priority', 'normal')
            items = job_data.get('items', 0)

            print(f'Processing {task_type} task with priority {priority}', flush=True)
            print(f'Processing {items} items...', flush=True)
            print('Data job processing completed!', flush=True)

        except json.JSONDecodeError as e:
            print(f'Failed to parse job data: {e}', flush=True)
    else:
        print('No data provided for data job', flush=True)


def schedule_jobs():
    """Schedule test jobs after the server starts."""
    # Wait for the server to fully start
    time.sleep(5)

    print('Scheduling jobs...', flush=True)

    try:
        # Create Dapr client
        with DaprClient() as client:
            # Calculate due times
            due_time_1 = '1s'
            due_time_2 = '3s'

            # Job 1: Simple hello job (no data)
            print(f'Scheduling hello-job for {due_time_1}...', flush=True)
            hello_job = Job(name='hello-job', due_time=due_time_1)
            client.schedule_job_alpha1(hello_job)
            print('✓ hello-job scheduled', flush=True)

            # Job 2: Data processing job (with JSON data)
            print(f'Scheduling data-job for {due_time_2}...', flush=True)
            job_data = {
                'task_type': 'data_processing',
                'priority': 'high',
                'items': 42,
                'source': 'test_data',
            }

            data_job = Job(name='data-job', due_time=due_time_2, data=create_job_data(job_data))
            client.schedule_job_alpha1(data_job)
            print('✓ data-job scheduled', flush=True)

            print('Jobs scheduled! Waiting for execution...', flush=True)

    except Exception as e:
        print(f'✗ Failed to schedule jobs: {e}', flush=True)


if __name__ == '__main__':
    print('Dapr Jobs Example', flush=True)
    print('This server will:', flush=True)
    print('1. Register job event handlers', flush=True)
    print('2. Schedule test jobs', flush=True)
    print('3. Process job events when they trigger', flush=True)

    # Schedule jobs in a background thread after server starts
    threading.Thread(target=schedule_jobs, daemon=True).start()

    print('Starting gRPC server on port 50051...', flush=True)
    app.run(50051)
```
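The defaulting logic in `handle_data_job` can be exercised without a running sidecar; this standalone sketch (the helper name is hypothetical) reproduces the same `dict.get` fallbacks on the raw payload bytes:

```python
import json


def summarize_data_job(payload: bytes) -> str:
    """Mirror handle_data_job's parsing, including its defaults."""
    job_data = json.loads(payload.decode('utf-8'))
    task_type = job_data.get('task_type', 'unknown')
    priority = job_data.get('priority', 'normal')
    items = job_data.get('items', 0)
    return f'Processing {task_type} task with priority {priority} ({items} items)'


print(summarize_data_job(b'{"task_type": "data_processing", "priority": "high", "items": 42}'))
# Processing data_processing task with priority high (42 items)
print(summarize_data_job(b'{}'))
# Processing unknown task with priority normal (0 items)
```

Missing keys fall back to `'unknown'`, `'normal'`, and `0` rather than raising, which is why the handler only needs to guard against invalid JSON.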

ext/dapr-ext-grpc/dapr/ext/grpc/__init__.py

4 additions, 1 deletion

```diff
@@ -13,8 +13,9 @@
 limitations under the License.
 """
 
-from dapr.clients.grpc._request import InvokeMethodRequest, BindingRequest
+from dapr.clients.grpc._request import InvokeMethodRequest, BindingRequest, JobEvent
 from dapr.clients.grpc._response import InvokeMethodResponse, TopicEventResponse
+from dapr.clients.grpc._jobs import Job
 
 from dapr.ext.grpc.app import App, Rule  # type:ignore
 
@@ -26,4 +27,6 @@
     'InvokeMethodResponse',
     'BindingRequest',
     'TopicEventResponse',
+    'Job',
+    'JobEvent',
 ]
```
