Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions eng/scripts/setup-openssl-env.ps1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Needs a shebang:
    #!/usr/bin/env pwsh
  2. Needs a copyright header after that (Microsoft OSS rules).
  3. You need to chmod +x the script. If you're on Windows, run git update-index --chmod=+x <path>.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Setup script for OpenSSL environment to enable workspace-wide clippy and builds
# Run this script before running cargo clippy --workspace or similar commands

Write-Host "Setting up OpenSSL environment for Rust builds..." -ForegroundColor Green

# Set VCPKG environment variables
$env:VCPKG_ROOT = "C:\vcpkg"
$env:OPENSSL_DIR = "C:\vcpkg\installed\x64-windows-static-md"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a global directory when we may have different versions for different repos? I have several worktrees (repos) on my machine, which is why the existing docs in our root CONTRIBUTING.md say to use a repo-related path.

If you're going to codify those instructions:

  1. They should be compatible with what I've already tested works in more ways than this.
  2. Those instruction should be updated to have people run this script instead.

$env:CMAKE_TOOLCHAIN_FILE = "C:\vcpkg\scripts\buildsystems\vcpkg.cmake"

Write-Host "Environment variables set:" -ForegroundColor Yellow
Write-Host " VCPKG_ROOT = $env:VCPKG_ROOT"
Write-Host " OPENSSL_DIR = $env:OPENSSL_DIR"
Write-Host " CMAKE_TOOLCHAIN_FILE = $env:CMAKE_TOOLCHAIN_FILE"

# Check if vcpkg and OpenSSL are installed
if (-not (Test-Path "C:\vcpkg\vcpkg.exe")) {
Write-Host "WARNING: vcpkg not found at C:\vcpkg\vcpkg.exe" -ForegroundColor Red
Write-Host "Please run the following commands to install vcpkg and OpenSSL:" -ForegroundColor Yellow
Write-Host " git clone https://github.com/Microsoft/vcpkg.git C:\vcpkg"
Comment on lines +17 to +20
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why script it if you're not going to do it for them? Why is this script beneficial instead of just using the existing CONTRIBUTING.md if we don't basically do it all for them? Why not go that extra step?

Write-Host " C:\vcpkg\bootstrap-vcpkg.bat"
Write-Host " C:\vcpkg\vcpkg.exe integrate install"
Write-Host " C:\vcpkg\vcpkg.exe install openssl:x64-windows-static-md"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What version? We may pin a version, which we already have in our eng/vcpkg.json that, BTW, you're not using.

}
elseif (-not (Test-Path "C:\vcpkg\installed\x64-windows-static-md\lib\libssl.lib")) {
Write-Host "WARNING: OpenSSL not found in vcpkg installation" -ForegroundColor Red
Write-Host "Please run: C:\vcpkg\vcpkg.exe install openssl:x64-windows-static-md"
}
else {
Write-Host "✓ vcpkg and OpenSSL are properly installed" -ForegroundColor Green
Write-Host ""
Write-Host "You can now run:" -ForegroundColor Cyan
Write-Host " cargo clippy --workspace --all-features --all-targets --keep-going --no-deps"
Write-Host " cargo build --workspace --all-features --all-targets"
Write-Host " cargo test --workspace"
}
11 changes: 11 additions & 0 deletions sdk/servicebus/azure_messaging_servicebus/src/clients/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

//! Clients used to communicate with Azure Service Bus

mod servicebus_client;

pub use servicebus_client::{
CreateReceiverOptions, CreateSenderOptions, ServiceBusClient, ServiceBusClientBuilder,
ServiceBusClientOptions, SubQueue,
};
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum SubQueue {

impl SubQueue {
/// Returns the path suffix for the sub-queue.
pub fn as_path_suffix(&self) -> &'static str {
pub(crate) fn as_path_suffix(&self) -> &'static str {
match self {
SubQueue::DeadLetter => "/$DeadLetterQueue",
SubQueue::Transfer => "/$Transfer/$DeadLetterQueue",
Expand All @@ -52,7 +52,7 @@ pub struct ServiceBusClientOptions {
impl Default for ServiceBusClientOptions {
fn default() -> Self {
Self {
api_version: "2017-04".to_string(), // Default Service Bus API version
api_version: "2024-01-01".to_string(), // Default Service Bus API version
client_options: ClientOptions::default(),
application_id: None,
}
Expand Down
77 changes: 67 additions & 10 deletions sdk/servicebus/azure_messaging_servicebus/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ impl fmt::Display for ErrorKind {
}

/// A Service Bus specific error.
#[derive(SafeDebug, Clone, PartialEq, Eq)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error type should just be Debug not SafeDebug. All the information is kinda important and you should make sure nothing inside it otherwise leaks. typespec::Error impl Debug.

#[derive(SafeDebug)]
pub struct ServiceBusError {
kind: ErrorKind,
message: String,
source: Option<Box<ServiceBusError>>,
source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add Send, Sync, and 'static constraints? That makes it pretty difficult to send any error since not all implement those traits.

}

impl ServiceBusError {
/// Creates a new Service Bus error.
pub fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
pub(crate) fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why protect it? What if someone wanted to wrap our client and return their own instance of a ServiceBusError? I don't see any value in this.

Self {
kind,
message: message.into(),
Expand All @@ -71,10 +71,10 @@ impl ServiceBusError {
}

/// Creates a new Service Bus error with a source error.
pub fn with_source(
pub(crate) fn with_source(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

kind: ErrorKind,
message: impl Into<String>,
source: ServiceBusError,
source: impl std::error::Error + Send + Sync + 'static,
) -> Self {
Self {
kind,
Expand All @@ -94,8 +94,10 @@ impl ServiceBusError {
}

/// Returns the source error, if any.
pub fn source(&self) -> Option<&ServiceBusError> {
self.source.as_deref()
pub fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have a source() function on ServiceBusError which is a different implementation from the trait version? The two appear to be basically identical.

self.source
.as_ref()
.map(|e| e.as_ref() as &(dyn std::error::Error + 'static))
}
}

Expand All @@ -107,7 +109,9 @@ impl fmt::Display for ServiceBusError {

impl std::error::Error for ServiceBusError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.source.as_ref().map(|e| e as &dyn std::error::Error)
self.source
.as_ref()
.map(|e| e.as_ref() as &(dyn std::error::Error + 'static))
}
}

Expand All @@ -123,12 +127,65 @@ impl From<azure_core::error::Error> for ServiceBusError {
_ => ErrorKind::Unknown,
};

ServiceBusError::new(kind, error.to_string())
ServiceBusError::with_source(kind, error.to_string(), error)
}
}

impl From<azure_core_amqp::AmqpError> for ServiceBusError {
fn from(error: azure_core_amqp::AmqpError) -> Self {
ServiceBusError::new(ErrorKind::Amqp, error.to_string())
ServiceBusError::with_source(ErrorKind::Amqp, error.to_string(), error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you expect to provide descriptive texts for servicebus errors generated from other errors? If your general practice is to just use .to_string(), there's not a ton of value in the message parameter.

}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_servicebus_error_can_store_any_std_error() {
// Test that we can store any std::error::Error as a source
let io_error = std::io::Error::other("test error");
let service_bus_error =
ServiceBusError::with_source(ErrorKind::Unknown, "wrapper error", io_error);

assert_eq!(service_bus_error.kind(), &ErrorKind::Unknown);
assert_eq!(service_bus_error.message(), "wrapper error");
assert!(service_bus_error.source().is_some());

// Verify the source can be downcast to the original error type
let source = service_bus_error.source().unwrap();
assert!(source.downcast_ref::<std::io::Error>().is_some());
}

#[test]
fn test_servicebus_error_implements_std_error() {
let error = ServiceBusError::new(ErrorKind::InvalidRequest, "test message");

// Should implement std::error::Error
let _: &dyn std::error::Error = &error;

// Should return None for source when no source is set
assert!(error.source().is_none());
}

#[test]
fn test_servicebus_error_with_chain() {
let inner_error = std::io::Error::other("inner error");
let middle_error =
ServiceBusError::with_source(ErrorKind::Amqp, "middle error", inner_error);
let outer_error =
ServiceBusError::with_source(ErrorKind::Unknown, "outer error", middle_error);

// Check that we can traverse the error chain
assert_eq!(outer_error.kind(), &ErrorKind::Unknown);
assert_eq!(outer_error.message(), "outer error");

let source = outer_error.source().unwrap();
let middle_as_servicebus = source.downcast_ref::<ServiceBusError>().unwrap();
assert_eq!(middle_as_servicebus.kind(), &ErrorKind::Amqp);
assert_eq!(middle_as_servicebus.message(), "middle error");

let inner_source = middle_as_servicebus.source().unwrap();
assert!(inner_source.downcast_ref::<std::io::Error>().is_some());
}
}
4 changes: 2 additions & 2 deletions sdk/servicebus/azure_messaging_servicebus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

/// Service Bus client
pub mod client;
pub mod clients;
mod error;
mod message;
/// Service Bus message receiving functionality and options.
Expand All @@ -22,7 +22,7 @@ pub mod models;
/// Common types and utilities.
mod common;

pub use client::{
pub use clients::{
CreateReceiverOptions, CreateSenderOptions, ServiceBusClient, ServiceBusClientBuilder,
ServiceBusClientOptions, SubQueue,
};
Expand Down
72 changes: 51 additions & 21 deletions sdk/servicebus/azure_messaging_servicebus/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@
//! ```

use crate::{
client::ServiceBusClientOptions, message::SystemProperties, ErrorKind, ReceivedMessage, Result,
ServiceBusError,
clients::ServiceBusClientOptions, message::SystemProperties, ErrorKind, ReceivedMessage,
Result, ServiceBusError,
};
use async_lock::{Mutex, OnceCell};
use azure_core::{fmt::SafeDebug, time::Duration, time::OffsetDateTime, Uuid};
Expand All @@ -113,6 +113,33 @@ use futures::{select, FutureExt};
use std::{collections::HashMap, sync::Arc};
use tracing::{debug, trace, warn};

/// Property key for lock token in Service Bus management operations.
const LOCK_TOKEN_PROPERTY_KEY: &str = "lock-token";

/// Property key for sequence numbers in Service Bus management operations.
const SEQUENCE_NUMBERS_PROPERTY_KEY: &str = "sequence-numbers";

/// Property key for receiver settle mode in Service Bus management operations.
const RECEIVER_SETTLE_MODE_PROPERTY_KEY: &str = "receiver-settle-mode";

/// Property key for message count in Service Bus management operations.
const MESSAGE_COUNT_PROPERTY_KEY: &str = "message-count";

/// Property key for from sequence number in Service Bus management operations.
const FROM_SEQUENCE_NUMBER_PROPERTY_KEY: &str = "from-sequence-number";

/// Service Bus management operation name for deferring messages.
const DEFER_MESSAGE_OPERATION: &str = "com.microsoft:defer-message";

/// Service Bus management operation name for receiving messages by sequence number.
const RECEIVE_BY_SEQUENCE_NUMBER_OPERATION: &str = "com.microsoft:receive-by-sequence-number";

/// Service Bus management operation name for renewing message locks.
const RENEW_LOCK_OPERATION: &str = "com.microsoft:renew-lock";

/// Service Bus management operation name for peeking messages.
const PEEK_MESSAGE_OPERATION: &str = "com.microsoft:peek-message";

/// Represents the lock style to use for a receiver - either `PeekLock` or `ReceiveAndDelete`.
///
/// This enum controls when a message is deleted from Service Bus and determines how message
Expand Down Expand Up @@ -1291,7 +1318,10 @@ impl Receiver {
> = azure_core_amqp::AmqpOrderedMap::new();

// Add the lock token as a string representation
application_properties.insert("lock-token".to_string(), lock_token.to_string().into());
application_properties.insert(
LOCK_TOKEN_PROPERTY_KEY.to_string(),
lock_token.to_string().into(),
);

// Add any properties to modify if specified
if let Some(properties) = options
Expand All @@ -1304,10 +1334,7 @@ impl Receiver {
}

let _response = management_client
.call(
"com.microsoft:defer-message".to_string(),
application_properties,
)
.call(DEFER_MESSAGE_OPERATION.to_string(), application_properties)
.await
.map_err(|e| {
ServiceBusError::new(ErrorKind::Amqp, format!("Failed to defer message: {:?}", e))
Expand Down Expand Up @@ -1515,18 +1542,24 @@ impl Receiver {
.collect::<Vec<_>>()
.join(",");

application_properties.insert("sequence-numbers".to_string(), sequence_numbers_str.into());
application_properties.insert(
SEQUENCE_NUMBERS_PROPERTY_KEY.to_string(),
sequence_numbers_str.into(),
);

// Set receiver settle mode based on receive mode
let settle_mode = match self.receive_mode {
ReceiveMode::PeekLock => 1u32,
ReceiveMode::ReceiveAndDelete => 0u32,
};
application_properties.insert("receiver-settle-mode".to_string(), settle_mode.into());
application_properties.insert(
RECEIVER_SETTLE_MODE_PROPERTY_KEY.to_string(),
settle_mode.into(),
);

let response = management_client
.call(
"com.microsoft:receive-by-sequence-number".to_string(),
RECEIVE_BY_SEQUENCE_NUMBER_OPERATION.to_string(),
application_properties,
)
.await
Expand Down Expand Up @@ -1716,13 +1749,13 @@ impl Receiver {
> = azure_core_amqp::AmqpOrderedMap::new();

// Add the lock token as a string representation
application_properties.insert("lock-token".to_string(), lock_token.to_string().into());
application_properties.insert(
LOCK_TOKEN_PROPERTY_KEY.to_string(),
lock_token.to_string().into(),
);

let response = management_client
.call(
"com.microsoft:renew-lock".to_string(),
application_properties,
)
.call(RENEW_LOCK_OPERATION.to_string(), application_properties)
.await
.map_err(|e| {
ServiceBusError::new(
Expand Down Expand Up @@ -1922,21 +1955,18 @@ impl Receiver {
> = azure_core_amqp::AmqpOrderedMap::new();

// Set the maximum number of messages to peek
application_properties.insert("message-count".to_string(), max_count.into());
application_properties.insert(MESSAGE_COUNT_PROPERTY_KEY.to_string(), max_count.into());

// Set the starting sequence number if provided
if let Some(from_sequence_number) = options.as_ref().and_then(|o| o.from_sequence_number) {
application_properties.insert(
"from-sequence-number".to_string(),
FROM_SEQUENCE_NUMBER_PROPERTY_KEY.to_string(),
from_sequence_number.into(),
);
}

let response = management_client
.call(
"com.microsoft:peek-message".to_string(),
application_properties,
)
.call(PEEK_MESSAGE_OPERATION.to_string(), application_properties)
.await
.map_err(|e| {
ServiceBusError::new(ErrorKind::Amqp, format!("Failed to peek messages: {:?}", e))
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/azure_messaging_servicebus/src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.

use crate::{client::ServiceBusClientOptions, ErrorKind, Message, Result, ServiceBusError};
use crate::{clients::ServiceBusClientOptions, ErrorKind, Message, Result, ServiceBusError};
use azure_core::fmt::SafeDebug;
use azure_core_amqp::{
AmqpConnection, AmqpMessage, AmqpSender, AmqpSenderApis, AmqpSession, AmqpSessionApis,
Expand Down

This file was deleted.

Loading