Skip to content

Commit d2066c5

Browse files
committed
Updated tests
1 parent 30300e7 commit d2066c5

File tree

4 files changed

+86
-46
lines changed

4 files changed

+86
-46
lines changed

samples/test_azure_event_hubs_receive.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,12 @@ def test_event_hubs_client_receive_sync(live_eventhub_config):
104104
log.info("Finished receiving")
105105

106106

107-
def test_event_hubs_callback_receive(live_eventhub_config):
107+
def test_event_hubs_callback_receive_sync(live_eventhub_config):
108108
def on_message_received(message):
109109
annotations = message.annotations
110110
log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number')))
111-
return message
111+
log.info(str(message))
112+
message.accept()
112113

113114
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
114115
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
@@ -138,9 +139,25 @@ def test_event_hubs_iter_receive_sync(live_eventhub_config):
138139
live_eventhub_config['partition'])
139140

140141
receive_client = uamqp.ReceiveClient(source, auth=sas_auth, timeout=10, debug=False, prefetch=10)
141-
for message in receive_client.receive_messages_iter():
142-
annotations = message.annotations
143-
log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number')))
142+
count = 0
143+
gen = receive_client.receive_messages_iter()
144+
for message in gen:
145+
log.info(message.annotations.get(b'x-opt-sequence-number'))
146+
log.info(str(message))
147+
count += 1
148+
if count >= 100000:
149+
log.info("Got {} messages. Shutting down.".format(count))
150+
message.accept()
151+
break
152+
count = 0
153+
for message in gen:
154+
count += 1
155+
if count >= 10:
156+
log.info("Got {} more messages. Shutting down.".format(count))
157+
message.accept()
158+
break
159+
160+
receive_client.close()
144161

145162

146163
def test_event_hubs_filter_receive(live_eventhub_config):
@@ -154,7 +171,7 @@ def test_event_hubs_filter_receive(live_eventhub_config):
154171
live_eventhub_config['consumer_group'],
155172
live_eventhub_config['partition'])
156173
source = address.Source(source_url)
157-
source.set_filter(b"amqp.annotation.x-opt-enqueuedtimeutc > 1518731960545")
174+
source.set_filter(b"amqp.annotation.x-opt-sequence-number > 1324709514")
158175

159176
with uamqp.ReceiveClient(source, auth=plain_auth, timeout=50, prefetch=50) as receive_client:
160177
log.info("Created client, receiving...")

samples/test_azure_event_hubs_receive_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ async def test_event_hubs_shared_connection_async(live_eventhub_config):
125125
live_eventhub_config['event_hub'],
126126
live_eventhub_config['consumer_group'])
127127

128-
with uamqp.ConnectionAsync(live_eventhub_config['hostname'], sas_auth, debug=True) as conn:
128+
with uamqp.ConnectionAsync(live_eventhub_config['hostname'], sas_auth, debug=False) as conn:
129129
partition_0 = uamqp.ReceiveClientAsync(source + "0", debug=True, auth=sas_auth, timeout=1000, prefetch=1)
130130
partition_1 = uamqp.ReceiveClientAsync(source + "1", debug=True, auth=sas_auth, timeout=1000, prefetch=1)
131131
await partition_0.open_async(connection=conn)
@@ -182,4 +182,4 @@ async def test_event_hubs_multiple_receiver_async(live_eventhub_config):
182182
config['partition'] = "0"
183183

184184
loop = asyncio.get_event_loop()
185-
loop.run_until_complete(test_event_hubs_shared_connection_async(config))
185+
loop.run_until_complete(test_event_hubs_iter_receive_async(config))

samples/test_azure_event_hubs_send.py

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,60 +34,65 @@ def test_event_hubs_simple_send(live_eventhub_config):
3434
uamqp.send_message(target, msg_content, auth=sas_auth)
3535

3636

37-
def test_event_hubs_client_send(live_eventhub_config):
37+
def test_event_hubs_client_send_sync(live_eventhub_config):
3838
annotations={b"x-opt-partition-key": b"PartitionKeyInfo"}
39-
header = uamqp.message.MessageHeader()
40-
props = uamqp.message.MessageProperties()
41-
msg_content = b"hello world"
42-
message = uamqp.Message(
43-
msg_content,
44-
annotations=annotations,
45-
properties=props,
46-
header=header)
47-
4839
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
4940
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
5041
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
5142

5243
target = "amqps://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
5344
send_client = uamqp.SendClient(target, auth=sas_auth, debug=False)
54-
send_client.queue_message(message)
55-
results = send_client.send_all_messages()
56-
assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
45+
for _ in range(10):
46+
header = uamqp.message.MessageHeader()
47+
header.durable = True
48+
props = uamqp.message.MessageProperties(message_id=b"message id")
49+
msg_content = b"hello world"
50+
message = uamqp.Message(
51+
msg_content,
52+
properties=props,
53+
header=header,
54+
application_properties=annotations,
55+
annotations=annotations)
56+
send_client.queue_message(message)
57+
results = send_client.send_all_messages(close_on_done=False)
58+
assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
59+
send_client.close()
5760

5861

5962
def test_event_hubs_single_send_sync(live_eventhub_config):
6063
annotations={b"x-opt-partition-key": b"PartitionKeyInfo"}
6164
msg_content = b"hello world"
6265

63-
message = uamqp.Message(msg_content, annotations=annotations)
64-
6566
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
6667
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
6768
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
6869

6970
target = "amqps://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
7071
send_client = uamqp.SendClient(target, auth=sas_auth, debug=False)
71-
send_client.send_message(message, close_on_done=True)
72+
for _ in range(10):
73+
message = uamqp.Message(msg_content, application_properties=annotations, annotations=annotations)
74+
send_client.send_message(message)
75+
send_client.close()
7276

7377

74-
def test_event_hubs_batch_send(live_eventhub_config):
78+
def test_event_hubs_batch_send_sync(live_eventhub_config):
7579
def data_generator():
76-
for i in range(500):
80+
for i in range(50):
7781
msg_content = "Hello world {}".format(i).encode('utf-8')
7882
yield msg_content
7983

80-
message_batch = uamqp.message.BatchMessage(data_generator())
81-
8284
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
8385
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
8486
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
8587

8688
target = "amqps://{}/{}/Partitions/0".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
8789
send_client = uamqp.SendClient(target, auth=sas_auth, debug=False)
88-
send_client.queue_message(message_batch)
89-
results = send_client.send_all_messages()
90-
assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
90+
for _ in range(10):
91+
message_batch = uamqp.message.BatchMessage(data_generator())
92+
send_client.queue_message(message_batch)
93+
results = send_client.send_all_messages(close_on_done=False)
94+
assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
95+
send_client.close()
9196

9297

9398
if __name__ == '__main__':
@@ -99,4 +104,4 @@ def data_generator():
99104
config['consumer_group'] = "$Default"
100105
config['partition'] = "0"
101106

102-
test_event_hubs_client_send(config)
107+
test_event_hubs_batch_send_sync(config)

samples/test_azure_event_hubs_send_async.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ async def test_event_hubs_client_send_async(live_eventhub_config):
3535
message = uamqp.Message(msg_content, application_properties=properties)
3636
plain_auth = authentication.SASLPlain(live_eventhub_config['hostname'], live_eventhub_config['key_name'], live_eventhub_config['access_key'])
3737
target = "amqps://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
38-
send_client = uamqp.SendClientAsync(target, auth=plain_auth, debug=True)
38+
send_client = uamqp.SendClientAsync(target, auth=plain_auth, debug=False)
3939
send_client.queue_message(message)
4040
results = await send_client.send_all_messages_async()
4141
assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
@@ -52,24 +52,42 @@ async def test_event_hubs_single_send_async(live_eventhub_config):
5252

5353
target = "amqps://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
5454
send_client = uamqp.SendClientAsync(target, auth=sas_auth, debug=False)
55-
await send_client.send_message_async(message, close_on_done=True)
55+
56+
for _ in range(10):
57+
message = uamqp.Message(msg_content, application_properties=annotations, annotations=annotations)
58+
await send_client.send_message_async(message)
59+
await send_client.close_async()
5660

5761

5862
@pytest.mark.asyncio
5963
async def test_event_hubs_batch_send_async(live_eventhub_config):
60-
def data_generator():
61-
for i in range(5):
62-
msg_content = "Hello world {}".format(i).encode('utf-8')
63-
yield msg_content
64+
for _ in range(10):
65+
def data_generator():
66+
for i in range(50):
67+
msg_content = "Hello world {}".format(i).encode('utf-8')
68+
yield msg_content
6469

65-
message_batch = uamqp.message.BatchMessage(data_generator())
70+
message_batch = uamqp.message.BatchMessage(data_generator())
6671

67-
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
68-
sas_auth = authentication.SASTokenAsync.from_shared_access_key(uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
72+
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
73+
sas_auth = authentication.SASTokenAsync.from_shared_access_key(uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
6974

70-
target = "amqps://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
71-
send_client = uamqp.SendClientAsync(target, auth=sas_auth, debug=False)
75+
target = "amqps://{}/{}/Partitions/0".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
76+
send_client = uamqp.SendClientAsync(target, auth=sas_auth, debug=False)
7277

73-
send_client.queue_message(message_batch)
74-
results = await send_client.send_all_messages_async()
75-
assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
78+
send_client.queue_message(message_batch)
79+
results = await send_client.send_all_messages_async()
80+
assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
81+
82+
83+
if __name__ == '__main__':
84+
config = {}
85+
config['hostname'] = os.environ['EVENT_HUB_HOSTNAME']
86+
config['event_hub'] = os.environ['EVENT_HUB_NAME']
87+
config['key_name'] = os.environ['EVENT_HUB_SAS_POLICY']
88+
config['access_key'] = os.environ['EVENT_HUB_SAS_KEY']
89+
config['consumer_group'] = "$Default"
90+
config['partition'] = "0"
91+
92+
loop = asyncio.get_event_loop()
93+
loop.run_until_complete(test_event_hubs_single_send_async(config))

0 commit comments

Comments
 (0)