Skip to content

Commit 70fc7d5

Browse files
authored
[SB] Missing Link Credit Subtraction (#37427)
* move back down but subtract link credit * date * nit * link credit math * Update sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/receiver.py * await * comments * pylint * eventhub + mock test * pin dev req * Update sdk/servicebus/azure-servicebus/CHANGELOG.md * mock only outgoing_flow
1 parent 4c47dae commit 70fc7d5

File tree

18 files changed

+137
-35
lines changed

18 files changed

+137
-35
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ async def _keep_alive_async(self):
152152
elapsed_time = current_time - start_time
153153
if elapsed_time >= self._keep_alive_interval:
154154
await asyncio.shield(self._connection.listen(wait=self._socket_timeout,
155-
batch=self._link.current_link_credit))
155+
batch=self._link.total_link_credit))
156156
start_time = current_time
157157
await asyncio.sleep(1)
158158
except Exception as e: # pylint: disable=broad-except
@@ -759,7 +759,7 @@ async def _client_run_async(self, **kwargs):
759759
:rtype: bool
760760
"""
761761
try:
762-
if self._link.current_link_credit <= 0:
762+
if self._link.total_link_credit <= 0:
763763
await self._link.flow(link_credit=self._link_credit)
764764
await self._connection.listen(wait=self._socket_timeout, **kwargs)
765765
except ValueError:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_link_async.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ def __init__(
101101
self._on_link_state_change = kwargs.get("on_link_state_change")
102102
self._on_attach = kwargs.get("on_attach")
103103
self._error: Optional[AMQPLinkError] = None
104+
self.total_link_credit = self.link_credit
104105

105106
async def __aenter__(self) -> "Link":
106107
await self.attach()
@@ -273,5 +274,19 @@ async def detach(self, close: bool = False, error: Optional[AMQPError] = None) -
273274
await self._set_state(LinkState.DETACHED)
274275

275276
async def flow(self, *, link_credit: Optional[int] = None, **kwargs) -> None:
276-
self.current_link_credit = link_credit if link_credit is not None else self.link_credit
277-
await self._outgoing_flow(**kwargs)
277+
# Given the desired link credit `link_credit`, the link credit sent via
278+
# FlowFrame is calculated as follows: The link credit to flow on the wire
279+
# `self.current_link_credit` is the desired link credit `link_credit`
280+
# minus the current link credit on the wire `self.total_link_credit`.
281+
self.current_link_credit = link_credit - self.total_link_credit \
282+
if link_credit is not None else self.link_credit
283+
284+
# If the link credit to flow is greater than 0 (i.e the desired link credit
285+
# is greater than the current link credit on the wire), then we will send a
286+
# flow to issue more link credit. Otherwise link credit on the wire is sufficient.
287+
if self.current_link_credit > 0:
288+
# Calculate the total link credit on the wire, by adding the credit
289+
# we will flow to the total link credit.
290+
self.total_link_credit = self.current_link_credit + self.total_link_credit \
291+
if link_credit is not None else self.link_credit
292+
await self._outgoing_flow(**kwargs)

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_receiver_async.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ async def _incoming_transfer(self, frame):
6262
self.received_delivery_id = frame[1] # delivery_id
6363
# If more is false --> this is the last frame of the message
6464
if not frame[5]:
65-
self.current_link_credit -= 1
6665
self.delivery_count += 1
66+
self.current_link_credit -= 1
67+
self.total_link_credit -= 1
6768
if self.received_delivery_id is not None:
6869
self._first_frame = frame
6970
if not self.received_delivery_id and not self._received_payload:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ def _keep_alive(self):
239239
current_time = time.time()
240240
elapsed_time = current_time - start_time
241241
if elapsed_time >= self._keep_alive_interval:
242-
self._connection.listen(wait=self._socket_timeout, batch=self._link.current_link_credit)
242+
self._connection.listen(wait=self._socket_timeout, batch=self._link.total_link_credit)
243243
start_time = current_time
244244
time.sleep(1)
245245
except Exception as e: # pylint: disable=broad-except
@@ -866,7 +866,7 @@ def _client_run(self, **kwargs):
866866
:rtype: bool
867867
"""
868868
try:
869-
if self._link.current_link_credit <= 0:
869+
if self._link.total_link_credit <= 0:
870870
self._link.flow(link_credit=self._link_credit)
871871
self._connection.listen(wait=self._socket_timeout, **kwargs)
872872
except ValueError:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/link.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ def __init__(
9797
self._on_link_state_change = kwargs.get("on_link_state_change")
9898
self._on_attach = kwargs.get("on_attach")
9999
self._error: Optional[AMQPLinkError] = None
100+
self.total_link_credit = self.link_credit
100101

101102
def __enter__(self) -> "Link":
102103
self.attach()
@@ -268,5 +269,18 @@ def detach(self, close: bool = False, error: Optional[AMQPError] = None) -> None
268269
self._set_state(LinkState.DETACHED)
269270

270271
def flow(self, *, link_credit: Optional[int] = None, **kwargs: Any) -> None:
271-
self.current_link_credit = link_credit if link_credit is not None else self.link_credit
272-
self._outgoing_flow(**kwargs)
272+
# Given the desired link credit `link_credit`, the link credit sent via
273+
# FlowFrame is calculated as follows: The link credit to flow on the wire
274+
# `self.current_link_credit` is the desired link credit
275+
# `link_credit` minus the current link credit on the wire `self.total_link_credit`.
276+
self.current_link_credit = link_credit - self.total_link_credit if link_credit is not None \
277+
else self.link_credit
278+
279+
# If the link credit to flow is greater than 0 (i.e the desired link credit is greater than
280+
# the current link credit on the wire), then we will send a flow to issue more link credit.
281+
# Otherwise link credit on the wire is sufficient.
282+
if self.current_link_credit > 0:
283+
# Calculate the total link credit on the wire, by adding the credit we will flow to the total link credit.
284+
self.total_link_credit = self.current_link_credit + self.total_link_credit if link_credit is not None \
285+
else self.link_credit
286+
self._outgoing_flow(**kwargs)

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/receiver.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def _incoming_transfer(self, frame):
6060
# If more is false --> this is the last frame of the message
6161
if not frame[5]:
6262
self.current_link_credit -= 1
63+
self.total_link_credit -=1
6364
self.delivery_count += 1
6465
self.received_delivery_id = frame[1] # delivery_id
6566
if self.received_delivery_id is not None:

sdk/eventhub/azure-eventhub/dev_requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
-e ../../../tools/azure-sdk-tools
22
../../core/azure-core
3-
-e ../../identity/azure-identity
3+
azure-identity~=1.17.0
44
azure-mgmt-eventhub<=10.1.0
55
azure-mgmt-resource==20.0.0
66
aiohttp; python_version < '3.12'

sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,45 @@ def test_receive_transfer_continuation_frame():
111111
assert link.delivery_count == 1
112112
link._incoming_transfer(transfer_frame_three)
113113
assert link.current_link_credit == 1
114-
assert link.delivery_count == 2
114+
assert link.delivery_count == 2
115+
116+
117+
def test_receive_transfer_and_flow():
118+
def mock_outgoing(): pass
119+
session = None
120+
link = ReceiverLink(
121+
session,
122+
3,
123+
source_address="test_source",
124+
target_address="test_target",
125+
network_trace=False,
126+
network_trace_params={},
127+
on_transfer=Mock(),
128+
)
129+
130+
link._outgoing_flow = mock_outgoing
131+
link.total_link_credit = 0 # Set the total link credit to 0 to start, no credit on the wire
132+
133+
link.flow(link_credit=100) # Send a flow frame with desired link credit of 100
134+
135+
# frame: handle, delivery_id, delivery_tag, message_format, settled, more, rcv_settle_mode, state, resume, aborted, batchable, payload
136+
transfer_frame_one = [3, 0, b'/blah', 0, True, False, None, None, None, None, False, ""]
137+
transfer_frame_two = [3, 1, b'/blah', 0, True, False, None, None, None, None, False, ""]
138+
transfer_frame_three = [3, 2, b'/blah', 0, True, False, None, None, None, None, False, ""]
139+
140+
link._incoming_transfer(transfer_frame_one)
141+
assert link.current_link_credit == 99
142+
assert link.total_link_credit == 99
143+
144+
# Only received 1 transfer frame per receive call, we set desired link credit again
145+
# this will send a flow of 1
146+
link.flow(link_credit=100)
147+
assert link.current_link_credit == 1
148+
assert link.total_link_credit == 100
149+
150+
link._incoming_transfer(transfer_frame_two)
151+
assert link.current_link_credit == 0
152+
assert link.total_link_credit == 99
153+
link._incoming_transfer(transfer_frame_three)
154+
assert link.current_link_credit == -1
155+
assert link.total_link_credit == 98

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Release History
22

3-
## 7.12.3 (2024-09-17)
3+
## 7.12.3 (2024-09-19)
44

55
### Bugs Fixed
66

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_client_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ async def _keep_alive_async(self):
152152
elapsed_time = current_time - start_time
153153
if elapsed_time >= self._keep_alive_interval:
154154
await asyncio.shield(self._connection.listen(wait=self._socket_timeout,
155-
batch=self._link.current_link_credit))
155+
batch=self._link.total_link_credit))
156156
start_time = current_time
157157
await asyncio.sleep(1)
158158
except Exception as e: # pylint: disable=broad-except
@@ -759,7 +759,7 @@ async def _client_run_async(self, **kwargs):
759759
:rtype: bool
760760
"""
761761
try:
762-
if self._link.current_link_credit <= 0:
762+
if self._link.total_link_credit <= 0:
763763
await self._link.flow(link_credit=self._link_credit)
764764
await self._connection.listen(wait=self._socket_timeout, **kwargs)
765765
except ValueError:

0 commit comments

Comments
 (0)