Skip to content

Commit 38b3cd4

Browse files
authored
[EH] on event tasks bound receive (#36502)
* update to limit prefetch * updates * pylint * use lock * refactor * pylint * make value a constant * add a mimic process test * test for buffer length * Update sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py * nit * dont use connstr * typo
1 parent f729bea commit 38b3cd4

File tree

3 files changed

+42
-3
lines changed

3 files changed

+42
-3
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
USER_AGENT_PREFIX = "azsdk-python-eventhubs"
4747
UAMQP_LIBRARY = "uamqp"
4848
PYAMQP_LIBRARY = "pyamqp"
49+
MAX_BUFFER_LENGTH = 300
4950

5051
NO_RETRY_ERRORS = [
5152
b"com.microsoft:argument-out-of-range",

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
EventDataSendError,
2323
OperationTimeoutError
2424
)
25+
from ..._constants import MAX_BUFFER_LENGTH
2526

2627

2728
if TYPE_CHECKING:
@@ -253,16 +254,24 @@ async def _callback_task(consumer, batch, max_batch_size, max_wait_time):
253254
await asyncio.sleep(0.05)
254255

255256
@staticmethod
256-
async def _receive_task(consumer):
257+
async def _receive_task(consumer, max_batch_size):
257258
# pylint:disable=protected-access
258259
max_retries = consumer._client._config.max_retries
259260
retried_times = 0
260261
running = True
261262
try:
262263
while retried_times <= max_retries and running and consumer._callback_task_run:
263264
try:
265+
# set a default value of consumer._prefetch for buffer length
266+
buff_length = MAX_BUFFER_LENGTH
264267
await consumer._open() # pylint: disable=protected-access
265-
running = await cast(ReceiveClientAsync, consumer._handler).do_work_async(batch=consumer._prefetch)
268+
async with consumer._message_buffer_lock:
269+
buff_length = len(consumer._message_buffer)
270+
if buff_length <= max_batch_size:
271+
running = await cast(ReceiveClientAsync, consumer._handler).do_work_async(
272+
batch=consumer._prefetch
273+
)
274+
await asyncio.sleep(0.05)
266275
except asyncio.CancelledError: # pylint: disable=try-except-raise
267276
raise
268277
except Exception as exception: # pylint: disable=broad-except
@@ -314,7 +323,7 @@ async def receive_messages_async(consumer, batch, max_batch_size, max_wait_time)
314323
callback_task = asyncio.create_task(
315324
PyamqpTransportAsync._callback_task(consumer, batch, max_batch_size, max_wait_time)
316325
)
317-
receive_task = asyncio.create_task(PyamqpTransportAsync._receive_task(consumer))
326+
receive_task = asyncio.create_task(PyamqpTransportAsync._receive_task(consumer, max_batch_size))
318327

319328
tasks = [callback_task, receive_task]
320329
try:

sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,3 +460,32 @@ async def on_event(partition_context, event):
460460
await asyncio.sleep(10)
461461
assert on_event.received == 1
462462
await task
463+
464+
@pytest.mark.liveTest
465+
@pytest.mark.asyncio
466+
async def test_receive_mimic_processing_async(auth_credential_senders_async, uamqp_transport):
467+
fully_qualified_namespace, eventhub_name, credential, senders = auth_credential_senders_async
468+
for i in range(305):
469+
senders[0].send(EventData("A"))
470+
client = EventHubConsumerClient(
471+
fully_qualified_namespace=fully_qualified_namespace,
472+
eventhub_name=eventhub_name,
473+
credential=credential(),
474+
consumer_group="$default",
475+
uamqp_transport=uamqp_transport
476+
)
477+
async def on_event(partition_context, event):
478+
assert partition_context.partition_id == "0"
479+
assert partition_context.consumer_group == "$default"
480+
assert partition_context.eventhub_name == senders[0]._client.eventhub_name
481+
on_event.received += 1
482+
# Mimic processing of event
483+
await asyncio.sleep(20)
484+
assert client._event_processors[0]._consumers[0]._message_buffer <=300
485+
486+
on_event.received = 0
487+
async with client:
488+
task = asyncio.ensure_future(
489+
client.receive(on_event, partition_id="0", starting_position="-1", prefetch=2))
490+
await asyncio.sleep(10)
491+
await task

0 commit comments

Comments
 (0)