Skip to content

Commit 125fa85

Browse files
authored
[EH] Keep alive fixes on producer (#33788)
* start adding through keep alive * keep alive changes * add tests * consumer async * update tests * update test * adding more tests * add idle timeout test * tests * update test to use reconnect format * async transport
1 parent e9a5021 commit 125fa85

File tree

7 files changed

+117
-3
lines changed

7 files changed

+117
-3
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,6 @@ def __init__(
311311
self._credential = EventhubAzureNamedKeyTokenCredential(credential) # type: ignore
312312
else:
313313
self._credential = credential # type: ignore
314-
self._keep_alive = kwargs.get("keep_alive", 30)
315314
self._auto_reconnect = kwargs.get("auto_reconnect", True)
316315
self._auth_uri = f"sb://{self._address.hostname}{self._address.path}"
317316
self._config = Configuration(

sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def __init__(
8383
event_position = kwargs.get("event_position", None)
8484
prefetch = kwargs.get("prefetch", 300)
8585
owner_level = kwargs.get("owner_level", None)
86-
keep_alive = kwargs.get("keep_alive", None)
86+
keep_alive = kwargs.get("keep_alive", 30)
8787
auto_reconnect = kwargs.get("auto_reconnect", True)
8888
track_last_enqueued_event_properties = kwargs.get(
8989
"track_last_enqueued_event_properties", False

sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ def __init__(
197197
network_tracing=kwargs.get("logging_enable"),
198198
**kwargs
199199
)
200+
201+
self._keep_alive = kwargs.get("keep_alive", None)
202+
200203
self._producers: Dict[str, Optional[EventHubProducer]] = {
201204
ALL_PARTITIONS: self._create_producer()
202205
}
@@ -380,6 +383,7 @@ def _create_producer(
380383
send_timeout=send_timeout,
381384
idle_timeout=self._idle_timeout,
382385
amqp_transport=self._amqp_transport,
386+
keep_alive=self._keep_alive,
383387
)
384388
return handler
385389

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def __init__(self, client: "EventHubConsumerClient", source: str, **kwargs: Any)
8282
event_position = kwargs.get("event_position", None)
8383
prefetch = kwargs.get("prefetch", 300)
8484
owner_level = kwargs.get("owner_level", None)
85-
keep_alive = kwargs.get("keep_alive", None)
85+
keep_alive = kwargs.get("keep_alive", 30)
8686
auto_reconnect = kwargs.get("auto_reconnect", True)
8787
track_last_enqueued_event_properties = kwargs.get(
8888
"track_last_enqueued_event_properties", False

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ def __init__(
184184
network_tracing=kwargs.pop("logging_enable", False),
185185
**kwargs
186186
)
187+
self._keep_alive = kwargs.get("keep_alive", None)
187188
self._producers: Dict[str, Optional[EventHubProducer]] = {
188189
ALL_PARTITIONS: self._create_producer()
189190
}
@@ -365,6 +366,7 @@ def _create_producer(
365366
send_timeout=send_timeout,
366367
idle_timeout=self._idle_timeout,
367368
amqp_transport = self._amqp_transport,
369+
keep_alive = self._keep_alive,
368370
**self._internal_kwargs
369371
)
370372
return handler

sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_send_async.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@
3434
from azure.eventhub._pyamqp.authentication import SASTokenAuth
3535
from azure.eventhub._pyamqp.client import ReceiveClient
3636
from azure.eventhub._pyamqp.error import AMQPConnectionError
37+
from azure.eventhub._utils import transform_outbound_single_message
38+
try:
39+
import uamqp
40+
from uamqp import compat
41+
from azure.eventhub.aio._transport._uamqp_transport_async import UamqpTransportAsync as UamqpTransport
42+
except (ModuleNotFoundError, ImportError):
43+
uamqp = None
44+
UamqpTransport = None
45+
46+
from azure.eventhub.aio._transport._pyamqp_transport_async import PyamqpTransportAsync as PyamqpTransport
3747

3848

3949
@pytest.mark.liveTest
@@ -529,3 +539,49 @@ async def on_success(events, pid):
529539
assert sent_events[-1][1] == "0"
530540

531541
assert not on_error.err
542+
543+
@pytest.mark.parametrize("keep_alive", [None, 30, 60])
544+
@pytest.mark.liveTest
545+
@pytest.mark.asyncio
546+
def test_send_with_keep_alive_async(connstr_receivers, keep_alive, uamqp_transport):
547+
connection_str, receivers = connstr_receivers
548+
client = EventHubProducerClient.from_connection_string(connection_str, keep_alive=keep_alive, uamqp_transport=uamqp_transport)
549+
assert client._producers["all-partitions"]._keep_alive == keep_alive
550+
551+
552+
@pytest.mark.parametrize("keep_alive", [None, 5, 30])
553+
@pytest.mark.liveTest
554+
@pytest.mark.asyncio
555+
async def test_send_long_wait_idle_timeout(connstr_receivers, keep_alive, uamqp_transport):
556+
if uamqp_transport:
557+
amqp_transport = UamqpTransport
558+
retry_total = 3
559+
else:
560+
amqp_transport = PyamqpTransport
561+
retry_total = 0
562+
connection_str, receivers = connstr_receivers
563+
client = EventHubProducerClient.from_connection_string(connection_str, keep_alive=keep_alive, idle_timeout=10, retry_total=retry_total, uamqp_transport=uamqp_transport)
564+
sender = await client._create_producer(partition_id="0")
565+
async with sender:
566+
await sender._open_with_retry()
567+
ed = EventData('data')
568+
ed = transform_outbound_single_message(ed, EventData, amqp_transport.to_outgoing_amqp_message)
569+
sender._unsent_events = [ed._message]
570+
# hit idle timeout error
571+
await asyncio.sleep(11)
572+
573+
if uamqp_transport:
574+
sender._unsent_events[0].on_send_complete = sender._on_outcome
575+
if keep_alive !=5:
576+
with pytest.raises((uamqp.errors.ConnectionClose,
577+
uamqp.errors.MessageHandlerError, OperationTimeoutError)):
578+
await sender._send_event_data()
579+
else:
580+
await sender._send_event_data()
581+
582+
if not uamqp_transport:
583+
if keep_alive == 5:
584+
await sender._send_event_data()
585+
else:
586+
with pytest.raises(AMQPConnectionError):
587+
await sender._send_event_data()

sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_send.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@
3434
from azure.eventhub._pyamqp.authentication import SASTokenAuth
3535
from azure.eventhub._pyamqp import ReceiveClient
3636
from azure.eventhub._pyamqp.error import AMQPConnectionError
37+
from azure.eventhub._utils import transform_outbound_single_message
38+
try:
39+
import uamqp
40+
from uamqp import compat
41+
from azure.eventhub._transport._uamqp_transport import UamqpTransport
42+
except (ModuleNotFoundError, ImportError):
43+
uamqp = None
44+
UamqpTransport = None
45+
46+
from azure.eventhub._transport._pyamqp_transport import PyamqpTransport
3747

3848

3949
@pytest.mark.liveTest
@@ -557,3 +567,46 @@ def on_success(events, pid):
557567
assert sent_events[-1][1] == "0"
558568

559569
assert not on_error.err
570+
571+
@pytest.mark.parametrize("keep_alive", [None, 30, 60])
572+
@pytest.mark.liveTest
573+
def test_send_with_keep_alive(connstr_receivers, keep_alive, uamqp_transport):
574+
connection_str, receivers = connstr_receivers
575+
client = EventHubProducerClient.from_connection_string(connection_str, keep_alive=keep_alive, uamqp_transport=uamqp_transport)
576+
assert client._producers["all-partitions"]._keep_alive == keep_alive
577+
578+
@pytest.mark.parametrize("keep_alive", [None, 5, 30])
579+
@pytest.mark.liveTest
580+
def test_send_long_wait_idle_timeout(connstr_receivers, keep_alive, uamqp_transport):
581+
if uamqp_transport:
582+
amqp_transport = UamqpTransport
583+
retry_total = 3
584+
else:
585+
amqp_transport = PyamqpTransport
586+
retry_total = 0
587+
connection_str, receivers = connstr_receivers
588+
client = EventHubProducerClient.from_connection_string(connection_str, keep_alive=keep_alive, idle_timeout=10, retry_total=retry_total, uamqp_transport=uamqp_transport)
589+
sender = client._create_producer(partition_id="0")
590+
with sender:
591+
sender._open_with_retry()
592+
ed = EventData('data')
593+
ed = transform_outbound_single_message(ed, EventData, amqp_transport.to_outgoing_amqp_message)
594+
sender._unsent_events = [ed._message]
595+
# hit idle timeout error
596+
time.sleep(11)
597+
598+
if uamqp_transport:
599+
sender._unsent_events[0].on_send_complete = sender._on_outcome
600+
if keep_alive !=5:
601+
with pytest.raises((uamqp.errors.ConnectionClose,
602+
uamqp.errors.MessageHandlerError, OperationTimeoutError)):
603+
sender._send_event_data()
604+
else:
605+
sender._send_event_data()
606+
607+
if not uamqp_transport:
608+
if keep_alive == 5:
609+
sender._send_event_data()
610+
else:
611+
with pytest.raises(AMQPConnectionError):
612+
sender._send_event_data()

0 commit comments

Comments
 (0)