Skip to content

Commit 4a6f658

Browse files
authored
[Rust] [Connector] Cleanup (#800)
This PR: - removes the now unused data transformer - adds some more logging around retries - adds escapes to the retries that aren't critical. Some of these maybe should be put back in later, but for now this will make our first drop more stable
1 parent f827e29 commit 4a6f658

File tree

4 files changed

+15
-62
lines changed

4 files changed

+15
-62
lines changed

rust/azure_iot_operations_connector/src/base_connector/managed_azure_device_registry.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ use crate::{
2929
};
3030

3131
/// Used as the strategy when using [`tokio_retry2::Retry`]
32-
const RETRY_STRATEGY: tokio_retry2::strategy::ExponentialBackoff =
33-
tokio_retry2::strategy::ExponentialBackoff::from_millis(100);
32+
const RETRY_STRATEGY: tokio_retry2::strategy::ExponentialFactorBackoff =
33+
tokio_retry2::strategy::ExponentialFactorBackoff::from_millis(500, 2.0);
3434

3535
/// An Observation for device endpoint creation events that uses
3636
/// multiple underlying clients to get full device endpoint information.
@@ -70,7 +70,7 @@ impl DeviceEndpointClientCreationObservation {
7070
.await?;
7171

7272
// and then get device update observation as well and turn it into a DeviceEndpointClientUpdateObservation
73-
let device_endpoint_client_update_observation = match Retry::spawn(RETRY_STRATEGY, async || -> Result<DeviceUpdateObservation, RetryError<azure_device_registry::Error>> {
73+
let device_endpoint_client_update_observation = match Retry::spawn(RETRY_STRATEGY.take(10), async || -> Result<DeviceUpdateObservation, RetryError<azure_device_registry::Error>> {
7474
self.connector_context
7575
.azure_device_registry_client
7676
.observe_device_update_notifications(
@@ -110,6 +110,7 @@ impl DeviceEndpointClientCreationObservation {
110110
match e.kind() {
111111
// network/retriable
112112
azure_device_registry::ErrorKind::AIOProtocolError(_) => {
113+
log::warn!("Get device definition failed. Retrying: {e}");
113114
RetryError::transient(e)
114115
}
115116
// indicates an error in the configuration, so we want to get a new notification instead of retrying this operation
@@ -135,7 +136,7 @@ impl DeviceEndpointClientCreationObservation {
135136
);
136137
// unobserve as cleanup
137138
let _ = Retry::spawn(
138-
RETRY_STRATEGY,
139+
RETRY_STRATEGY.take(10),
139140
async || -> Result<(), RetryError<azure_device_registry::Error>> {
140141
self.connector_context
141142
.azure_device_registry_client
@@ -175,7 +176,7 @@ impl DeviceEndpointClientCreationObservation {
175176
);
176177
// unobserve
177178
let _ = Retry::spawn(
178-
RETRY_STRATEGY,
179+
RETRY_STRATEGY.take(10),
179180
async || -> Result<(), RetryError<azure_device_registry::Error>> {
180181
self.connector_context
181182
.azure_device_registry_client
@@ -325,7 +326,7 @@ impl DeviceEndpointClient {
325326
) {
326327
// send status update to the service
327328
match Retry::spawn(
328-
RETRY_STRATEGY,
329+
RETRY_STRATEGY.take(10),
329330
async || -> Result<Device, RetryError<azure_device_registry::Error>> {
330331
self.connector_context
331332
.azure_device_registry_client
@@ -340,6 +341,7 @@ impl DeviceEndpointClient {
340341
match e.kind() {
341342
// network/retriable
342343
azure_device_registry::ErrorKind::AIOProtocolError(_) => {
344+
log::warn!("Update device status failed. Retrying: {e}");
343345
RetryError::transient(e)
344346
}
345347
// indicates an error in the configuration, might be transient in the future depending on what it can indicate
@@ -428,7 +430,7 @@ impl AssetClientCreationObservation {
428430
self.asset_create_observation.recv_notification().await?;
429431

430432
// Get asset update observation as well and turn it into a AssetClientUpdateObservation
431-
let asset_client_update_observation = match Retry::spawn(RETRY_STRATEGY, async || -> Result<AssetUpdateObservation, RetryError<azure_device_registry::Error>> {
433+
let asset_client_update_observation = match Retry::spawn(RETRY_STRATEGY.take(10), async || -> Result<AssetUpdateObservation, RetryError<azure_device_registry::Error>> {
432434
self.connector_context
433435
.azure_device_registry_client
434436
.observe_asset_update_notifications(
@@ -470,6 +472,7 @@ impl AssetClientCreationObservation {
470472
match e.kind() {
471473
// network/retriable
472474
azure_device_registry::ErrorKind::AIOProtocolError(_) => {
475+
log::warn!("Get asset definition failed. Retrying: {e}");
473476
RetryError::transient(e)
474477
}
475478
// indicates an error in the configuration, so we want to get a new notification instead of retrying this operation
@@ -502,7 +505,7 @@ impl AssetClientCreationObservation {
502505
log::error!("Dropping asset create notification: {asset_ref:?}");
503506
// unobserve as cleanup
504507
let _ = Retry::spawn(
505-
RETRY_STRATEGY,
508+
RETRY_STRATEGY.take(10),
506509
async || -> Result<(), RetryError<azure_device_registry::Error>> {
507510
self.connector_context
508511
.azure_device_registry_client
@@ -717,7 +720,7 @@ impl AssetClient {
717720
) {
718721
// send status update to the service
719722
match Retry::spawn(
720-
RETRY_STRATEGY,
723+
RETRY_STRATEGY.take(10),
721724
async || -> Result<Asset, RetryError<azure_device_registry::Error>> {
722725
connector_context
723726
.azure_device_registry_client
@@ -733,6 +736,7 @@ impl AssetClient {
733736
match e.kind() {
734737
// network/retriable
735738
azure_device_registry::ErrorKind::AIOProtocolError(_) => {
739+
log::warn!("Update asset status failed. Retrying: {e}");
736740
RetryError::transient(e)
737741
}
738742
// indicates an error in the configuration, might be transient in the future depending on what it can indicate
@@ -884,6 +888,7 @@ impl DatasetClient {
884888
match e.kind() {
885889
// network/retriable
886890
schema_registry::ErrorKind::AIOProtocolError(_) => {
891+
log::warn!("Reporting message schema failed. Retrying: {e}");
887892
RetryError::transient(e)
888893
}
889894
// indicates an error in the provided message schema, return to caller so they can fix

rust/azure_iot_operations_connector/src/data_transformer.rs

Lines changed: 0 additions & 51 deletions
This file was deleted.

rust/azure_iot_operations_connector/src/filemount/azure_device_registry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ impl FileMountMap {
411411

412412
// Add the new asset to the tracked assets with the one shot sender so when the asset is
413413
// deleted the channel is closed and the receiver is notified.
414-
log::warn!("New asset: {asset:?}");
414+
log::info!("New asset: {asset:?}");
415415
tracked_assets.insert(asset.clone(), asset_deletion_tx);
416416

417417
// Notify that an asset has been created

rust/azure_iot_operations_connector/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use azure_iot_operations_services::{
1313

1414
pub mod base_connector;
1515
pub mod data_processor;
16-
pub mod data_transformer;
1716
pub mod destination_endpoint;
1817
pub mod filemount;
1918
pub mod source_endpoint;

0 commit comments

Comments
 (0)