Skip to content

Commit b2aa265

Browse files
authored
[ServiceBus/EventHub] unique message/correlation ID for mgmt ops (Azure#39516)
* [SB] Msg IDs on mgmt/cbs ops should be unique * copy over sb pyamqp to eh * add tests * remove disable local auth for tests ci run
1 parent a440dba commit b2aa265

File tree

13 files changed

+145
-46
lines changed

13 files changed

+145
-46
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_platform.py

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,25 +83,10 @@ def _versionatom(s):
8383
KNOWN_TCP_OPTS.remove("TCP_MAXSEG")
8484
KNOWN_TCP_OPTS.remove("TCP_USER_TIMEOUT")
8585

86-
if sys.version_info < (2, 7, 7): # pragma: no cover
87-
import functools
88-
89-
def _to_bytes_arg(fun):
90-
@functools.wraps(fun)
91-
def _inner(s, *args, **kwargs):
92-
return fun(s.encode(), *args, **kwargs)
93-
94-
return _inner
95-
96-
pack = _to_bytes_arg(struct.pack)
97-
pack_into = _to_bytes_arg(struct.pack_into)
98-
unpack = _to_bytes_arg(struct.unpack)
99-
unpack_from = _to_bytes_arg(struct.unpack_from)
100-
else:
101-
pack = struct.pack
102-
pack_into = struct.pack_into
103-
unpack = struct.unpack
104-
unpack_from = struct.unpack_from
86+
pack = struct.pack
87+
pack_into = struct.pack_into
88+
unpack = struct.unpack
89+
unpack_from = struct.unpack_from
10590

10691
__all__ = [
10792
"LINUX_VERSION",

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_cbs_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import logging
88
from datetime import datetime
9+
from uuid import uuid4
910
from typing import Any, Optional, Union
1011

1112
from ..utils import utc_now, utc_from_timestamp
@@ -72,7 +73,7 @@ async def _put_token(
7273
) -> None:
7374
message = Message( # type: ignore # TODO: missing positional args header, etc.
7475
value=token,
75-
properties=Properties(message_id=self._mgmt_link.next_message_id), # type: ignore
76+
properties=Properties(message_id=uuid4()), # type: ignore
7677
application_properties={
7778
CBS_NAME: audience,
7879
CBS_OPERATION: CBS_PUT_TOKEN,
@@ -87,7 +88,6 @@ async def _put_token(
8788
operation=CBS_PUT_TOKEN,
8889
type=token_type,
8990
)
90-
self._mgmt_link.next_message_id += 1
9191

9292
async def _on_amqp_management_open_complete(self, management_open_result: ManagementOpenResult) -> None:
9393
if self.state in (CbsState.CLOSED, CbsState.ERROR):

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_connection_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class Connection: # pylint:disable=too-many-instance-attributes
7070
and 1 for transport type AmqpOverWebsocket.
7171
"""
7272

73-
def __init__( # pylint:disable=too-many-locals
73+
def __init__( # pylint:disable=too-many-locals
7474
self,
7575
endpoint: str,
7676
*,

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_management_link_async.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import time
88
import logging
9+
from uuid import uuid4
910
from functools import partial
1011
from typing import Optional, Union
1112

@@ -35,7 +36,6 @@ class ManagementLink(object): # pylint:disable=too-many-instance-attributes
3536
"""
3637

3738
def __init__(self, session, endpoint, **kwargs):
38-
self.next_message_id = 0
3939
self.state = ManagementLinkState.IDLE
4040
self._pending_operations = []
4141
self._session = session
@@ -228,17 +228,16 @@ async def execute_operation(
228228
message.application_properties["locales"] = locales
229229
try:
230230
# TODO: namedtuple is immutable, which may push us to re-think about the namedtuple approach for Message
231-
new_properties = message.properties._replace(message_id=self.next_message_id)
231+
new_properties = message.properties._replace(message_id=uuid4())
232232
except AttributeError:
233-
new_properties = Properties(message_id=self.next_message_id)
233+
new_properties = Properties(message_id=uuid4())
234234
message = message._replace(properties=new_properties)
235235
expire_time = (time.time() + timeout) if timeout else None
236236
message_delivery = _MessageDelivery(message, MessageDeliveryState.WaitingToBeSent, expire_time)
237237

238238
on_send_complete = partial(self._on_send_complete, message_delivery)
239239

240240
await self._request_link.send_transfer(message, on_send_complete=on_send_complete, timeout=timeout)
241-
self.next_message_id += 1
242241
self._pending_operations.append(PendingManagementOperation(message, on_execute_operation_complete))
243242

244243
async def close(self):

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/cbs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# -------------------------------------------------------------------------
66

77
import logging
8+
from uuid import uuid4
89
from datetime import datetime
910
from typing import Any, Optional, Tuple, Union
1011

@@ -88,7 +89,7 @@ def __init__(
8889
def _put_token(self, token: str, token_type: str, audience: str, expires_on: Optional[datetime] = None) -> None:
8990
message = Message( # type: ignore # TODO: missing positional args header, etc.
9091
value=token,
91-
properties=Properties(message_id=self._mgmt_link.next_message_id), # type: ignore
92+
properties=Properties(message_id=uuid4()), # type: ignore
9293
application_properties={
9394
CBS_NAME: audience,
9495
CBS_OPERATION: CBS_PUT_TOKEN,
@@ -103,7 +104,6 @@ def _put_token(self, token: str, token_type: str, audience: str, expires_on: Opt
103104
operation=CBS_PUT_TOKEN,
104105
type=token_type,
105106
)
106-
self._mgmt_link.next_message_id += 1
107107

108108
def _on_amqp_management_open_complete(self, management_open_result: ManagementOpenResult) -> None:
109109
if self.state in (CbsState.CLOSED, CbsState.ERROR):

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# --------------------------------------------------------------------------
1+
# -------------------------------------------------------------------------
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# Licensed under the MIT License. See License.txt in the project root for
44
# license information.

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/management_link.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import time
88
import logging
9+
from uuid import uuid4
910
from functools import partial
1011
from collections import namedtuple
1112
from typing import Optional, Union
@@ -37,7 +38,6 @@ class ManagementLink(object): # pylint:disable=too-many-instance-attributes
3738
"""
3839

3940
def __init__(self, session, endpoint, **kwargs):
40-
self.next_message_id = 0
4141
self.state = ManagementLinkState.IDLE
4242
self._pending_operations = []
4343
self._session = session
@@ -228,17 +228,16 @@ def execute_operation(
228228
message.application_properties["locales"] = locales
229229
try:
230230
# TODO: namedtuple is immutable, which may push us to re-think about the namedtuple approach for Message
231-
new_properties = message.properties._replace(message_id=self.next_message_id)
231+
new_properties = message.properties._replace(message_id=uuid4())
232232
except AttributeError:
233-
new_properties = Properties(message_id=self.next_message_id)
233+
new_properties = Properties(message_id=uuid4())
234234
message = message._replace(properties=new_properties)
235235
expire_time = (time.time() + timeout) if timeout else None
236236
message_delivery = _MessageDelivery(message, MessageDeliveryState.WaitingToBeSent, expire_time)
237237

238238
on_send_complete = partial(self._on_send_complete, message_delivery)
239239

240240
self._request_link.send_transfer(message, on_send_complete=on_send_complete, timeout=timeout)
241-
self.next_message_id += 1
242241
self._pending_operations.append(PendingManagementOperation(message, on_execute_operation_complete))
243242

244243
def close(self):

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_cbs_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import logging
88
from datetime import datetime
9+
from uuid import uuid4
910
from typing import Any, Optional, Union
1011

1112
from ..utils import utc_now, utc_from_timestamp
@@ -72,7 +73,7 @@ async def _put_token(
7273
) -> None:
7374
message = Message( # type: ignore # TODO: missing positional args header, etc.
7475
value=token,
75-
properties=Properties(message_id=self._mgmt_link.next_message_id), # type: ignore
76+
properties=Properties(message_id=uuid4()), # type: ignore
7677
application_properties={
7778
CBS_NAME: audience,
7879
CBS_OPERATION: CBS_PUT_TOKEN,
@@ -87,7 +88,6 @@ async def _put_token(
8788
operation=CBS_PUT_TOKEN,
8889
type=token_type,
8990
)
90-
self._mgmt_link.next_message_id += 1
9191

9292
async def _on_amqp_management_open_complete(self, management_open_result: ManagementOpenResult) -> None:
9393
if self.state in (CbsState.CLOSED, CbsState.ERROR):

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_management_link_async.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import time
88
import logging
9+
from uuid import uuid4
910
from functools import partial
1011
from typing import Optional, Union
1112

@@ -35,7 +36,6 @@ class ManagementLink(object): # pylint:disable=too-many-instance-attributes
3536
"""
3637

3738
def __init__(self, session, endpoint, **kwargs):
38-
self.next_message_id = 0
3939
self.state = ManagementLinkState.IDLE
4040
self._pending_operations = []
4141
self._session = session
@@ -228,17 +228,16 @@ async def execute_operation(
228228
message.application_properties["locales"] = locales
229229
try:
230230
# TODO: namedtuple is immutable, which may push us to re-think about the namedtuple approach for Message
231-
new_properties = message.properties._replace(message_id=self.next_message_id)
231+
new_properties = message.properties._replace(message_id=uuid4())
232232
except AttributeError:
233-
new_properties = Properties(message_id=self.next_message_id)
233+
new_properties = Properties(message_id=uuid4())
234234
message = message._replace(properties=new_properties)
235235
expire_time = (time.time() + timeout) if timeout else None
236236
message_delivery = _MessageDelivery(message, MessageDeliveryState.WaitingToBeSent, expire_time)
237237

238238
on_send_complete = partial(self._on_send_complete, message_delivery)
239239

240240
await self._request_link.send_transfer(message, on_send_complete=on_send_complete, timeout=timeout)
241-
self.next_message_id += 1
242241
self._pending_operations.append(PendingManagementOperation(message, on_execute_operation_complete))
243242

244243
async def close(self):

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/cbs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# -------------------------------------------------------------------------
66

77
import logging
8+
from uuid import uuid4
89
from datetime import datetime
910
from typing import Any, Optional, Tuple, Union
1011

@@ -88,7 +89,7 @@ def __init__(
8889
def _put_token(self, token: str, token_type: str, audience: str, expires_on: Optional[datetime] = None) -> None:
8990
message = Message( # type: ignore # TODO: missing positional args header, etc.
9091
value=token,
91-
properties=Properties(message_id=self._mgmt_link.next_message_id), # type: ignore
92+
properties=Properties(message_id=uuid4()), # type: ignore
9293
application_properties={
9394
CBS_NAME: audience,
9495
CBS_OPERATION: CBS_PUT_TOKEN,
@@ -103,7 +104,6 @@ def _put_token(self, token: str, token_type: str, audience: str, expires_on: Opt
103104
operation=CBS_PUT_TOKEN,
104105
type=token_type,
105106
)
106-
self._mgmt_link.next_message_id += 1
107107

108108
def _on_amqp_management_open_complete(self, management_open_result: ManagementOpenResult) -> None:
109109
if self.state in (CbsState.CLOSED, CbsState.ERROR):

0 commit comments

Comments
 (0)