Skip to content

Commit 92dda8c

Browse files
l0lawrenceswathipil
andcommitted
[EGv2] Binary mode (#32922)
* [EGv2] Build Release (#30325) * move old sdk under legacy * gen typespec code * naming changes from archboard * samples * update patch naming * update imports with new gen * update samples * update client naming on aio * update receive op * update async to close client * update receive() * update gen code * moving around samples * updating samples * update samples * update patch and samples * patch internalmodels * spacing * updating model patch * update patch models * add both models back * update docstring * update docs * updating patch for receive * old EG models * add reject samples * patch * update format * update patch * eventgrid_client exceptions * update test imports * update total sample * receive patch fix * add in more tests * update test file * remove locktoken model * remove LockToken in patch * remove event delivery delay * eg client exceptions * .8.5 generation, and deliveryCount * rename sample * update version for beta * changelog * updating for gen * regen * generate via commit * publish result * fix docstring * publish docstring * return type * publish result * return publish result -- is none * format * update Publish result model * deliverycount patch * update from main * add copyright * added to readme * remove from readme * force publish_result response * update patch tp unindent * cspell * update mypy.ini * import order * mark livetest * update operations init * rename async * mypy * ignore mypy * pylint * pylint * ignore pylint for now to avoid gen code errors * ignore samples until ARM setup * update patches * remove publish result * remove PublishResult * remove publishresult * comma Co-authored-by: swathipil <[email protected]> * update publishResult * change to .value * gen code " to ' * remove comment * ran black * update changelog * update sample readme * gen code without query name * gen code * update tsp commit * remove publishresult * readme disclaimer * update changelog --------- Co-authored-by: swathipil <[email protected]> * Beta LiveTests (#30728) * add bicep file for tests * update output * update test * secret sanitization * refactor failing test * update conftest * update assets and sanitizers * update preparer loc * update conftest * conftest * update conftest * remove variables for now * update assets * update tests * try to update regex * update recordings * update conftest * update preparer * update test * update exception test * update tests * update asset * update conftest * pr comments * default needs to be eastus * import * [EGv2] Build Release (#30325) * move old sdk under legacy * gen typespec code * naming changes from archboard * samples * update patch naming * update imports with new gen * update samples * update client naming on aio * update receive op * update async to close client * update receive() * update gen code * moving around samples * updating samples * update samples * update patch and samples * patch internalmodels * spacing * updating model patch * update patch models * add both models back * update docstring * update docs * updating patch for receive * old EG models * add reject samples * patch * update format * update patch * eventgrid_client exceptions * update test imports * update total sample * receive patch fix * add in more tests * update test file * remove locktoken model * remove LockToken in patch * remove event delivery delay * eg client exceptions * .8.5 generation, and deliveryCount * rename sample * update version for beta * changelog * updating for gen * regen * generate via commit * publish result * fix docstring * publish docstring * return type * publish result * return publish result -- is none * format * update Publish result model * deliverycount patch * update from main * add copyright * added to readme * remove from readme * force publish_result response * update patch tp unindent * cspell * update mypy.ini * import order * mark livetest * update operations init * rename async * mypy * ignore mypy * pylint * pylint * ignore pylint for now to avoid gen code errors * ignore samples until ARM setup * update patches * remove publish result * remove PublishResult * remove publishresult * comma Co-authored-by: swathipil <[email protected]> * update publishResult * change to .value * gen code " to ' * remove comment * ran black * update changelog * update sample readme * gen code without query name * gen code * update tsp commit * remove publishresult * readme disclaimer * update changelog --------- Co-authored-by: swathipil <[email protected]> * fix merge * dont go to generated before binary * update patch * update patches * eventgrid client patch * changes * add * update test * update tyoe checking * pass through binary_mode for now -- * update patch aio * add async func * update * sys * update kwargs * add Todo and start adding more tests * update * differentiate between binary and not * update binary * no base64 in binary mode * binary * try JSONEncoder on everything if not str/bytes * update test * update test * update changes * whitespace * space * remove commented * str serialize extensions? * xml test * encode extensions as object * update test * update extension serialization for deserialize * move flag to operation level * extra comma * dont raise httpresponse * update patch * accept dict cloud events * spacing * remove content_type check * add live test * remove live test mark * update * use env vars * update test * only run live test * comment * typo * error incorrect * start comments * update test * add sample * update tests * update docstrings to add clarity * update err message * remove generated cloud event * update sample * update * update samples to include dict * update patch * spacing * add comments * formatting * update doc * update tests * update tests * tests * skip tests for now * typo * add dict binary mode * update docstring * update patch to allow throw error * first pass at comments * update patch eror * nit --------- Co-authored-by: swathipil <[email protected]>
1 parent 80b6d30 commit 92dda8c

File tree

11 files changed

+826
-59
lines changed

11 files changed

+826
-59
lines changed

sdk/eventgrid/azure-eventgrid/azure/eventgrid/_operations/_patch.py

Lines changed: 306 additions & 36 deletions
Large diffs are not rendered by default.

sdk/eventgrid/azure-eventgrid/azure/eventgrid/aio/_operations/_patch.py

Lines changed: 190 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,36 @@
55
"""Customize generated code here.
66
Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
77
"""
8-
from typing import List, overload, Union, Any, Optional
8+
from typing import List, overload, Union, Any, Optional, Callable, Dict, TypeVar
9+
import sys
910
from azure.core.messaging import CloudEvent
11+
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, ResourceNotModifiedError, map_error
1012
from azure.core.tracing.decorator_async import distributed_trace_async
13+
from azure.core.pipeline import PipelineResponse
14+
from azure.core.rest import HttpRequest, AsyncHttpResponse
15+
from azure.core.utils import case_insensitive_dict
1116
from ...models._patch import ReceiveResult, ReceiveDetails
12-
from ..._operations._patch import _cloud_event_to_generated
17+
from ..._operations._patch import _to_http_request
1318
from ._operations import EventGridClientOperationsMixin as OperationsMixin
14-
19+
from ... import models as _models
20+
from ..._model_base import _deserialize
21+
if sys.version_info >= (3, 9):
22+
from collections.abc import MutableMapping
23+
else:
24+
from typing import MutableMapping # type: ignore # pylint: disable=ungrouped-imports
25+
JSON = MutableMapping[str, Any] # pylint: disable=unsubscriptable-object
26+
T = TypeVar('T')
27+
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]
1528

1629
class EventGridClientOperationsMixin(OperationsMixin):
30+
1731
@overload
1832
async def publish_cloud_events(
1933
self,
2034
topic_name: str,
2135
body: List[CloudEvent],
2236
*,
37+
binary_mode: bool = False,
2338
content_type: str = "application/cloudevents-batch+json; charset=utf-8",
2439
**kwargs: Any
2540
) -> None:
@@ -33,6 +48,10 @@ async def publish_cloud_events(
3348
:type topic_name: str
3449
:param body: Array of Cloud Events being published. Required.
3550
:type body: list[~azure.core.messaging.CloudEvent]
51+
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
52+
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
53+
If 'datacontenttype` is not specified the default content type is `application/cloudevents-batch+json; charset=utf-8`.
54+
Requires CloudEvent data to be passed in as bytes.
3655
:keyword content_type: content type. Default value is "application/cloudevents-batch+json;
3756
charset=utf-8".
3857
:paramtype content_type: str
@@ -49,6 +68,7 @@ async def publish_cloud_events(
4968
topic_name: str,
5069
body: CloudEvent,
5170
*,
71+
binary_mode: bool = False,
5272
content_type: str = "application/cloudevents+json; charset=utf-8",
5373
**kwargs: Any
5474
) -> None:
@@ -62,6 +82,78 @@ async def publish_cloud_events(
6282
:type topic_name: str
6383
:param body: Single Cloud Event being published. Required.
6484
:type body: ~azure.core.messaging.CloudEvent
85+
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
86+
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
87+
If `datacontenttype` is not specified, the default content type is `application/cloudevents+json; charset=utf-8`.
88+
Requires CloudEvent data to be passed in as bytes.
89+
:keyword content_type: content type. Default value is "application/cloudevents+json;
90+
charset=utf-8".
91+
:paramtype content_type: str
92+
:keyword bool stream: Whether to stream the response of this operation. Defaults to False. You
93+
will have to context manage the returned stream.
94+
:return: None
95+
:rtype: None
96+
:raises ~azure.core.exceptions.HttpResponseError:
97+
"""
98+
99+
@overload
100+
async def publish_cloud_events(
101+
self,
102+
topic_name: str,
103+
body: Dict[str, Any],
104+
*,
105+
binary_mode: bool = False,
106+
content_type: str = "application/cloudevents+json; charset=utf-8",
107+
**kwargs: Any
108+
) -> None:
109+
"""Publish Single Cloud Event to namespace topic. In case of success, the server responds with an
110+
HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return
111+
various error codes. For example, 401: which indicates authorization failure, 403: which
112+
indicates quota exceeded or message is too large, 410: which indicates that specific topic is
113+
not found, 400: for bad request, and 500: for internal server error.
114+
115+
:param topic_name: Topic Name. Required.
116+
:type topic_name: str
117+
:param body: Single Cloud Event being published. Required.
118+
:type body: dict[str, Any]
119+
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
120+
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
121+
If `datacontenttype` is not specified, the default content type is `application/cloudevents+json; charset=utf-8`.
122+
Requires CloudEvent data to be passed in as bytes.
123+
:keyword content_type: content type. Default value is "application/cloudevents+json;
124+
charset=utf-8".
125+
:paramtype content_type: str
126+
:keyword bool stream: Whether to stream the response of this operation. Defaults to False. You
127+
will have to context manage the returned stream.
128+
:return: None
129+
:rtype: None
130+
:raises ~azure.core.exceptions.HttpResponseError:
131+
"""
132+
133+
@overload
134+
async def publish_cloud_events(
135+
self,
136+
topic_name: str,
137+
body: List[Dict[str, Any]],
138+
*,
139+
binary_mode: bool = False,
140+
content_type: str = "application/cloudevents-batch+json; charset=utf-8",
141+
**kwargs: Any
142+
) -> None:
143+
"""Publish Single Cloud Event to namespace topic. In case of success, the server responds with an
144+
HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return
145+
various error codes. For example, 401: which indicates authorization failure, 403: which
146+
indicates quota exceeded or message is too large, 410: which indicates that specific topic is
147+
not found, 400: for bad request, and 500: for internal server error.
148+
149+
:param topic_name: Topic Name. Required.
150+
:type topic_name: str
151+
:param body: Batch of Cloud Events being published. Required.
152+
:type body: list[dict[str, Any]]
153+
:keyword bool binary_mode: Whether to publish a CloudEvent in binary mode. Defaults to False.
154+
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
155+
If 'datacontenttype` is not specified, the default content type is `application/cloudevents-batch+json; charset=utf-8`.
156+
Requires CloudEvent data to be passed in as bytes.
65157
:keyword content_type: content type. Default value is "application/cloudevents+json;
66158
charset=utf-8".
67159
:paramtype content_type: str
@@ -74,7 +166,12 @@ async def publish_cloud_events(
74166

75167
@distributed_trace_async
76168
async def publish_cloud_events(
77-
self, topic_name: str, body: Union[List[CloudEvent], CloudEvent], **kwargs
169+
self,
170+
topic_name: str,
171+
body: Union[List[CloudEvent], CloudEvent, List[Dict[str, Any]], Dict[str, Any]],
172+
*,
173+
binary_mode: bool = False,
174+
**kwargs
78175
) -> None:
79176
"""Publish Batch Cloud Event or Events to namespace topic. In case of success, the server responds with an
80177
HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return
@@ -85,7 +182,11 @@ async def publish_cloud_events(
85182
:param topic_name: Topic Name. Required.
86183
:type topic_name: str
87184
:param body: Cloud Event or Array of Cloud Events being published. Required.
88-
:type body: ~azure.core.messaging.CloudEvent or list[~azure.core.messaging.CloudEvent]
185+
:type body: ~azure.core.messaging.CloudEvent or list[~azure.core.messaging.CloudEvent] or dict[str, any] or list[dict[str, any]]
186+
:keyword bool binary_mode: Whether to publish the events in binary mode. Defaults to False.
187+
When True and `datacontenttype` is specified in CloudEvent, content type is set to `datacontenttype`.
188+
If not specified, the default content type is "application/cloudevents+json; charset=utf-8".
189+
Requires CloudEvent data to be passed in as bytes.
89190
:keyword content_type: content type. Default value is "application/cloudevents+json;
90191
charset=utf-8".
91192
:paramtype content_type: str
@@ -95,16 +196,30 @@ async def publish_cloud_events(
95196
:rtype: None
96197
:raises ~azure.core.exceptions.HttpResponseError:
97198
"""
199+
200+
# Check that the body is a CloudEvent or list of CloudEvents even if dict
201+
if isinstance(body, dict) or (isinstance(body, list) and isinstance(body[0], dict)):
202+
try:
203+
if isinstance(body, list):
204+
body = [CloudEvent.from_dict(event) for event in body]
205+
else:
206+
body = CloudEvent.from_dict(body)
207+
except AttributeError:
208+
raise TypeError("Incorrect type for body. Expected CloudEvent,"
209+
" list of CloudEvents, dict, or list of dicts."
210+
" If dict passed, must follow the CloudEvent format.")
211+
212+
98213
if isinstance(body, CloudEvent):
99214
kwargs["content_type"] = "application/cloudevents+json; charset=utf-8"
100-
internal_body = _cloud_event_to_generated(body)
101-
await self._publish_cloud_event(topic_name, internal_body, **kwargs)
102-
else:
215+
await self._publish(topic_name, body, self._config.api_version, binary_mode, **kwargs)
216+
elif isinstance(body, list):
103217
kwargs["content_type"] = "application/cloudevents-batch+json; charset=utf-8"
104-
internal_body_list = []
105-
for item in body:
106-
internal_body_list.append(_cloud_event_to_generated(item))
107-
await self._publish_cloud_events(topic_name, internal_body_list, **kwargs)
218+
await self._publish(topic_name, body, self._config.api_version, binary_mode, **kwargs)
219+
else:
220+
raise TypeError("Incorrect type for body. Expected CloudEvent,"
221+
" list of CloudEvents, dict, or list of dicts."
222+
" If dict passed, must follow the CloudEvent format.")
108223

109224
@distributed_trace_async
110225
async def receive_cloud_events(
@@ -158,6 +273,69 @@ async def receive_cloud_events(
158273
receive_result_deserialized = ReceiveResult(value=detail_items)
159274
return receive_result_deserialized
160275

276+
async def _publish(self, topic_name: str, event: Any, api_version, binary_mode, **kwargs: Any) -> None:
277+
278+
error_map = {
279+
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
280+
}
281+
error_map.update(kwargs.pop('error_map', {}) or {})
282+
283+
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
284+
_params = kwargs.pop("params", {}) or {}
285+
286+
cls: ClsType[_models._models.PublishResult] = kwargs.pop( # pylint: disable=protected-access
287+
'cls', None
288+
)
289+
290+
content_type: str = kwargs.pop('content_type', _headers.pop('content-type', "application/cloudevents+json; charset=utf-8"))
291+
292+
# Given that we know the cloud event is binary mode, we can convert it to a HTTP request
293+
http_request = _to_http_request(
294+
topic_name=topic_name,
295+
api_version=api_version,
296+
headers=_headers,
297+
params=_params,
298+
content_type=content_type,
299+
event=event,
300+
binary_mode=binary_mode,
301+
**kwargs
302+
)
303+
304+
_stream = kwargs.pop("stream", False)
305+
306+
path_format_arguments = {
307+
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
308+
}
309+
http_request.url = self._client.format_url(http_request.url, **path_format_arguments)
310+
311+
_stream = kwargs.pop("stream", False)
312+
pipeline_response: PipelineResponse = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access
313+
http_request,
314+
stream=_stream,
315+
**kwargs
316+
)
317+
318+
response = pipeline_response.http_response
319+
320+
if response.status_code not in [200]:
321+
if _stream:
322+
await response.read() # Load the body in memory and close the socket
323+
map_error(status_code=response.status_code, response=response, error_map=error_map)
324+
raise HttpResponseError(response=response)
325+
326+
if _stream:
327+
deserialized = response.iter_bytes()
328+
else:
329+
deserialized = _deserialize(
330+
_models._models.PublishResult, # pylint: disable=protected-access
331+
response.json()
332+
)
333+
334+
if cls:
335+
return cls(pipeline_response, deserialized, {}) # type: ignore
336+
337+
return deserialized # type: ignore
338+
161339

162340
__all__: List[str] = [
163341
"EventGridClientOperationsMixin"
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# --------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for
4+
# license information.
5+
# --------------------------------------------------------------------------
6+
import os
7+
import asyncio
8+
import json
9+
from azure.core.credentials import AzureKeyCredential
10+
from azure.eventgrid.aio import EventGridClient
11+
from azure.eventgrid.models import *
12+
from azure.core.messaging import CloudEvent
13+
from azure.core.exceptions import HttpResponseError
14+
15+
16+
EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"]
17+
EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"]
18+
TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"]
19+
EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"]
20+
21+
# Create a client
22+
client = EventGridClient(EVENTGRID_ENDPOINT, AzureKeyCredential(EVENTGRID_KEY))
23+
24+
25+
async def run():
26+
async with client:
27+
# Publish a CloudEvent
28+
try:
29+
# Publish CloudEvent in binary mode with str encoded as bytes
30+
cloud_event_dict = {"data":b"HI", "source":"https://example.com", "type":"example", "datacontenttype":"text/plain"}
31+
await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event_dict)
32+
33+
# Publish CloudEvent in binary mode with json encoded as bytes
34+
cloud_event = CloudEvent(data=json.dumps({"hello":"data"}).encode("utf-8"), source="https://example.com", type="example", datacontenttype="application/json")
35+
await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event, binary_mode=True)
36+
37+
receive_result = await client.receive_cloud_events(
38+
topic_name=TOPIC_NAME,
39+
event_subscription_name=EVENT_SUBSCRIPTION_NAME,
40+
max_events=10,
41+
max_wait_time=10,
42+
)
43+
for details in receive_result.value:
44+
cloud_event_received = details.event
45+
print("CloudEvent: ", cloud_event_received)
46+
print("Data: ", cloud_event_received.data)
47+
except HttpResponseError:
48+
raise
49+
50+
if __name__ == "__main__":
51+
asyncio.get_event_loop().run_until_complete(run())

sdk/eventgrid/azure-eventgrid/samples/async_samples/eventgrid_client_samples/sample_publish_operation_async.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,30 @@
1414

1515
EVENTGRID_KEY: str = os.environ["EVENTGRID_KEY"]
1616
EVENTGRID_ENDPOINT: str = os.environ["EVENTGRID_ENDPOINT"]
17-
TOPIC_NAME: str = os.environ["TOPIC_NAME"]
18-
EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENT_SUBSCRIPTION_NAME"]
17+
TOPIC_NAME: str = os.environ["EVENTGRID_TOPIC_NAME"]
18+
EVENT_SUBSCRIPTION_NAME: str = os.environ["EVENTGRID_EVENT_SUBSCRIPTION_NAME"]
1919

2020
# Create a client
2121
client = EventGridClient(EVENTGRID_ENDPOINT, AzureKeyCredential(EVENTGRID_KEY))
2222

2323

2424
async def run():
2525
async with client:
26+
27+
# Publish a CloudEvent as dict
28+
try:
29+
cloud_event_dict = {"data": "hello", "source": "https://example.com", "type": "example"}
30+
await client.publish_cloud_events(topic_name=TOPIC_NAME, body=cloud_event_dict)
31+
except HttpResponseError:
32+
raise
33+
34+
# Publish a list of CloudEvents as dict
35+
try:
36+
await client.publish_cloud_events(topic_name=TOPIC_NAME, body=[cloud_event_dict, cloud_event_dict])
37+
except HttpResponseError:
38+
raise
39+
40+
2641
# Publish a CloudEvent
2742
try:
2843
cloud_event = CloudEvent(

0 commit comments

Comments
 (0)