Skip to content

Added customer connection lost test from issue #2503 #2899

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sdk/core/azure_core_amqp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] }
default = ["fe2o3_amqp"]
cplusplus = []
test = []
amqp_tracing = ["fe2o3-amqp/tracing"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few thoughts come to mind. First and foremost - and I've started taking this stance with various reqwest options in azure_core - I don't think we want to start 1) trying to match parity with dependencies' features, and 2) advertise specific dependencies. At least 2 you did, but because fe2o3_amqp is optional, this should be:

Suggested change
amqp_tracing = ["fe2o3-amqp/tracing"]
amqp_tracing = ["fe2o3-amqp?/tracing"]

If we added support for other backends in the future, you could add them to this single feature.

That said, and considering what I said above, wouldn't it be better that someone just take a dependency on fe2o3-amqp themselves and add the feature? Resolve 2+ will unify (combine) them. This is the stance I think we want to take with various reqwest features apart from a few sane options like gzip and deflate support. Maybe even rustls given the issues that's causing for some customers.

/cc @RickWinter

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if we go that route, in your crate's Cargo.toml either add their tracing feature in your dev-dependencies for fe2o3-amqp (a separate import that adds features - I do this elsewhere and it works fine), or if you just want to do it for this one test, you could try declaring a [[test]] table just for this one file and add required-features = ["fe2o3-amqp/tracing"] and see if that works; though, I honestly don't know if it will...and kinda doubt it will, but worth a try.

fe2o3_amqp = [
"dep:fe2o3-amqp",
"fe2o3-amqp-types",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use azure_core_test::{recorded, TestContext};
use azure_messaging_eventhubs::{
ConsumerClient, OpenReceiverOptions, ProducerClient, StartLocation, StartPosition,
};
use futures::StreamExt;

#[recorded::test(live)]
async fn consumer_error(ctx: TestContext) -> azure_core::Result<()> {
let recording = ctx.recording();
// Set up the Event Hub client
let eventhub_namespace = recording.var("EVENTHUBS_HOST", None);
let eventhub_name = recording.var("EVENTHUB_NAME", None);
let credential = recording.credential();

{
let producer = ProducerClient::builder()
.open(
eventhub_namespace.as_str(),
eventhub_name.as_str(),
credential.clone(),
)
.await?;
for i in 0..12 {
use azure_messaging_eventhubs::SendEventOptions;

let event = format!("Event {}", i);
producer
.send_event(
event,
Some(SendEventOptions {
partition_id: Some("0".into()),
}),
)
.await?;
}
}

let consumer = ConsumerClient::builder()
.open(
eventhub_namespace.as_str(),
eventhub_name,
credential.clone(),
)
.await?;

println!("Opened consumer client");

// Get the partition IDs
let properties = consumer.get_eventhub_properties().await?;
println!("EventHub Properties: {:?}", properties);

// The default is to receive messages from the end of the partition, so specify a start position at the start of the partition.
let receiver = consumer
.open_receiver_on_partition(
properties.partition_ids[0].clone(),
Some(OpenReceiverOptions {
start_position: Some(StartPosition {
location: StartLocation::Earliest,
..Default::default()
}),
// receive_timeout: Some(azure_core::time::Duration::seconds(1)),
..Default::default()
}),
)
.await?;

println!("Created receiver");

// Create a stream of events from the receiver
let mut receive_stream = receiver.stream_events();

println!("Created receive stream");

// Read 10 events
let mut count = 0;
while let Some(event) = receive_stream.next().await {
count += 1;
if count > 10 {
break;
}

let event = event?;
println!("Partition ID: {:?}", event.partition_key());
println!("Event offset: {:?}", event.offset());
}

// Error
match consumer.close().await {
Ok(_) => {
println!("Consumer closed successfully");
}
Err(e) => {
eprintln!("Error closing consumer: {}", e);
}
}

Ok(())
}