Skip to content

Commit 7ecc446

Browse files
committed
Added customer connection lost test
1 parent b9f114e commit 7ecc446

File tree

3 files changed

+100
-0
lines changed

3 files changed

+100
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/core/azure_core_amqp/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] }
4040
default = ["fe2o3_amqp"]
4141
cplusplus = []
4242
test = []
43+
amqp_tracing = ["fe2o3-amqp/tracing"]
4344
fe2o3_amqp = [
4445
"dep:fe2o3-amqp",
4546
"fe2o3-amqp-types",
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use azure_core_test::{recorded, TestContext};
2+
use azure_messaging_eventhubs::{
3+
ConsumerClient, OpenReceiverOptions, ProducerClient, StartLocation, StartPosition,
4+
};
5+
use futures::StreamExt;
6+
7+
#[recorded::test(live)]
8+
async fn consumer_error(ctx: TestContext) -> azure_core::Result<()> {
9+
let recording = ctx.recording();
10+
// Set up the Event Hub client
11+
let eventhub_namespace = recording.var("EVENTHUBS_HOST", None);
12+
let eventhub_name = recording.var("EVENTHUB_NAME", None);
13+
let credential = recording.credential();
14+
15+
{
16+
let producer = ProducerClient::builder()
17+
.open(
18+
eventhub_namespace.as_str(),
19+
eventhub_name.as_str(),
20+
credential.clone(),
21+
)
22+
.await?;
23+
for i in 0..12 {
24+
use azure_messaging_eventhubs::SendEventOptions;
25+
26+
let event = format!("Event {}", i);
27+
producer
28+
.send_event(
29+
event,
30+
Some(SendEventOptions {
31+
partition_id: Some("0".into()),
32+
}),
33+
)
34+
.await?;
35+
}
36+
}
37+
38+
let consumer = ConsumerClient::builder()
39+
.open(
40+
eventhub_namespace.as_str(),
41+
eventhub_name,
42+
credential.clone(),
43+
)
44+
.await?;
45+
46+
println!("Opened consumer client");
47+
48+
// Get the partition IDs
49+
let properties = consumer.get_eventhub_properties().await?;
50+
println!("EventHub Properties: {:?}", properties);
51+
52+
// The default is to receive messages from the end of the partition, so specify a start position at the start of the partition.
53+
let receiver = consumer
54+
.open_receiver_on_partition(
55+
properties.partition_ids[0].clone(),
56+
Some(OpenReceiverOptions {
57+
start_position: Some(StartPosition {
58+
location: StartLocation::Earliest,
59+
..Default::default()
60+
}),
61+
// receive_timeout: Some(azure_core::time::Duration::seconds(1)),
62+
..Default::default()
63+
}),
64+
)
65+
.await?;
66+
67+
println!("Created receiver");
68+
69+
// Create a stream of events from the receiver
70+
let mut receive_stream = receiver.stream_events();
71+
72+
println!("Created receive stream");
73+
74+
// Read 10 events
75+
let mut count = 0;
76+
while let Some(event) = receive_stream.next().await {
77+
count += 1;
78+
if count > 10 {
79+
break;
80+
}
81+
82+
let event = event?;
83+
println!("Partition ID: {:?}", event.partition_key());
84+
println!("Event offset: {:?}", event.offset());
85+
}
86+
87+
// Error
88+
match consumer.close().await {
89+
Ok(_) => {
90+
println!("Consumer closed successfully");
91+
}
92+
Err(e) => {
93+
eprintln!("Error closing consumer: {}", e);
94+
}
95+
}
96+
97+
Ok(())
98+
}

0 commit comments

Comments
 (0)