|
| 1 | +use azure_core_test::{recorded, TestContext}; |
| 2 | +use azure_messaging_eventhubs::{ |
| 3 | + ConsumerClient, OpenReceiverOptions, ProducerClient, StartLocation, StartPosition, |
| 4 | +}; |
| 5 | +use futures::StreamExt; |
| 6 | + |
| 7 | +#[recorded::test(live)] |
| 8 | +async fn consumer_error(ctx: TestContext) -> azure_core::Result<()> { |
| 9 | + let recording = ctx.recording(); |
| 10 | + // Set up the Event Hub client |
| 11 | + let eventhub_namespace = recording.var("EVENTHUBS_HOST", None); |
| 12 | + let eventhub_name = recording.var("EVENTHUB_NAME", None); |
| 13 | + let credential = recording.credential(); |
| 14 | + |
| 15 | + { |
| 16 | + let producer = ProducerClient::builder() |
| 17 | + .open( |
| 18 | + eventhub_namespace.as_str(), |
| 19 | + eventhub_name.as_str(), |
| 20 | + credential.clone(), |
| 21 | + ) |
| 22 | + .await?; |
| 23 | + for i in 0..12 { |
| 24 | + use azure_messaging_eventhubs::SendEventOptions; |
| 25 | + |
| 26 | + let event = format!("Event {}", i); |
| 27 | + producer |
| 28 | + .send_event( |
| 29 | + event, |
| 30 | + Some(SendEventOptions { |
| 31 | + partition_id: Some("0".into()), |
| 32 | + }), |
| 33 | + ) |
| 34 | + .await?; |
| 35 | + } |
| 36 | + } |
| 37 | + |
| 38 | + let consumer = ConsumerClient::builder() |
| 39 | + .open( |
| 40 | + eventhub_namespace.as_str(), |
| 41 | + eventhub_name, |
| 42 | + credential.clone(), |
| 43 | + ) |
| 44 | + .await?; |
| 45 | + |
| 46 | + println!("Opened consumer client"); |
| 47 | + |
| 48 | + // Get the partition IDs |
| 49 | + let properties = consumer.get_eventhub_properties().await?; |
| 50 | + println!("EventHub Properties: {:?}", properties); |
| 51 | + |
| 52 | + // The default is to receive messages from the end of the partition, so specify a start position at the start of the partition. |
| 53 | + let receiver = consumer |
| 54 | + .open_receiver_on_partition( |
| 55 | + properties.partition_ids[0].clone(), |
| 56 | + Some(OpenReceiverOptions { |
| 57 | + start_position: Some(StartPosition { |
| 58 | + location: StartLocation::Earliest, |
| 59 | + ..Default::default() |
| 60 | + }), |
| 61 | + // receive_timeout: Some(azure_core::time::Duration::seconds(1)), |
| 62 | + ..Default::default() |
| 63 | + }), |
| 64 | + ) |
| 65 | + .await?; |
| 66 | + |
| 67 | + println!("Created receiver"); |
| 68 | + |
| 69 | + // Create a stream of events from the receiver |
| 70 | + let mut receive_stream = receiver.stream_events(); |
| 71 | + |
| 72 | + println!("Created receive stream"); |
| 73 | + |
| 74 | + // Read 10 events |
| 75 | + let mut count = 0; |
| 76 | + while let Some(event) = receive_stream.next().await { |
| 77 | + count += 1; |
| 78 | + if count > 10 { |
| 79 | + break; |
| 80 | + } |
| 81 | + |
| 82 | + let event = event?; |
| 83 | + println!("Partition ID: {:?}", event.partition_key()); |
| 84 | + println!("Event offset: {:?}", event.offset()); |
| 85 | + } |
| 86 | + |
| 87 | + // Error |
| 88 | + match consumer.close().await { |
| 89 | + Ok(_) => { |
| 90 | + println!("Consumer closed successfully"); |
| 91 | + } |
| 92 | + Err(e) => { |
| 93 | + eprintln!("Error closing consumer: {}", e); |
| 94 | + } |
| 95 | + } |
| 96 | + |
| 97 | + Ok(()) |
| 98 | +} |
0 commit comments