Skip to content

Commit 0ac1da8

Browse files
authored
Merge pull request #28 from annatisch/send-async
vNext
2 parents 3114943 + a1b87a7 commit 0ac1da8

37 files changed

+1115
-338
lines changed

HISTORY.rst

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,55 @@
33
Release History
44
===============
55

6+
0.2.0 (2018-07-25)
7+
++++++++++++++++++
8+
9+
- **Breaking change** `MessageSender.send_async` has been renamed to `MessageSender.send`, and
10+
`MessageSenderAsync.send_async` is now a coroutine.
11+
- **Breaking change** Removed `detach_received` callback argument from MessageSender, MessageReceiver,
12+
MessageSenderAsync, and MessageReceiverAsync in favour of new `error_policy` argument.
13+
- Added ErrorPolicy class to determine how the client should respond to both generic AMQP errors
14+
and custom or vendor-specific errors. A default policy will be used, but a custom policy can
15+
be added to any client by using a new `error_policy` argument. Value must be either an instance
16+
or subclass of ErrorPolicy.
17+
18+
- The `error_policy` argument has also been added to MessageSender, MessageReceiver, Connection, and their
19+
async counterparts to allow for handling of link DETACH and connection CLOSE events.
20+
- The error policy passed to a SendClient determines the number of message send retry
21+
attempts. This replaces the previous `constants.MESSAGE_SEND_RETRIES` value which is now
22+
deprecated.
23+
- Added new ErrorAction object to determine how a client should respond to an error. It has
24+
three properties: `retry` (a boolean to determine whether the error is retryable), `backoff`
25+
(an integer to determine how long the client should wait before retrying, default is 0) and
26+
`increment_retries` (a boolean to determine whether the error should count against the maximum
27+
retry attempts, default is `True`). Currently `backoff` and `increment_retries` are only
28+
considered for message send failures.
29+
- Added `VendorConnectionClose` and `VendorLinkDetach` exceptions for non-standard (unrecognized)
30+
connection/link errors.
31+
32+
- Added support for HTTP proxy configuration.
33+
- Added support for running async clients synchronously.
34+
- Added keep-alive support for connection - this is a background thread for a synchronous
35+
client, and a background async function for an async client. The keep-alive feature is
36+
disabled by default, to enable, set the `keep_alive_interval` argument on the client to
37+
an integer representing the number of seconds between connection pings.
38+
- Added support for catching a Connection CLOSE event.
39+
- Added support for `Connection.sleep` and `ConnectionAsync.sleep_async` to pause the connection.
40+
- Added support for surfacing message disposition delivery-state (with error information).
41+
- Added `constants.ErrorCodes` enum to map standard AMQP error conditions. This replaces the previous
42+
`constants.ERROR_CONNECTION_REDIRECT` and `constants.ERROR_LINK_REDIRECT` which are now both
43+
deprecated.
44+
- Added new super error `AMQPError` from which all exceptions inherit.
45+
- Added new `MessageHandlerError` exception, a subclass of `AMQPConnectionError`, for
46+
Senders/Receivers that enter an indeterminate error state.
47+
- `MessageException` is now a subclass of `MessageResponse`.
48+
- Added `ClientMessageError` exception, a subclass of `MessageException` for send errors raised client-side.
49+
- Catching Link DETACH event will now work regardless of whether service returns delivery-state.
50+
- Fixed bug where received messages attempting to settle on a detached link crashed the client.
51+
- Fixed bug in amqp C DescribedValue.
52+
- Fixed bug where client crashed on deallocating failed management operation.
53+
54+
655
0.1.1 (2018-07-14)
756
++++++++++++++++++
857

build_armv7l.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ set -e
44
# To execute this script:
55
# docker run --rm -v $PWD:/data local/manylinux_crypto32 /data/build_many_linux_32bit.sh
66

7-
export UAMQP_VERSION="0.1.1"
7+
export UAMQP_VERSION="0.2.0"
88

99
export CPATH="/opt/uamqp/openssl/include"
1010
export LIBRARY_PATH="/opt/uamqp/openssl/lib"

build_many_linux_32bit.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ set -e
44
# To execute this script:
55
# docker run --rm -v $PWD:/data local/manylinux_crypto32 /data/build_many_linux_32bit.sh
66

7-
export UAMQP_VERSION="0.1.1"
7+
export UAMQP_VERSION="0.2.0"
88

99
export CPATH="/opt/pyca/cryptography/openssl/include"
1010
export LIBRARY_PATH="/opt/pyca/cryptography/openssl/lib"

build_many_linux_64bit.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ set -e
44
# To execute this script:
55
# docker run --rm -v $PWD:/data local/manylinux_crypto64 /data/build_many_linux_64bit.sh
66

7-
export UAMQP_VERSION="0.1.1"
7+
export UAMQP_VERSION="0.2.0"
88

99
export CPATH="/opt/pyca/cryptography/openssl/include"
1010
export LIBRARY_PATH="/opt/pyca/cryptography/openssl/lib"

samples/test_azure_event_hubs_receive.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from urllib.parse import quote_plus
1616

1717
import uamqp
18-
from uamqp import address
18+
from uamqp import address, errors
1919
from uamqp import authentication
2020

2121

@@ -79,6 +79,23 @@ def test_event_hubs_single_batch_receive(live_eventhub_config):
7979
assert len(message) <= 300
8080

8181

82+
def test_event_hubs_client_proxy_settings(live_eventhub_config):
83+
proxy_settings={'proxy_hostname':'127.0.0.1', 'proxy_port': 12345}
84+
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
85+
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
86+
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'], http_proxy=proxy_settings)
87+
88+
source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
89+
live_eventhub_config['hostname'],
90+
live_eventhub_config['event_hub'],
91+
live_eventhub_config['consumer_group'],
92+
live_eventhub_config['partition'])
93+
with pytest.raises(errors.AMQPConnectionError):
94+
receive_client = uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=50, prefetch=50)
95+
receive_client.receive_message_batch(max_batch_size=10)
96+
receive_client.close()
97+
98+
8299
def test_event_hubs_client_receive_sync(live_eventhub_config):
83100
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
84101
sas_auth = authentication.SASTokenAuth.from_shared_access_key(

samples/test_azure_event_hubs_send.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def test_event_hubs_simple_send(live_eventhub_config):
3131
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
3232
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
3333
target = "amqps://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
34-
uamqp.send_message(target, msg_content, auth=sas_auth)
34+
uamqp.send_message(target, msg_content, auth=sas_auth, debug=True)
3535

3636

3737
def test_event_hubs_client_send_sync(live_eventhub_config):
@@ -104,4 +104,4 @@ def data_generator():
104104
config['consumer_group'] = "$Default"
105105
config['partition'] = "0"
106106

107-
test_event_hubs_batch_send_sync(config)
107+
test_event_hubs_client_send_sync(config)

samples/test_azure_event_hubs_send_async.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,24 @@ async def test_event_hubs_single_send_async(live_eventhub_config):
5959
await send_client.close_async()
6060

6161

62+
@pytest.mark.asyncio
63+
async def test_event_hubs_async_sender_sync(live_eventhub_config):
64+
annotations={b"x-opt-partition-key": b"PartitionKeyInfo"}
65+
msg_content = b"hello world"
66+
67+
message = uamqp.Message(msg_content, annotations=annotations)
68+
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
69+
sas_auth = authentication.SASTokenAsync.from_shared_access_key(uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
70+
71+
target = "amqps://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
72+
send_client = uamqp.SendClientAsync(target, auth=sas_auth, debug=False)
73+
74+
for _ in range(10):
75+
message = uamqp.Message(msg_content, application_properties=annotations, annotations=annotations)
76+
send_client.send_message(message)
77+
send_client.close()
78+
79+
6280
@pytest.mark.asyncio
6381
async def test_event_hubs_batch_send_async(live_eventhub_config):
6482
for _ in range(10):

src/amqpvalue.pyx

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -875,18 +875,26 @@ cdef class DescribedValue(AMQPValue):
875875
@property
876876
def description(self):
877877
cdef c_amqpvalue.AMQP_VALUE value
878+
cdef c_amqpvalue.AMQP_VALUE cloned
878879
value = c_amqpvalue.amqpvalue_get_inplace_descriptor(self._c_value)
879880
if <void*>value == NULL:
880881
self._value_error()
881-
return value_factory(value)
882+
cloned = c_amqpvalue.amqpvalue_clone(value)
883+
if <void*>cloned == NULL:
884+
self._value_error()
885+
return value_factory(cloned)
882886

883887
@property
884888
def data(self):
885889
cdef c_amqpvalue.AMQP_VALUE value
890+
cdef c_amqpvalue.AMQP_VALUE cloned
886891
value = c_amqpvalue.amqpvalue_get_inplace_described_value(self._c_value)
887892
if <void*>value == NULL:
888893
self._value_error()
889-
return value_factory(value)
894+
cloned = c_amqpvalue.amqpvalue_clone(value)
895+
if <void*>cloned == NULL:
896+
self._value_error()
897+
return value_factory(cloned)
890898

891899
@property
892900
def value(self):

src/connection.pyx

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,14 +190,17 @@ cdef void on_connection_state_changed(void* context, c_connection.CONNECTION_STA
190190
cdef void on_io_error(void* context):
191191
if <void*>context != NULL:
192192
context_obj = <object>context
193-
context_obj._io_error()
193+
if hasattr(context_obj, '_io_error'):
194+
context_obj._io_error()
194195

195196

196197
cdef void on_connection_close_received(void* context, c_amqp_definitions.ERROR_HANDLE error):
198+
cdef c_amqp_definitions.ERROR_HANDLE cloned
197199
context_obj = <object>context
198-
cloned = c_amqp_definitions.error_clone(error)
199-
wrapped_error = error_factory(cloned)
200+
if <void*> error != NULL:
201+
cloned = c_amqp_definitions.error_clone(error)
202+
wrapped_error = error_factory(cloned)
203+
else:
204+
wrapped_error = None
200205
if hasattr(context_obj, '_close_received'):
201206
context_obj._close_received(wrapped_error)
202-
elif callable(context_obj):
203-
context_obj(wrapped_error)

src/link.pyx

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,10 @@ cdef class cLink(StructBase):
150150
cdef void on_link_detach_received(void* context, c_amqp_definitions.ERROR_HANDLE error):
151151
cdef c_amqp_definitions.ERROR_HANDLE cloned
152152
context_obj = <object>context
153-
cloned = c_amqp_definitions.error_clone(error)
154-
wrapped_error = error_factory(cloned)
153+
if <void*> error != NULL:
154+
cloned = c_amqp_definitions.error_clone(error)
155+
wrapped_error = error_factory(cloned)
156+
else:
157+
wrapped_error = None
155158
if hasattr(context_obj, '_detach_received'):
156159
context_obj._detach_received(wrapped_error)
157-
elif callable(context_obj):
158-
context_obj(wrapped_error)

0 commit comments

Comments
 (0)