Skip to content

Commit 2f49bd6

Browse files
authored
[SB] settled as false (Azure#30607)
* deliveryid * test * assert message strung together correctly * add settlement fix * changing fix location * fix tests * update to just receive to clear out queue * complete messages
1 parent 4f28010 commit 2f49bd6

File tree

4 files changed

+55
-102
lines changed

4 files changed

+55
-102
lines changed

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_receiver_async.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,5 @@ async def send_disposition(
124124
if self._is_closed:
125125
raise ValueError("Link already closed.")
126126
await self._outgoing_disposition(first_delivery_id, last_delivery_id, settled, delivery_state, batchable)
127-
await self._wait_for_response(wait)
127+
if not settled:
128+
await self._wait_for_response(wait)

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/receiver.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,5 @@ def send_disposition(
121121
if self._is_closed:
122122
raise ValueError("Link already closed.")
123123
self._outgoing_disposition(first_delivery_id, last_delivery_id, settled, delivery_state, batchable)
124-
self._wait_for_response(wait)
124+
if not settled:
125+
self._wait_for_response(wait)

sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -981,6 +981,11 @@ async def test_async_queue_by_servicebus_client_session_fail(self, uamqp_transpo
981981

982982
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
983983
await sender.send_messages(ServiceBusMessage("test session sender", session_id="test"))
984+
985+
async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
986+
messages = await receiver.receive_messages()
987+
for message in messages:
988+
await receiver.complete_message(message)
984989

985990
@pytest.mark.asyncio
986991
@pytest.mark.liveTest
@@ -2083,57 +2088,27 @@ async def _hack_amqp_sender_run_async(self, **kwargs):
20832088
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
20842089
@ArgPasserAsync()
20852090
async def test_async_queue_send_large_message_receive(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs):
2086-
async def _hack_amqp_sender_run_async(self, **kwargs):
2087-
time.sleep(6) # sleep until timeout
2088-
if uamqp_transport:
2089-
await self.message_handler.work_async()
2090-
self._waiting_messages = 0
2091-
self._pending_messages = self._filter_pending()
2092-
if self._backoff and not self._waiting_messages:
2093-
_logger.info("Client told to backoff - sleeping for %r seconds", self._backoff)
2094-
await self._connection.sleep_async(self._backoff)
2095-
self._backoff = 0
2096-
await self._connection.work_async()
2097-
else:
2098-
try:
2099-
await self._link.update_pending_deliveries()
2100-
await self._connection.listen(wait=self._socket_timeout, **kwargs)
2101-
except ValueError:
2102-
self._shutdown = True
2103-
return False
2104-
return True
2105-
21062091
async with ServiceBusClient.from_connection_string(
2107-
servicebus_namespace_connection_string, logging_enable=False, uamqp_transport=uamqp_transport) as sb_client:
2108-
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
2109-
# this one doesn't need to reset the method, as it's hacking the method on the instance
2110-
sender._handler._client_run_async = types.MethodType(_hack_amqp_sender_run_async, sender._handler)
2111-
with pytest.raises(OperationTimeoutError):
2112-
await sender.send_messages(ServiceBusMessage("body"), timeout=5)
2113-
2114-
if not uamqp_transport:
2115-
# Amqp
2116-
async with ServiceBusClient.from_connection_string(
2117-
servicebus_namespace_connection_string,
2118-
uamqp_transport=uamqp_transport
2119-
) as sb_client:
2120-
async with sb_client.get_queue_sender(servicebus_queue.name, socket_timeout=1.0) as sender:
2121-
payload = "A" * 250 * 1024
2122-
await sender.send_messages(ServiceBusMessage(payload))
2092+
servicebus_namespace_connection_string,
2093+
uamqp_transport=uamqp_transport
2094+
) as sb_client:
2095+
async with sb_client.get_queue_sender(servicebus_queue.name, socket_timeout=1.0) as sender:
2096+
payload = "A" * 250 * 1024
2097+
await sender.send_messages(ServiceBusMessage(payload))
21232098

2124-
if uamqp:
2125-
transport_type = uamqp.constants.TransportType.AmqpOverWebsocket
2126-
else:
2127-
transport_type = TransportType.AmqpOverWebsocket
2128-
# AmqpOverWebsocket
2129-
async with ServiceBusClient.from_connection_string(
2130-
servicebus_namespace_connection_string,
2131-
transport_type=transport_type,
2132-
uamqp_transport=uamqp_transport
2133-
) as sb_client:
2134-
async with sb_client.get_queue_sender(servicebus_queue.name, socket_timeout=1.2) as sender:
2135-
payload = "A" * 250 * 1024
2136-
await sender.send_messages(ServiceBusMessage(payload))
2099+
if uamqp:
2100+
transport_type = uamqp.constants.TransportType.AmqpOverWebsocket
2101+
else:
2102+
transport_type = TransportType.AmqpOverWebsocket
2103+
# AmqpOverWebsocket
2104+
async with ServiceBusClient.from_connection_string(
2105+
servicebus_namespace_connection_string,
2106+
transport_type=transport_type,
2107+
uamqp_transport=uamqp_transport
2108+
) as sb_client:
2109+
async with sb_client.get_queue_sender(servicebus_queue.name, socket_timeout=1.2) as sender:
2110+
payload = "A" * 250 * 1024
2111+
await sender.send_messages(ServiceBusMessage(payload))
21372112

21382113
# ReceiveMessages
21392114
async with ServiceBusClient.from_connection_string(
@@ -2144,7 +2119,7 @@ async def _hack_amqp_sender_run_async(self, **kwargs):
21442119
if not uamqp_transport:
21452120
assert message._delivery_id is not None
21462121
assert message._message.data[0].decode("utf-8") == "A" * 250 * 1024
2147-
receiver.complete_message(message) # complete messages
2122+
await receiver.complete_message(message) # complete messages
21482123

21492124
@pytest.mark.asyncio
21502125
@pytest.mark.liveTest
@@ -2723,7 +2698,7 @@ async def test_queue_async_send_amqp_annotated_message(self, uamqp_transport, *,
27232698
assert raw_amqp_message.annotations[b'ann_key'] == b'ann_value'
27242699
assert raw_amqp_message.application_properties[b'body_type'] == b'value'
27252700
recv_value_msg += 1
2726-
receiver.complete_message(message)
2701+
await receiver.complete_message(message)
27272702

27282703
assert recv_data_msg == 3
27292704
assert recv_sequence_msg == 3

sdk/servicebus/azure-servicebus/tests/test_queues.py

Lines changed: 25 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,6 +1070,11 @@ def test_queue_by_servicebus_client_session_fail(self, uamqp_transport, *, servi
10701070

10711071
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
10721072
sender.send_messages(ServiceBusMessage("test session sender", session_id="test"))
1073+
1074+
with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
1075+
messages = receiver.receive_messages()
1076+
for message in messages:
1077+
receiver.complete_message(message)
10731078

10741079

10751080
@pytest.mark.liveTest
@@ -2498,57 +2503,28 @@ def _hack_amqp_sender_run(self, **kwargs):
24982503
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
24992504
@ArgPasser()
25002505
def test_queue_send_large_message_receive(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs):
2501-
def _hack_amqp_sender_run(self, **kwargs):
2502-
time.sleep(6) # sleep until timeout
2503-
if uamqp_transport:
2504-
self.message_handler.work()
2505-
self._waiting_messages = 0
2506-
self._pending_messages = self._filter_pending()
2507-
if self._backoff and not self._waiting_messages:
2508-
_logger.info("Client told to backoff - sleeping for %r seconds", self._backoff)
2509-
self._connection.sleep(self._backoff)
2510-
self._backoff = 0
2511-
self._connection.work()
2512-
else:
2513-
try:
2514-
self._link.update_pending_deliveries()
2515-
self._connection.listen(wait=self._socket_timeout, **kwargs)
2516-
except ValueError:
2517-
self._shutdown = True
2518-
return False
2519-
return True
2520-
2506+
# Amqp
25212507
with ServiceBusClient.from_connection_string(
2522-
servicebus_namespace_connection_string, logging_enable=False, uamqp_transport=uamqp_transport) as sb_client:
2523-
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
2524-
# this one doesn't need to reset the method, as it's hacking the method on the instance
2525-
sender._handler._client_run = types.MethodType(_hack_amqp_sender_run, sender._handler)
2526-
with pytest.raises(OperationTimeoutError):
2527-
sender.send_messages(ServiceBusMessage("body"), timeout=5)
2528-
2529-
if not uamqp_transport:
2530-
# Amqp
2531-
with ServiceBusClient.from_connection_string(
2532-
servicebus_namespace_connection_string,
2533-
uamqp_transport=uamqp_transport
2534-
) as sb_client:
2535-
with sb_client.get_queue_sender(servicebus_queue.name, socket_timeout=0.6) as sender:
2536-
payload = "A" * 250 * 1024
2537-
sender.send_messages(ServiceBusMessage(payload))
2508+
servicebus_namespace_connection_string,
2509+
uamqp_transport=uamqp_transport
2510+
) as sb_client:
2511+
with sb_client.get_queue_sender(servicebus_queue.name, socket_timeout=0.6) as sender:
2512+
payload = "A" * 250 * 1024
2513+
sender.send_messages(ServiceBusMessage(payload))
25382514

2539-
if uamqp:
2540-
transport_type = uamqp.constants.TransportType.AmqpOverWebsocket
2541-
else:
2542-
transport_type = TransportType.AmqpOverWebsocket
2543-
# AmqpOverWebsocket
2544-
with ServiceBusClient.from_connection_string(
2545-
servicebus_namespace_connection_string,
2546-
transport_type=transport_type,
2547-
uamqp_transport=uamqp_transport
2548-
) as sb_client:
2549-
with sb_client.get_queue_sender(servicebus_queue.name, socket_timeout=0.8) as sender:
2550-
payload = "A" * 250 * 1024
2551-
sender.send_messages(ServiceBusMessage(payload))
2515+
if uamqp:
2516+
transport_type = uamqp.constants.TransportType.AmqpOverWebsocket
2517+
else:
2518+
transport_type = TransportType.AmqpOverWebsocket
2519+
# AmqpOverWebsocket
2520+
with ServiceBusClient.from_connection_string(
2521+
servicebus_namespace_connection_string,
2522+
transport_type=transport_type,
2523+
uamqp_transport=uamqp_transport
2524+
) as sb_client:
2525+
with sb_client.get_queue_sender(servicebus_queue.name, socket_timeout=0.8) as sender:
2526+
payload = "A" * 250 * 1024
2527+
sender.send_messages(ServiceBusMessage(payload))
25522528

25532529
# ReceiveMessages
25542530
with ServiceBusClient.from_connection_string(

0 commit comments

Comments
 (0)