diff --git a/daprdocs/content/en/python-sdk-docs/python-client.md b/daprdocs/content/en/python-sdk-docs/python-client.md index 46358437..43afdd0b 100644 --- a/daprdocs/content/en/python-sdk-docs/python-client.md +++ b/daprdocs/content/en/python-sdk-docs/python-client.md @@ -227,6 +227,56 @@ with DaprClient() as d: resp = d.publish_event(pubsub_name='pubsub', topic_name='TOPIC_A', data='{"message":"Hello World"}') ``` + +Send [CloudEvents](https://cloudevents.io/) messages with a json payload: +```python +from dapr.clients import DaprClient +import json + +with DaprClient() as d: + cloud_event = { + 'specversion': '1.0', + 'type': 'com.example.event', + 'source': 'my-service', + 'id': 'myid', + 'data': {'id': 1, 'message': 'hello world'}, + 'datacontenttype': 'application/json', + } + + # Set the data content type to 'application/cloudevents+json' + resp = d.publish_event( + pubsub_name='pubsub', + topic_name='TOPIC_CE', + data=json.dumps(cloud_event), + data_content_type='application/cloudevents+json', + ) +``` + +Publish [CloudEvents](https://cloudevents.io/) messages with plain text payload: +```python +from dapr.clients import DaprClient +import json + +with DaprClient() as d: + cloud_event = { + 'specversion': '1.0', + 'type': 'com.example.event', + 'source': 'my-service', + 'id': "myid", + 'data': 'hello world', + 'datacontenttype': 'text/plain', + } + + # Set the data content type to 'application/cloudevents+json' + resp = d.publish_event( + pubsub_name='pubsub', + topic_name='TOPIC_CE', + data=json.dumps(cloud_event), + data_content_type='application/cloudevents+json', + ) +``` + + #### Subscribe to messages ```python diff --git a/examples/pubsub-simple/README.md b/examples/pubsub-simple/README.md index f4730a42..8abfad96 100644 --- a/examples/pubsub-simple/README.md +++ b/examples/pubsub-simple/README.md @@ -37,6 +37,11 @@ expected_stdout_lines: - '== APP == Dead-Letter Subscriber received: id=7, message="hello world", content_type="application/json"' - '== APP == Dead-Letter Subscriber. Received via deadletter topic: TOPIC_D_DEAD' - '== APP == Dead-Letter Subscriber. Originally intended topic: TOPIC_D' + - '== APP == Subscriber received: TOPIC_CE' + - '== APP == Subscriber received a json cloud event: id=8, message="hello world", content_type="application/json"' + - '== APP == Subscriber received: TOPIC_CE' + - '== APP == Subscriber received plain text cloud event: hello world, content_type="text/plain"' + output_match_mode: substring background: true match_order: none @@ -45,7 +50,7 @@ sleep: 3 ```bash # 1. Start Subscriber (expose gRPC server receiver on port 50051) -dapr run --app-id python-subscriber --app-protocol grpc --app-port 50051 python3 subscriber.py +dapr run --app-id python-subscriber --app-protocol grpc --app-port 50051 -- python3 subscriber.py ``` @@ -60,6 +65,10 @@ expected_stdout_lines: - "== APP == {'id': 3, 'message': 'hello world'}" - "== APP == {'id': 4, 'message': 'hello world'}" - "== APP == {'id': 5, 'message': 'hello world'}" + - "== APP == {'id': 6, 'message': 'hello world'}" + - "== APP == {'id': 7, 'message': 'hello world'}" + - "== APP == {'specversion': '1.0', 'type': 'com.example.event', 'source': 'my-service', 'id': 'abc-8', 'data': {'id': 8, 'message': 'hello world'}, 'datacontenttype': 'application/json'}" + - "== APP == {'specversion': '1.0', 'type': 'com.example.event', 'source': 'my-service', 'id': 'abc-10', 'data': 'hello world', 'datacontenttype': 'text/plain'}" background: true sleep: 15 --> diff --git a/examples/pubsub-simple/publisher.py b/examples/pubsub-simple/publisher.py index f6681f9e..e5954c65 100644 --- a/examples/pubsub-simple/publisher.py +++ b/examples/pubsub-simple/publisher.py @@ -66,3 +66,51 @@ # Print the request print(req_data, flush=True) + + ## Send a cloud event with json data + id = 8 + cloud_event = { + 'specversion': '1.0', + 'type': 'com.example.event', + 'source': 'my-service', + 'id': f'abc-{id}', + 'data': {'id': id, 'message': 'hello world'}, + 'datacontenttype': 'application/json', + } + + # Set the data content type to 'application/cloudevents+json' + resp = d.publish_event( + pubsub_name='pubsub', + topic_name='TOPIC_CE', + data=json.dumps(cloud_event), + data_content_type='application/cloudevents+json', + ) + + # Print the request + print(cloud_event, flush=True) + + time.sleep(0.5) + + # Send a cloud event with plain text data + id = 10 + cloud_event = { + 'specversion': '1.0', + 'type': 'com.example.event', + 'source': 'my-service', + 'id': f'abc-{id}', + 'data': 'hello world', + 'datacontenttype': 'text/plain', + } + + # Set the data content type to 'application/cloudevents+json' + resp = d.publish_event( + pubsub_name='pubsub', + topic_name='TOPIC_CE', + data=json.dumps(cloud_event), + data_content_type='application/cloudevents+json', + ) + + # Print the request + print(cloud_event, flush=True) + + time.sleep(0.5) diff --git a/examples/pubsub-simple/subscriber.py b/examples/pubsub-simple/subscriber.py index b905aaa6..daa11bc8 100644 --- a/examples/pubsub-simple/subscriber.py +++ b/examples/pubsub-simple/subscriber.py @@ -40,6 +40,42 @@ def mytopic(event: v1.Event) -> TopicEventResponse: return TopicEventResponse('success') +@app.subscribe(pubsub_name='pubsub', topic='TOPIC_CE') +def receive_cloud_events(event: v1.Event) -> TopicEventResponse: + print('Subscriber received: ' + event.Subject(), flush=True) + + content_type = event.content_type + data = event.Data() + + try: + if content_type == 'application/json': + # Handle JSON data + json_data = json.loads(data) + print( + f'Subscriber received a json cloud event: id={json_data["id"]}, message="{json_data["message"]}", ' + f'content_type="{event.content_type}"', + flush=True, + ) + elif content_type == 'text/plain': + # Handle plain text data + if isinstance(data, bytes): + data = data.decode('utf-8') + print( + f'Subscriber received plain text cloud event: {data}, ' + f'content_type="{content_type}"', + flush=True, + ) + else: + print(f'Received unknown content type: {content_type}', flush=True) + return TopicEventResponse('fail') + + except Exception as e: + print('Failed to process event data:', e, flush=True) + return TopicEventResponse('fail') + + return TopicEventResponse('success') + + @app.subscribe(pubsub_name='pubsub', topic='TOPIC_D', dead_letter_topic='TOPIC_D_DEAD') def fail_and_send_to_dead_topic(event: v1.Event) -> TopicEventResponse: return TopicEventResponse('retry')