Skip to content

Commit 70470de

Browse files
authored
[ServiceBus/EventHub] lock pending deliveries on send (Azure#38067)
* [ServiceBus/EventHub] lock pending deliveries on send * remove misc logging * changelog + test * fix tests, remove session lock * remove logging from test * sync with sb * add todo in sender.py tfor temporary fix
1 parent d17f44b commit 70470de

File tree

4 files changed

+99
-38
lines changed

4 files changed

+99
-38
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/sender.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import uuid
88
import logging
99
import time
10+
from threading import Lock
1011

1112
from ._encode import encode_payload
1213
from .link import Link
@@ -45,6 +46,7 @@ def __init__(self, session, handle, target_address, **kwargs):
4546
kwargs["source_address"] = "sender-link-{}".format(name)
4647
super(SenderLink, self).__init__(session, handle, name, role, target_address=target_address, **kwargs)
4748
self._pending_deliveries = []
49+
self.lock = Lock()
4850

4951
@classmethod
5052
def from_incoming_frame(cls, session, handle, frame):
@@ -139,21 +141,24 @@ def _on_session_state_change(self):
139141
super()._on_session_state_change()
140142

141143
def update_pending_deliveries(self):
142-
if self.current_link_credit <= 0:
143-
self.current_link_credit = self.link_credit
144-
self._outgoing_flow()
145-
now = time.time()
146-
pending = []
147-
for delivery in self._pending_deliveries:
148-
if delivery.timeout and (now - delivery.start) >= delivery.timeout:
149-
delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None)
150-
continue
151-
if not delivery.sent:
152-
sent_and_settled = self._outgoing_transfer(delivery)
153-
if sent_and_settled:
144+
# TODO: Temporary fix until connection.listen removed from keep alive thread.
145+
with self.lock:
146+
if self.current_link_credit <= 0:
147+
self.current_link_credit = self.link_credit
148+
self._outgoing_flow()
149+
now = time.time()
150+
pending = []
151+
152+
for delivery in self._pending_deliveries:
153+
if delivery.timeout and (now - delivery.start) >= delivery.timeout:
154+
delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None)
154155
continue
155-
pending.append(delivery)
156-
self._pending_deliveries = pending
156+
if not delivery.sent:
157+
sent_and_settled = self._outgoing_transfer(delivery)
158+
if sent_and_settled:
159+
continue
160+
pending.append(delivery)
161+
self._pending_deliveries = pending
157162

158163
def send_transfer(self, message, *, send_async=False, **kwargs):
159164
self._check_if_closed()

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
### Bugs Fixed
1111

12+
- Fixed a bug where sending large messages with synchronous client caused a frame buffer offset error ([#37916](https://github.com/Azure/azure-sdk-for-python/issues/37916))
13+
1214
### Other Changes
1315

1416
## 7.13.0 (2024-11-12)

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/sender.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import uuid
88
import logging
99
import time
10+
from threading import Lock
1011

1112
from ._encode import encode_payload
1213
from .link import Link
@@ -45,6 +46,7 @@ def __init__(self, session, handle, target_address, **kwargs):
4546
kwargs["source_address"] = "sender-link-{}".format(name)
4647
super(SenderLink, self).__init__(session, handle, name, role, target_address=target_address, **kwargs)
4748
self._pending_deliveries = []
49+
self.lock = Lock()
4850

4951
@classmethod
5052
def from_incoming_frame(cls, session, handle, frame):
@@ -139,21 +141,24 @@ def _on_session_state_change(self):
139141
super()._on_session_state_change()
140142

141143
def update_pending_deliveries(self):
142-
if self.current_link_credit <= 0:
143-
self.current_link_credit = self.link_credit
144-
self._outgoing_flow()
145-
now = time.time()
146-
pending = []
147-
for delivery in self._pending_deliveries:
148-
if delivery.timeout and (now - delivery.start) >= delivery.timeout:
149-
delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None)
150-
continue
151-
if not delivery.sent:
152-
sent_and_settled = self._outgoing_transfer(delivery)
153-
if sent_and_settled:
144+
# TODO: Temporary fix until connection.listen removed from keep alive thread.
145+
with self.lock:
146+
if self.current_link_credit <= 0:
147+
self.current_link_credit = self.link_credit
148+
self._outgoing_flow()
149+
now = time.time()
150+
pending = []
151+
152+
for delivery in self._pending_deliveries:
153+
if delivery.timeout and (now - delivery.start) >= delivery.timeout:
154+
delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None)
154155
continue
155-
pending.append(delivery)
156-
self._pending_deliveries = pending
156+
if not delivery.sent:
157+
sent_and_settled = self._outgoing_transfer(delivery)
158+
if sent_and_settled:
159+
continue
160+
pending.append(delivery)
161+
self._pending_deliveries = pending
157162

158163
def send_transfer(self, message, *, send_async=False, **kwargs):
159164
self._check_if_closed()

sdk/servicebus/azure-servicebus/tests/test_topic.py

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66

77
import logging
88
import pytest
9+
import time
10+
import json
11+
import sys
912

1013
from devtools_testutils import AzureMgmtRecordedTestCase, RandomNameResourceGroupPreparer, get_credential
1114

@@ -36,7 +39,7 @@ class TestServiceBusTopics(AzureMgmtRecordedTestCase):
3639
@CachedServiceBusTopicPreparer(name_prefix="servicebustest")
3740
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
3841
@ArgPasser()
39-
def test_topic_by_servicebus_client_conn_str_send_basic(
42+
def test_topic_by_servicebus_client_send_basic(
4043
self, uamqp_transport, *, servicebus_namespace=None, servicebus_topic=None, **kwargs
4144
):
4245
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
@@ -58,15 +61,15 @@ def test_topic_by_servicebus_client_conn_str_send_basic(
5861
@CachedServiceBusTopicPreparer(name_prefix="servicebustest")
5962
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
6063
@ArgPasser()
61-
def test_topic_by_sas_token_credential_conn_str_send_basic(
62-
self,
63-
uamqp_transport,
64-
*,
65-
servicebus_namespace=None,
66-
servicebus_namespace_key_name=None,
67-
servicebus_namespace_primary_key=None,
68-
servicebus_topic=None,
69-
**kwargs,
64+
def test_topic_by_sas_token_credential_send_basic(
65+
self,
66+
uamqp_transport,
67+
*,
68+
servicebus_namespace=None,
69+
servicebus_namespace_key_name=None,
70+
servicebus_namespace_primary_key=None,
71+
servicebus_topic=None,
72+
**kwargs
7073
):
7174
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
7275
with ServiceBusClient(
@@ -111,3 +114,49 @@ def test_topic_by_servicebus_client_list_topics(
111114
topics = client.list_topics()
112115
assert len(topics) >= 1
113116
# assert all(isinstance(t, TopicClient) for t in topics)
117+
118+
@pytest.mark.liveTest
119+
@pytest.mark.live_test_only
120+
@CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
121+
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
122+
@CachedServiceBusTopicPreparer(name_prefix='servicebustest')
123+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
124+
@ArgPasser()
125+
def test_topic_by_servicebus_client_send_large_messages_w_sleep(self, uamqp_transport, *, servicebus_namespace=None, servicebus_topic=None, **kwargs):
126+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
127+
credential = get_credential()
128+
129+
# message of 100 kb - requires multiple transfer frames
130+
size = 100
131+
large_dict = {
132+
"key": "A" * 1024
133+
}
134+
for i in range(size):
135+
large_dict[f"key_{i}"] = "A" * 1024
136+
body = json.dumps(large_dict)
137+
138+
sb_client = ServiceBusClient(
139+
fully_qualified_namespace=fully_qualified_namespace,
140+
credential=credential,
141+
logging_enable=True,
142+
uamqp_transport=uamqp_transport
143+
)
144+
145+
# This issue doesn't repro unless logging is added here w/ this socket timeout,
146+
# seemingly due to slowing down and some threading behavior.
147+
# Adding in the logging here to make sure this bug is being hit and tested.
148+
sender = sb_client.get_topic_sender(servicebus_topic.name, socket_timeout=60)
149+
for i in range(10):
150+
try:
151+
time.sleep(10)
152+
logging.info("sender created for %d", i)
153+
size_in_bytes = sys.getsizeof(body)
154+
155+
# Convert bytes to kilobytes (KB)
156+
size_in_kb = size_in_bytes / 1024
157+
logging.info(f"size of body: {size_in_kb:.2f} KB")
158+
sender.send_messages(ServiceBusMessage(body))
159+
logging.info(f"Message sent %d successfully", i)
160+
except Exception as e:
161+
logging.error(f"Error sending message %d: %s", i, str(e))
162+
raise

0 commit comments

Comments
 (0)