Skip to content

Commit e3bb7f2

Browse files
[EventHub] Consumer stops sending link credits to service (Azure#32767)
* flow fix * remove prints * update sync * update pylint * tests * unittest transfer frame * another unittest * Update sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py Co-authored-by: Kashif Khan <[email protected]> * update transfer test * missing list entry * fix tests --------- Co-authored-by: Kashif Khan <[email protected]>
1 parent daf3eda commit e3bb7f2

File tree

7 files changed

+115
-4
lines changed

7 files changed

+115
-4
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ async def _client_run_async(self, **kwargs):
748748
:rtype: bool
749749
"""
750750
try:
751-
if self._link.current_link_credit == 0:
751+
if self._link.current_link_credit <= 0:
752752
await self._link.flow()
753753
await self._connection.listen(wait=self._socket_timeout, **kwargs)
754754
except ValueError:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_receiver_async.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,11 @@ async def _incoming_attach(self, frame):
5757
async def _incoming_transfer(self, frame):
5858
if self.network_trace:
5959
_LOGGER.debug("<- %r", TransferFrame(payload=b"***", *frame[:-1]), extra=self.network_trace_params)
60-
self.current_link_credit -= 1
6160
self.delivery_count += 1
6261
self.received_delivery_id = frame[1] # delivery_id
62+
# If more is false --> this is the last frame of the message
63+
if not frame[5]:
64+
self.current_link_credit -= 1
6365
if self.received_delivery_id is not None:
6466
self._first_frame = frame
6567
if not self.received_delivery_id and not self._received_payload:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ def _client_run(self, **kwargs):
851851
:rtype: bool
852852
"""
853853
try:
854-
if self._link.current_link_credit == 0:
854+
if self._link.current_link_credit <= 0:
855855
self._link.flow()
856856
self._connection.listen(wait=self._socket_timeout, **kwargs)
857857
except ValueError:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/receiver.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ def _incoming_attach(self, frame):
5454
def _incoming_transfer(self, frame):
5555
if self.network_trace:
5656
_LOGGER.debug("<- %r", TransferFrame(payload=b"***", *frame[:-1]), extra=self.network_trace_params)
57-
self.current_link_credit -= 1
57+
# If more is false --> this is the last frame of the message
58+
if not frame[5]:
59+
self.current_link_credit -= 1
5860
self.delivery_count += 1
5961
self.received_delivery_id = frame[1] # delivery_id
6062
if self.received_delivery_id is not None:

sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,3 +305,27 @@ async def on_event_batch(partition_context, event_batch):
305305
assert root_receive.children[1].links[1].headers['traceparent'] == traceparent2
306306

307307
settings.tracing_implementation.set_value(None)
308+
309+
310+
@pytest.mark.liveTest
311+
@pytest.mark.asyncio
312+
async def test_receive_batch_large_event_async(connstr_senders, uamqp_transport):
313+
connection_str, senders = connstr_senders
314+
senders[0].send(EventData("A" * 15700))
315+
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group='$default', uamqp_transport=uamqp_transport)
316+
317+
async def on_event(partition_context, event):
318+
assert partition_context.partition_id == "0"
319+
assert partition_context.consumer_group == "$default"
320+
assert partition_context.fully_qualified_namespace in connection_str
321+
assert partition_context.eventhub_name == senders[0]._client.eventhub_name
322+
on_event.received += 1
323+
assert client._event_processors[0]._consumers[0]._handler._link.current_link_credit == 1
324+
325+
on_event.received = 0
326+
async with client:
327+
task = asyncio.ensure_future(
328+
client.receive(on_event, partition_id="0", starting_position="-1", prefetch=2))
329+
await asyncio.sleep(10)
330+
assert on_event.received == 1
331+
await task

sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_consumer_client.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,3 +328,34 @@ def on_event_batch(partition_context, event_batch):
328328
assert root_receive.children[1].links[1].headers['traceparent'] == traceparent2
329329

330330
settings.tracing_implementation.set_value(None)
331+
332+
333+
@pytest.mark.liveTest
334+
def test_receive_batch_large_event(connstr_senders, uamqp_transport):
335+
connection_str, senders = connstr_senders
336+
senders[0].send(EventData("A" * 15700))
337+
client = EventHubConsumerClient.from_connection_string(
338+
connection_str, consumer_group='$default', uamqp_transport=uamqp_transport
339+
)
340+
341+
def on_event(partition_context, event):
342+
on_event.received += 1
343+
on_event.partition_id = partition_context.partition_id
344+
on_event.consumer_group = partition_context.consumer_group
345+
on_event.fully_qualified_namespace = partition_context.fully_qualified_namespace
346+
on_event.eventhub_name = partition_context.eventhub_name
347+
assert client._event_processors[0]._consumers[0]._handler._link.current_link_credit == 1
348+
349+
on_event.received = 0
350+
with client:
351+
worker = threading.Thread(target=client.receive_batch,
352+
args=(on_event,),
353+
kwargs={"starting_position": "-1",
354+
"partition_id": "0", "prefetch": 2})
355+
worker.start()
356+
time.sleep(10)
357+
assert on_event.received == 1
358+
assert on_event.partition_id == "0"
359+
assert on_event.consumer_group == "$default"
360+
assert on_event.fully_qualified_namespace in connection_str
361+
assert on_event.eventhub_name == senders[0]._client.eventhub_name

sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from unittest.mock import Mock
22
from azure.eventhub._pyamqp.link import Link
3+
from azure.eventhub._pyamqp.receiver import ReceiverLink
34
from azure.eventhub._pyamqp.constants import LinkState
45
import pytest
56

@@ -57,3 +58,54 @@ def test_link_should_not_detach(state):
5758
link._outgoing_detach = Mock(return_value=None)
5859
link.detach()
5960
link._outgoing_detach.assert_not_called()
61+
62+
def test_receive_transfer_frame_multiple():
63+
session = None
64+
link = ReceiverLink(
65+
session,
66+
3,
67+
source_address="test_source",
68+
target_address="test_target",
69+
network_trace=False,
70+
network_trace_params={},
71+
on_transfer=Mock(),
72+
)
73+
74+
link.current_link_credit = 2 # Set the link credit to 2
75+
76+
# frame: handle, delivery_id, delivery_tag, message_format, settled, more, rcv_settle_mode, state, resume, aborted, bathable, payload
77+
transfer_frame_one = [3, 0, b'/blah', 0, True, True, None, None, None, None, False, ""]
78+
transfer_frame_two = [3, None, b'/blah', 0, True, False, None, None, None, None, False, ""]
79+
80+
link._incoming_transfer(transfer_frame_one)
81+
assert link.current_link_credit == 2
82+
link._incoming_transfer(transfer_frame_two)
83+
assert link.current_link_credit == 1
84+
85+
def test_receive_transfer_continuation_frame():
86+
session = None
87+
link = ReceiverLink(
88+
session,
89+
3,
90+
source_address="test_source",
91+
target_address="test_target",
92+
network_trace=False,
93+
network_trace_params={},
94+
on_transfer=Mock(),
95+
)
96+
97+
link.current_link_credit = 3 # Set the link credit to 2
98+
99+
# frame: handle, delivery_id, delivery_tag, message_format, settled, more, rcv_settle_mode, state, resume, aborted, batchable, payload
100+
transfer_frame_one = [3, 0, b'/blah', 0, True, False, None, None, None, None, False, ""]
101+
transfer_frame_two = [3, 1, b'/blah', 0, True, True, None, None, None, None, False, ""]
102+
transfer_frame_three = [3, None, b'/blah', 0, True, False, None, None, None, None, False, ""]
103+
104+
105+
106+
link._incoming_transfer(transfer_frame_one)
107+
assert link.current_link_credit == 2
108+
link._incoming_transfer(transfer_frame_two)
109+
assert link.current_link_credit == 2
110+
link._incoming_transfer(transfer_frame_three)
111+
assert link.current_link_credit == 1

0 commit comments

Comments
 (0)