Skip to content

Commit ace2c4d

Browse files
authored
[Rust] [Connector] Fix status updates in base connector (#801)
# Problem only vectors on statuses are applied as patch merges on the service, all other fields need to be specified as the current state to not be overwritten # This PR Updates status reporting APIs to send existing status fields when only trying to do a partial update. Note that some of this logic may change in the future, depending on how the merging strategy evolves
1 parent 4a6f658 commit ace2c4d

File tree

1 file changed

+73
-17
lines changed

1 file changed

+73
-17
lines changed

rust/azure_iot_operations_connector/src/base_connector/managed_azure_device_registry.rs

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -279,14 +279,14 @@ impl DeviceEndpointClient {
279279
/// Used to report the status of just the device,
280280
/// and then updates the [`Device`] with the new status returned
281281
pub async fn report_device_status(&mut self, device_status: Result<(), AdrConfigError>) {
282-
// Create status with empty endpoint status
282+
// Create status without updating the endpoint status
283283
let status = azure_device_registry::DeviceStatus {
284284
config: Some(azure_device_registry::StatusConfig {
285285
version: self.specification.version,
286286
error: device_status.err(),
287287
last_transition_time: None, // this field will be removed, so we don't need to worry about it for now
288288
}),
289-
// inserts the inbound endpoint name with None if there's no error, or Some(AdrConfigError) if there is
289+
// Endpoints are merged on the service, so sending an empty map won't update anything
290290
endpoints: HashMap::new(),
291291
};
292292

@@ -296,10 +296,23 @@ impl DeviceEndpointClient {
296296

297297
/// Used to report the status of just the endpoint,
298298
/// and then updates the [`Device`] with the new status returned
299+
/// # Panics
300+
/// if the status mutex has been poisoned, which should not be possible
299301
pub async fn report_endpoint_status(&mut self, endpoint_status: Result<(), AdrConfigError>) {
300-
// Create status with empty device status
302+
// If the version of the current status config matches the current version, then include the existing config.
303+
// If there's no current config or the version doesn't match, don't report a status since the status for this version hasn't been reported yet
304+
let current_config = self.status.read().unwrap().as_ref().and_then(|status| {
305+
if status.config.as_ref().and_then(|config| config.version)
306+
== self.specification.version
307+
{
308+
status.config.clone()
309+
} else {
310+
None
311+
}
312+
});
313+
// Create status without updating the device status
301314
let status = azure_device_registry::DeviceStatus {
302-
config: None,
315+
config: current_config,
303316
// inserts the inbound endpoint name with None if there's no error, or Some(AdrConfigError) if there is
304317
endpoints: HashMap::from([(
305318
self.device_endpoint_ref.inbound_endpoint_name.clone(),
@@ -831,16 +844,32 @@ impl DatasetClient {
831844
}
832845

833846
/// Used to report the status of a dataset
847+
/// # Panics
848+
/// if the asset status mutex has been poisoned, which should not be possible
834849
pub async fn report_status(&self, status: Result<(), AdrConfigError>) {
850+
// If the version of the current status config matches the current version, then include the existing config.
851+
// If there's no current config or the version doesn't match, don't report a status since the status for this version hasn't been reported yet
852+
let current_asset_config = self
853+
.asset_status
854+
.read()
855+
.unwrap()
856+
.as_ref()
857+
.and_then(|status| {
858+
if status.config.as_ref().and_then(|config| config.version)
859+
== self.asset_specification.version
860+
{
861+
status.config.clone()
862+
} else {
863+
None
864+
}
865+
});
866+
// Get current message schema reference, so that it isn't overwritten
867+
let current_message_schema_reference = self.message_schema_reference();
835868
let adr_asset_status = azure_device_registry::AssetStatus {
836-
// TODO: Do I need to include the version here?
837-
// config: Some(azure_device_registry::StatusConfig {
838-
// version: self.asset_specification.version,
839-
// ..azure_device_registry::StatusConfig::default()
840-
// }),
869+
config: current_asset_config,
841870
datasets: Some(vec![azure_device_registry::DatasetEventStreamStatus {
842871
name: self.dataset_ref.dataset_name.clone(),
843-
message_schema_reference: None,
872+
message_schema_reference: current_message_schema_reference,
844873
error: status.err(),
845874
}]),
846875
..azure_device_registry::AssetStatus::default()
@@ -868,6 +897,8 @@ impl DatasetClient {
868897
/// # Panics
869898
/// If the Schema Registry Service returns a schema without required values. This should get updated
870899
/// to be validated by the Schema Registry API surface in the future
900+
///
901+
/// If the asset status mutex has been poisoned, which should not be possible
871902
pub async fn report_message_schema(
872903
&self,
873904
message_schema: MessageSchema,
@@ -918,17 +949,42 @@ impl DatasetClient {
918949
.expect("schema namespace will always be present."), // waiting on change to service DTDL for this to be guaranteed in code
919950
}
920951
})?;
921-
952+
// If the version of the current status config matches the current version, then include the existing config.
953+
// If there's no current config or the version doesn't match, don't report a status since the status for this version hasn't been reported yet
954+
let current_asset_config = self
955+
.asset_status
956+
.read()
957+
.unwrap()
958+
.as_ref()
959+
.and_then(|status| {
960+
if status.config.as_ref().and_then(|config| config.version)
961+
== self.asset_specification.version
962+
{
963+
status.config.clone()
964+
} else {
965+
None
966+
}
967+
});
968+
// Get the current dataset config error, if it exists, so that it isn't overwritten
969+
let current_dataset_config_error =
970+
self.asset_status
971+
.read()
972+
.unwrap()
973+
.as_ref()
974+
.and_then(|status| {
975+
status.datasets.as_ref().and_then(|datasets| {
976+
datasets
977+
.iter()
978+
.find(|dataset| dataset.name == self.dataset_ref.dataset_name)
979+
.and_then(|dataset| dataset.error.clone())
980+
})
981+
});
922982
let adr_asset_status = azure_device_registry::AssetStatus {
923-
// TODO: Do I need to include the version here?
924-
// config: Some(azure_device_registry::StatusConfig {
925-
// version: self.asset_specification.version,
926-
// ..azure_device_registry::StatusConfig::default()
927-
// }),
983+
config: current_asset_config,
928984
datasets: Some(vec![azure_device_registry::DatasetEventStreamStatus {
929985
name: self.dataset_ref.dataset_name.clone(),
930986
message_schema_reference: Some(message_schema_reference.clone()),
931-
error: None,
987+
error: current_dataset_config_error,
932988
}]),
933989
..azure_device_registry::AssetStatus::default()
934990
};

0 commit comments

Comments
 (0)