Skip to content

Commit ab078c1

Browse files
Create EventHubs processor (Azure#2413)
1 parent 95cc29b commit ab078c1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+3868
-358
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/core/azure_core_amqp/CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
# Release History
22

3+
## 0.2.0 (Unreleased)
4+
5+
### Features Added
6+
7+
- Added the ability to compare an `AmqpAnnotationKey` with a string and string slice.
8+
9+
### Breaking Changes
10+
11+
- APIs which used to return `Option<String>`, and `Option<Vec<T>>` now return `Option<&str>`, and `Option<&[T]>`.
12+
- APIs which take ownership of string parameters now take a `String` parameter instead of a `&str` parameter.
13+
14+
### Bugs Fixed
15+
16+
### Other Changes
17+
318
## 0.1.0 (2025-02-18)
419

520
### Features Added

sdk/core/azure_core_amqp/src/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ impl AmqpDescribedError {
5555
pub fn condition(&self) -> &AmqpSymbol {
5656
&self.condition
5757
}
58-
pub fn description(&self) -> Option<&String> {
59-
self.description.as_ref()
58+
pub fn description(&self) -> Option<&str> {
59+
self.description.as_deref()
6060
}
6161
pub fn info(&self) -> &AmqpOrderedMap<AmqpSymbol, AmqpValue> {
6262
&self.info

sdk/core/azure_core_amqp/src/fe2o3/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection {
8080
}
8181
if let Some(incoming_locales) = options.incoming_locales {
8282
for locale in incoming_locales {
83-
builder = builder.add_incoming_locales(locale.as_str());
83+
builder = builder.add_incoming_locales(locale);
8484
}
8585
}
8686
if let Some(offered_capabilities) = options.offered_capabilities.as_ref() {

sdk/core/azure_core_amqp/src/fe2o3/messaging/message_fields.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright (c) Microsoft Corporation. All Rights reserved
22
// Licensed under the MIT license.
33

4+
use std::time::Duration;
5+
46
use crate::{
57
messaging::{
68
AmqpAnnotationKey, AmqpAnnotations, AmqpApplicationProperties, AmqpMessageHeader,
@@ -136,9 +138,7 @@ impl From<fe2o3_amqp_types::messaging::Header> for AmqpMessageHeader {
136138
AmqpMessageHeader {
137139
durable: header.durable,
138140
priority: header.priority.into(),
139-
time_to_live: header
140-
.ttl
141-
.map(|t| std::time::Duration::from_millis(t as u64)),
141+
time_to_live: header.ttl.map(|t| Duration::from_millis(t as u64)),
142142
first_acquirer: (header.first_acquirer),
143143
delivery_count: (header.delivery_count),
144144
}
@@ -426,6 +426,8 @@ impl From<AmqpMessageProperties> for fe2o3_amqp_types::messaging::Properties {
426426

427427
#[test]
428428
fn test_properties_conversion() {
429+
use std::time::{SystemTime, UNIX_EPOCH};
430+
429431
{
430432
let properties = fe2o3_amqp_types::messaging::Properties {
431433
message_id: Some(fe2o3_amqp_types::messaging::MessageId::String(
@@ -451,14 +453,13 @@ fn test_properties_conversion() {
451453
}
452454

453455
{
454-
let time_now = std::time::SystemTime::now()
455-
.duration_since(std::time::UNIX_EPOCH)
456+
let time_now = SystemTime::now()
457+
.duration_since(UNIX_EPOCH)
456458
.unwrap()
457459
.as_millis() as i64;
458460

459461
// Round trip time_now through milliseconds to round down from nanoseconds.
460-
let time_now: std::time::SystemTime =
461-
std::time::UNIX_EPOCH + std::time::Duration::from_millis(time_now as u64);
462+
let time_now: SystemTime = UNIX_EPOCH + Duration::from_millis(time_now as u64);
462463

463464
let properties = AmqpMessageProperties {
464465
absolute_expiry_time: Some(time_now.into()),

sdk/core/azure_core_amqp/src/fe2o3/messaging/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,8 @@ impl From<AmqpMessage>
428428
#[cfg(test)]
429429
mod tests {
430430

431+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
432+
431433
use super::*;
432434
use crate::messaging::{
433435
AmqpAnnotationKey, AmqpAnnotations, AmqpMessageHeader, AmqpMessageProperties,
@@ -510,14 +512,13 @@ mod tests {
510512
}
511513
// Amqp->Fe2o3
512514
{
513-
let timestamp = std::time::SystemTime::now()
514-
.duration_since(std::time::UNIX_EPOCH)
515+
let timestamp = SystemTime::now()
516+
.duration_since(UNIX_EPOCH)
515517
.unwrap()
516518
.as_millis() as i64;
517519

518520
// Round trip timestamp through milliseconds to round down from nanoseconds.
519-
let timestamp: std::time::SystemTime =
520-
std::time::UNIX_EPOCH + std::time::Duration::from_millis(timestamp as u64);
521+
let timestamp: SystemTime = UNIX_EPOCH + Duration::from_millis(timestamp as u64);
521522

522523
let amqp_message = AmqpMessage::builder()
523524
.add_application_property("abc".to_string(), "23 skiddoo")
@@ -542,7 +543,7 @@ mod tests {
542543
delivery_count: 95,
543544
first_acquirer: true,
544545
durable: true,
545-
time_to_live: Some(std::time::Duration::from_millis(1000)),
546+
time_to_live: Some(Duration::from_millis(1000)),
546547
priority: 3,
547548
})
548549
.with_delivery_annotations(AmqpAnnotations::from(vec![

sdk/core/azure_core_amqp/src/fe2o3/value.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
};
1212
use serde_amqp::primitives::Timestamp;
1313
use serde_bytes::ByteBuf;
14-
use std::time::UNIX_EPOCH;
14+
use std::time::{Duration, UNIX_EPOCH};
1515
use typespec::error::ErrorKind;
1616

1717
use super::error::Fe2o3SerializationError;
@@ -57,9 +57,7 @@ impl From<fe2o3_amqp_types::primitives::Timestamp> for AmqpTimestamp {
5757
return AmqpTimestamp(None);
5858
}
5959
AmqpTimestamp(
60-
std::time::UNIX_EPOCH.checked_add(std::time::Duration::from_millis(
61-
timestamp.milliseconds() as u64,
62-
)),
60+
UNIX_EPOCH.checked_add(Duration::from_millis(timestamp.milliseconds() as u64)),
6361
)
6462
}
6563
}

sdk/core/azure_core_amqp/src/messaging.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::Deserializable;
99
#[cfg(feature = "cplusplus")]
1010
use azure_core::error::ErrorKind;
1111
use azure_core::{Result, Uuid};
12+
use std::time::Duration;
1213

1314
#[cfg(all(feature = "fe2o3_amqp", not(target_arch = "wasm32")))]
1415
type DeliveryImplementation = super::fe2o3::messaging::messaging_types::Fe2o3AmqpDelivery;
@@ -233,8 +234,8 @@ impl AmqpTarget {
233234
builders::AmqpTargetBuilder::new()
234235
}
235236

236-
pub fn address(&self) -> Option<&String> {
237-
self.address.as_ref()
237+
pub fn address(&self) -> Option<&str> {
238+
self.address.as_deref()
238239
}
239240

240241
pub fn durable(&self) -> Option<&TerminusDurability> {
@@ -257,8 +258,8 @@ impl AmqpTarget {
257258
self.dynamic_node_properties.as_ref()
258259
}
259260

260-
pub fn capabilities(&self) -> Option<&Vec<AmqpValue>> {
261-
self.capabilities.as_ref()
261+
pub fn capabilities(&self) -> Option<&[AmqpValue]> {
262+
self.capabilities.as_deref()
262263
}
263264
}
264265

@@ -577,7 +578,7 @@ impl From<AmqpSource> for AmqpList {
577578
pub struct AmqpMessageHeader {
578579
pub durable: bool,
579580
pub priority: u8,
580-
pub time_to_live: Option<std::time::Duration>,
581+
pub time_to_live: Option<Duration>,
581582
pub first_acquirer: bool,
582583
pub delivery_count: u32,
583584
}
@@ -980,6 +981,24 @@ impl From<u64> for AmqpAnnotationKey {
980981
}
981982
}
982983

984+
impl PartialEq<String> for AmqpAnnotationKey {
985+
fn eq(&self, other: &String) -> bool {
986+
match self {
987+
AmqpAnnotationKey::Symbol(symbol) => symbol.0 == *other,
988+
AmqpAnnotationKey::Ulong(_) => false,
989+
}
990+
}
991+
}
992+
993+
impl PartialEq<&str> for AmqpAnnotationKey {
994+
fn eq(&self, other: &&str) -> bool {
995+
match self {
996+
AmqpAnnotationKey::Symbol(symbol) => symbol.0 == *other,
997+
AmqpAnnotationKey::Ulong(_) => false,
998+
}
999+
}
1000+
}
1001+
9831002
#[derive(Debug, Clone, PartialEq, Default)]
9841003
pub struct AmqpAnnotations(pub AmqpOrderedMap<AmqpAnnotationKey, AmqpValue>);
9851004

sdk/core/azure_core_amqp/src/receiver.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,8 @@ mod tests {
182182

183183
assert!(receiver_options.target.is_some());
184184
assert_eq!(
185-
receiver_options.target.unwrap().address().unwrap().clone(),
186-
"test_address".to_string()
185+
receiver_options.target.unwrap().address().unwrap(),
186+
"test_address"
187187
);
188188
}
189189

@@ -249,8 +249,8 @@ mod tests {
249249
);
250250
assert!(receiver_options.target.is_some());
251251
assert_eq!(
252-
receiver_options.target.unwrap().address().unwrap().clone(),
253-
"combo_address".to_string()
252+
receiver_options.target.unwrap().address().unwrap(),
253+
"combo_address"
254254
);
255255
assert_eq!(receiver_options.name.unwrap(), "combo_name".to_string());
256256
assert!(receiver_options.properties.is_some());

sdk/core/azure_core_amqp/src/session.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ impl AmqpSessionOptions {
4343
self.handle_max
4444
}
4545

46-
pub fn offered_capabilities(&self) -> Option<&Vec<AmqpSymbol>> {
47-
self.offered_capabilities.as_ref()
46+
pub fn offered_capabilities(&self) -> Option<&[AmqpSymbol]> {
47+
self.offered_capabilities.as_deref()
4848
}
4949

50-
pub fn desired_capabilities(&self) -> Option<&Vec<AmqpSymbol>> {
51-
self.desired_capabilities.as_ref()
50+
pub fn desired_capabilities(&self) -> Option<&[AmqpSymbol]> {
51+
self.desired_capabilities.as_deref()
5252
}
5353

5454
pub fn properties(&self) -> Option<&AmqpOrderedMap<AmqpSymbol, AmqpValue>> {

0 commit comments

Comments
 (0)