Skip to content

Commit b6d56b3

Browse files
[Rust][Connector] Service-specific timeouts (#1010)
* Created separate timeout fields for ADR, SR, and DSS inside the `ConnectorContext` * Increased the timeout for SR to 90 seconds to account for throttling/bottlenecking issues --------- Co-authored-by: Copilot <[email protected]>
1 parent 4403ef3 commit b6d56b3

File tree

4 files changed

+35
-20
lines changed

4 files changed

+35
-20
lines changed

rust/azure_iot_operations_connector/src/base_connector.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@ pub(crate) struct ConnectorContext {
2727
connector_artifacts: ConnectorArtifacts,
2828
/// Debounce duration for filemount operations for the connector
2929
debounce_duration: Duration,
30-
/// Default timeout for connector operations
31-
pub(crate) default_timeout: Duration,
30+
/// Timeout for Azure Device Registry operations
31+
pub(crate) azure_device_registry_timeout: Duration,
32+
/// Timeout for Schema Registry operations
33+
pub(crate) schema_registry_timeout: Duration,
34+
/// Timeout for State Store operations
35+
pub(crate) state_store_timeout: Duration,
3236
/// Clients used to perform connector operations
3337
azure_device_registry_client: azure_device_registry::Client<SessionManagedClient>,
3438
pub(crate) state_store_client: Arc<state_store::Client<SessionManagedClient>>,
@@ -40,7 +44,12 @@ impl std::fmt::Debug for ConnectorContext {
4044
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4145
f.debug_struct("ConnectorContext")
4246
.field("debounce_duration", &self.debounce_duration)
43-
.field("default_timeout", &self.default_timeout)
47+
.field(
48+
"azure_device_registry_timeout",
49+
&self.azure_device_registry_timeout,
50+
)
51+
.field("schema_registry_timeout", &self.schema_registry_timeout)
52+
.field("state_store_timeout", &self.state_store_timeout)
4453
.finish()
4554
}
4655
}
@@ -100,9 +109,15 @@ impl BaseConnector {
100109

101110
Ok(Self {
102111
connector_context: Arc::new(ConnectorContext {
103-
// TODO: validate these timeouts here once they come from somewhere
104-
debounce_duration: Duration::from_secs(5), // TODO: come from somewhere
105-
default_timeout: Duration::from_secs(10), // TODO: come from somewhere
112+
// TODO: These timeouts should come from somewhere, specifically, probably the artifacts.
113+
// These will need to be configured by the connector deployer, not the connector author,
114+
// so exposing them through API is not the correct solution.
115+
debounce_duration: Duration::from_secs(5),
116+
azure_device_registry_timeout: Duration::from_secs(10),
117+
// NOTE (2025-09-12): Schema Registry has an issue with scale causing throttling,
118+
// so this value has been set very high. This is probably not ideal.
119+
schema_registry_timeout: Duration::from_secs(90),
120+
state_store_timeout: Duration::from_secs(10),
106121
application_context,
107122
managed_client: session.create_managed_client(),
108123
connector_artifacts,

rust/azure_iot_operations_connector/src/base_connector/adr_discovery.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl Client {
6060
device_name,
6161
device,
6262
inbound_endpoint_type,
63-
self.0.default_timeout,
63+
self.0.azure_device_registry_timeout,
6464
)
6565
.await
6666
}
@@ -106,7 +106,7 @@ impl Client {
106106
inbound_endpoint_name,
107107
asset_name,
108108
asset,
109-
self.0.default_timeout,
109+
self.0.azure_device_registry_timeout,
110110
)
111111
.await
112112
}

rust/azure_iot_operations_connector/src/base_connector/managed_azure_device_registry.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ impl DeviceEndpointStatusReporter {
334334
device_endpoint_ref.device_name.clone(),
335335
device_endpoint_ref.inbound_endpoint_name.clone(),
336336
adr_device_status.clone(),
337-
connector_context.default_timeout,
337+
connector_context.azure_device_registry_timeout,
338338
)
339339
.await
340340
.map_err(|e| adr_error_into_retry_error(e, "Update Device Status"))
@@ -442,7 +442,7 @@ impl DeviceEndpointClientCreationObservation {
442442
.observe_device_update_notifications(
443443
device_endpoint_ref.device_name.clone(),
444444
device_endpoint_ref.inbound_endpoint_name.clone(),
445-
connector_context.default_timeout,
445+
connector_context.azure_device_registry_timeout,
446446
)
447447
// retry on network errors, otherwise don't retry on config/dev errors
448448
.await
@@ -464,7 +464,7 @@ impl DeviceEndpointClientCreationObservation {
464464
.get_device(
465465
device_endpoint_ref.device_name.clone(),
466466
device_endpoint_ref.inbound_endpoint_name.clone(),
467-
connector_context.default_timeout,
467+
connector_context.azure_device_registry_timeout,
468468
)
469469
.await
470470
.map_err(|e| adr_error_into_retry_error(e, "Get Device Definition"))
@@ -493,7 +493,7 @@ impl DeviceEndpointClientCreationObservation {
493493
.get_device_status(
494494
device_endpoint_ref.device_name.clone(),
495495
device_endpoint_ref.inbound_endpoint_name.clone(),
496-
connector_context.default_timeout,
496+
connector_context.azure_device_registry_timeout,
497497
)
498498
.await
499499
.map_err(|e| adr_error_into_retry_error(e, "Get Device Status"))
@@ -738,7 +738,7 @@ impl DeviceEndpointClient {
738738
asset_ref.device_name.clone(),
739739
asset_ref.inbound_endpoint_name.clone(),
740740
asset_ref.name.clone(),
741-
connector_context.default_timeout,
741+
connector_context.azure_device_registry_timeout,
742742
)
743743
// retry on network errors, otherwise don't retry on config/dev errors
744744
.await
@@ -761,7 +761,7 @@ impl DeviceEndpointClient {
761761
asset_ref.device_name.clone(),
762762
asset_ref.inbound_endpoint_name.clone(),
763763
asset_ref.name.clone(),
764-
connector_context.default_timeout,
764+
connector_context.azure_device_registry_timeout,
765765
)
766766
.await
767767
.map_err(|e| adr_error_into_retry_error(e, "Get Asset Definition"))
@@ -790,7 +790,7 @@ impl DeviceEndpointClient {
790790
asset_ref.device_name.clone(),
791791
asset_ref.inbound_endpoint_name.clone(),
792792
asset_ref.name.clone(),
793-
connector_context.default_timeout,
793+
connector_context.azure_device_registry_timeout,
794794
)
795795
.await
796796
.map_err(|e| adr_error_into_retry_error(e, "Get Asset Status"))
@@ -851,7 +851,7 @@ impl DeviceEndpointClient {
851851
.unobserve_device_update_notifications(
852852
device_endpoint_ref.device_name.clone(),
853853
device_endpoint_ref.inbound_endpoint_name.clone(),
854-
connector_context.default_timeout,
854+
connector_context.azure_device_registry_timeout,
855855
)
856856
// retry on network errors, otherwise don't retry on config/dev errors
857857
.await
@@ -1006,7 +1006,7 @@ impl AssetStatusReporter {
10061006
asset_ref.inbound_endpoint_name.clone(),
10071007
asset_ref.name.clone(),
10081008
adr_asset_status.clone(),
1009-
connector_context.default_timeout,
1009+
connector_context.azure_device_registry_timeout,
10101010
)
10111011
.await
10121012
.map_err(|e| adr_error_into_retry_error(e, &format!("Update Asset Status for {log_identifier}")))
@@ -1737,7 +1737,7 @@ impl AssetClient {
17371737
asset_ref.device_name.clone(),
17381738
asset_ref.inbound_endpoint_name.clone(),
17391739
asset_ref.name.clone(),
1740-
connector_context.default_timeout,
1740+
connector_context.azure_device_registry_timeout,
17411741
)
17421742
// retry on network errors, otherwise don't retry on config/dev errors
17431743
.await
@@ -2297,7 +2297,7 @@ impl DataOperationClient {
22972297
.schema_registry_client
22982298
.put(
22992299
new_message_schema.clone(),
2300-
self.connector_context.default_timeout,
2300+
self.connector_context.schema_registry_timeout,
23012301
)
23022302
.await
23032303
.map_err(|e| {

rust/azure_iot_operations_connector/src/destination_endpoint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl Forwarder {
170170
.set(
171171
key.clone().into(),
172172
data.payload,
173-
self.connector_context.default_timeout,
173+
self.connector_context.state_store_timeout,
174174
None,
175175
state_store::SetOptions {
176176
expires: None, // TODO: expiry?

0 commit comments

Comments
 (0)