From 19ffb4b4be42409a80f91162050b4f5a42cfbccd Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 5 Nov 2025 11:33:24 -0800 Subject: [PATCH 01/21] asset and device endpoint client --- ...EventDrivenTcpThermostatConnectorWorker.cs | 2 +- .../AssetAvailableEventArgs.cs | 5 +- .../AssetClient.cs | 76 +++++++++++++++++++ .../ConnectorWorker.cs | 40 +++------- .../DeviceAvailableEventArgs.cs | 7 +- .../DeviceEndpointClient.cs | 40 ++++++++++ .../PollingTelemetryConnectorWorker.cs | 2 +- .../Models/AssetStatus.cs | 20 +++++ .../Models/ConfigStatus.cs | 11 +-- .../Models/DeviceStatus.cs | 6 ++ 10 files changed, 168 insertions(+), 41 deletions(-) create mode 100644 dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs create mode 100644 dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index 8bb24a6a7c..4770cbe142 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -82,7 +82,7 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, AssetEve Array.Resize(ref buffer, bytesRead); _logger.LogInformation("Received data from event with name {0} on asset with name {1}. Forwarding this data to the MQTT broker.", assetEvent.Name, args.AssetName); - await _connector.ForwardReceivedEventAsync(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName, assetEvent, buffer, null, cancellationToken); + await args.AssetClient.ForwardReceivedEventAsync(assetEvent, buffer, null, cancellationToken); } } catch (Exception e) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs index 0d84d0edc8..f50eaff858 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs @@ -41,7 +41,9 @@ public class AssetAvailableEventArgs : EventArgs /// public ILeaderElectionClient? LeaderElectionClient { get; } - internal AssetAvailableEventArgs(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, ILeaderElectionClient? leaderElectionClient) + public AssetClient AssetClient { get; } + + internal AssetAvailableEventArgs(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, ILeaderElectionClient? leaderElectionClient, IAdrClientWrapper adrClient, ConnectorWorker connector) { DeviceName = deviceName; Device = device; @@ -49,6 +51,7 @@ internal AssetAvailableEventArgs(string deviceName, Device device, string inboun AssetName = assetName; Asset = asset; LeaderElectionClient = leaderElectionClient; + AssetClient = new(adrClient, deviceName, inboundEndpointName, assetName, connector); } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs new file mode 100644 index 0000000000..07f13dd8ab --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; + +namespace Azure.Iot.Operations.Connector +{ + /// + /// A client for updating the status of an asset and for forwarding received events and/or sampled datasets. + /// + public class AssetClient + { + private readonly IAdrClientWrapper _adrClient; + private readonly ConnectorWorker _connector; + private readonly string _deviceName; + private readonly string _inboundEndpointName; + private readonly string _assetName; + + internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inboundEndpointName, string assetName, ConnectorWorker connector) + { + _adrClient = adrClient; + _deviceName = deviceName; + _inboundEndpointName = inboundEndpointName; + _assetName = assetName; + _connector = connector; + } + + /// + /// Report the status of this asset to the Azure Device Registry service + /// + /// The status of this asset and its datasets/event groups/streams/management groups + /// The timeout for this RPC command invocation. + /// The cancellation token. + /// The status returned by the Azure Device Registry service + public async Task UpdateAssetStatusAsync( + AssetStatus status, + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default) + { + return await _adrClient.UpdateAssetStatusAsync( + _deviceName, + _inboundEndpointName, + new UpdateAssetStatusRequest() + { + AssetName = _assetName, + AssetStatus = status, + }, + commandTimeout, + cancellationToken); + } + + /// + /// Push a sampled dataset to the configured destinations. + /// + /// The dataset that was sampled. + /// The payload to push to the configured destinations. + /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. + /// Cancellation token. + public async Task ForwardSampledDatasetAsync(AssetDataset dataset, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + { + await _connector.ForwardSampledDatasetAsync(_deviceName, _inboundEndpointName, _assetName, dataset, serializedPayload, userData, cancellationToken); + } + + /// + /// Push a received event payload to the configured destinations. + /// + /// The event. + /// The payload to push to the configured destinations. + /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. + /// Cancellation token. + public async Task ForwardReceivedEventAsync(AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + { + await _connector.ForwardReceivedEventAsync(_deviceName, _inboundEndpointName, _assetName, assetEvent, serializedPayload, userData, cancellationToken); + } + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs index 8c618138b3..8b00cd3abb 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs @@ -245,18 +245,8 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) await _mqttClient.DisconnectAsync(null, CancellationToken.None); } - /// - /// Push a sampled dataset to the configured destinations. - /// - /// The name of the device that this dataset belongs to - /// The name of the inbound endpoint that this dataset belongs to - /// The asset that the dataset belongs to. - /// The name of the asset that the dataset belongs to - /// The dataset that was sampled. - /// The payload to push to the configured destinations. - /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. - /// Cancellation token. - public async Task ForwardSampledDatasetAsync(string deviceName, string inboundEndpointName, Asset asset, string assetName, AssetDataset dataset, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + // Called by AssetClient instances + internal async Task ForwardSampledDatasetAsync(string deviceName, string inboundEndpointName, string assetName, AssetDataset dataset, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) { ObjectDisposedException.ThrowIf(_isDisposed, this); @@ -273,7 +263,7 @@ public async Task ForwardSampledDatasetAsync(string deviceName, string inboundEn } } - _logger.LogInformation($"Received sampled payload from dataset with name {dataset.Name} in asset with name {asset.DisplayName}. Now publishing it to MQTT broker: {Encoding.UTF8.GetString(serializedPayload)}"); + _logger.LogInformation($"Received sampled payload from dataset with name {dataset.Name} in asset with name {assetName}. Now publishing it to MQTT broker: {Encoding.UTF8.GetString(serializedPayload)}"); if (dataset.Destinations == null) { @@ -287,7 +277,7 @@ public async Task ForwardSampledDatasetAsync(string deviceName, string inboundEn { var topic = destination.Configuration.Topic ?? throw new AssetConfigurationException( - $"Dataset with name {dataset.Name} in asset with name {asset.DisplayName} has no configured MQTT topic to publish to. Data won't be forwarded for this dataset."); + $"Dataset with name {dataset.Name} in asset with name {assetName} has no configured MQTT topic to publish to. Data won't be forwarded for this dataset."); var messageMetadata = new OutgoingTelemetryMetadata { @@ -350,22 +340,12 @@ await telemetrySender.SendTelemetryAsync( } } - /// - /// Push a received event payload to the configured destinations. - /// - /// The name of the device that this event belongs to - /// The name of the inbound endpoint that this event belongs to - /// The asset that this event came from. - /// The name of the asset that this event belongs to. - /// The event. - /// The payload to push to the configured destinations. - /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. - /// Cancellation token. - public async Task ForwardReceivedEventAsync(string deviceName, string inboundEndpointName, Asset asset, string assetName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + // Called by AssetClient instances + internal async Task ForwardReceivedEventAsync(string deviceName, string inboundEndpointName, string assetName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) { ObjectDisposedException.ThrowIf(_isDisposed, this); - _logger.LogInformation($"Received event with name {assetEvent.Name} in asset with name {asset.DisplayName}. Now publishing it to MQTT broker."); + _logger.LogInformation($"Received event with name {assetEvent.Name} in asset with name {assetName}. Now publishing it to MQTT broker."); if (assetEvent.Destinations == null) { @@ -392,7 +372,7 @@ public async Task ForwardReceivedEventAsync(string deviceName, string inboundEnd { string topic = destination.Configuration.Topic ?? throw new AssetConfigurationException( - $"Dataset with name {assetEvent.Name} in asset with name {asset.DisplayName} has no configured MQTT topic to publish to. Data won't be forwarded for this dataset."); + $"Dataset with name {assetEvent.Name} in asset with name {assetName} has no configured MQTT topic to publish to. Data won't be forwarded for this dataset."); var messageMetadata = new OutgoingTelemetryMetadata { @@ -509,7 +489,7 @@ private async void OnDeviceChanged(object? _, DeviceChangedEventArgs args) { try { - await WhileDeviceIsAvailable.Invoke(new(args.Device, args.InboundEndpointName, _leaderElectionClient), deviceTaskCancellationTokenSource.Token); + await WhileDeviceIsAvailable.Invoke(new(args.DeviceName, args.Device, args.InboundEndpointName, _leaderElectionClient, _adrClient!), deviceTaskCancellationTokenSource.Token); } catch (OperationCanceledException) { @@ -698,7 +678,7 @@ private async Task AssetAvailableAsync(string deviceName, string inboundEndpoint { try { - await WhileAssetIsAvailable.Invoke(new(deviceName, device, inboundEndpointName, assetName, asset, _leaderElectionClient), assetTaskCancellationTokenSource.Token); + await WhileAssetIsAvailable.Invoke(new(deviceName, device, inboundEndpointName, assetName, asset, _leaderElectionClient, _adrClient!, this), assetTaskCancellationTokenSource.Token); } catch (OperationCanceledException) { diff --git a/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs b/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs index e228824741..439d165d44 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs @@ -8,6 +8,7 @@ namespace Azure.Iot.Operations.Connector { public class DeviceAvailableEventArgs : EventArgs { + public string DeviceName { get; } public Device Device { get; } public string InboundEndpointName { get; } @@ -26,11 +27,15 @@ public class DeviceAvailableEventArgs : EventArgs /// public ILeaderElectionClient? LeaderElectionClient { get; } - internal DeviceAvailableEventArgs(Device device, string inboundEndpointName, ILeaderElectionClient? leaderElectionClient) + public DeviceEndpointClient DeviceEndpointClient { get; } + + internal DeviceAvailableEventArgs(string deviceName, Device device, string inboundEndpointName, ILeaderElectionClient? leaderElectionClient, IAdrClientWrapper adrclient) { + DeviceName = deviceName; Device = device; InboundEndpointName = inboundEndpointName; LeaderElectionClient = leaderElectionClient; + DeviceEndpointClient = new(adrclient, deviceName, inboundEndpointName); } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs new file mode 100644 index 0000000000..84ed78edf8 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; + +namespace Azure.Iot.Operations.Connector +{ + public class DeviceEndpointClient + { + private readonly IAdrClientWrapper _adrClient; + private readonly string _deviceName; + private readonly string _inboundEndpointName; + + internal DeviceEndpointClient(IAdrClientWrapper adrClient, string deviceName, string inboundEndpointName) + { + _adrClient = adrClient; + _deviceName = deviceName; + _inboundEndpointName = inboundEndpointName; + } + + //TODO must include endpoint status always, correct? + public async Task UpdateDeviceStatusAsync( + DeviceStatus status, + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default) + { + return await _adrClient.UpdateDeviceStatusAsync( + _deviceName, + _inboundEndpointName, + status, + commandTimeout, + cancellationToken); + } + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs index 3d383296d4..2a5aebf960 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs @@ -51,7 +51,7 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell try { byte[] sampledData = await datasetSampler.SampleDatasetAsync(dataset); - await ForwardSampledDatasetAsync(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName, dataset, sampledData); + await args.AssetClient.ForwardSampledDatasetAsync(dataset, sampledData); } catch (Exception e) { diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs index 03f9234976..d376ba05f3 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs @@ -5,13 +5,33 @@ namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; public record AssetStatus { + /// + /// The status of the asset + /// + /// + /// This status is independent from the status of any nested event groups/datasets/streams. That means that, + /// even if a dataset has a config error, the asset status may still be okay. + /// public ConfigStatus? Config { get; set; } + /// + /// The status of all datasets associated with this asset (if it has any datasets). + /// public List? Datasets { get; set; } + /// + /// The status of all event groups associated with this asset (if it has any event groups). + /// public List? EventGroups { get; set; } = default; + /// + /// The status of all management groups associated with this asset (if it has any management groups). + /// + public List? ManagementGroups { get; set; } + /// + /// The status of all streams associated with this asset (if it has any streams). + /// public List? Streams { get; set; } } diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs index 625381bd29..1ef14c2ce8 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs @@ -7,24 +7,21 @@ namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models public partial class ConfigStatus { /// - /// The 'error' Field. + /// The error that this entity encountered, if any error was encountered. /// - [JsonPropertyName("error")] - [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + /// + /// If a device/endpoint/asset/dataset/etc has no errors to report, this value should be null. + /// public ConfigError? Error { get; set; } = default; /// /// A read only timestamp indicating the last time the configuration has been modified from the perspective of the current actual (Edge) state of the CRD. Edge would be the only writer of this value and would sync back up to the cloud. /// - [JsonPropertyName("lastTransitionTime")] - [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public DateTime? LastTransitionTime { get; set; } = default; /// /// A read only incremental counter indicating the number of times the configuration has been modified from the perspective of the current actual (Edge) state of the CRD. Edge would be the only writer of this value and would sync back up to the cloud. In steady state, this should equal version. /// - [JsonPropertyName("version")] - [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public ulong? Version { get; set; } = default; } diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs index a4f689ea81..244461c34b 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs @@ -5,7 +5,13 @@ namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; public record DeviceStatus { + /// + /// The status of the device + /// public ConfigStatus? Config { get; set; } + /// + /// The statuses of each of the endpoints that belong to the device + /// public DeviceStatusEndpoint? Endpoints { get; set; } } From 11859eebbd6874cd0a2016c1a0cf151d1a47bbe8 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Thu, 6 Nov 2025 11:40:49 -0800 Subject: [PATCH 02/21] report okay status --- ...EventDrivenTcpThermostatConnectorWorker.cs | 19 ++++-- .../AssetAvailableEventArgs.cs | 2 +- .../AssetClient.cs | 61 ++++++++++++++++++- .../ConnectorWorker.cs | 43 +++++++++++-- .../DeviceEndpointClient.cs | 26 ++++++-- .../Models/ConfigStatus.cs | 9 +++ 6 files changed, 141 insertions(+), 19 deletions(-) diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index 4770cbe142..28ca280b94 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -18,10 +18,17 @@ public EventDrivenTcpThermostatConnectorWorker(ApplicationContext applicationCon _logger = logger; _connector = new(applicationContext, connectorLogger, mqttClient, datasetSamplerFactory, adrClientFactory, leaderElectionConfigurationProvider) { - WhileAssetIsAvailable = WhileAssetAvailableAsync + WhileAssetIsAvailable = WhileAssetAvailableAsync, + WhileDeviceIsAvailable = WhileDeviceAvailableAsync, }; } + private async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, CancellationToken cancellationToken) + { + DeviceStatus deviceStatus = args.DeviceEndpointClient.BuildOkayStatus(); + await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); + } + private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, CancellationToken cancellationToken) { _logger.LogInformation("Asset with name {0} is now sampleable", args.AssetName); @@ -50,10 +57,10 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel return; } - await OpenTcpConnectionAsync(args, assetEvent, port, cancellationToken); + await OpenTcpConnectionAsync(args, args.Asset.EventGroups.First().Name, assetEvent, port, cancellationToken); } - private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, AssetEvent assetEvent, int port, CancellationToken cancellationToken) + private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string eventGroupName, AssetEvent assetEvent, int port, CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { @@ -82,7 +89,11 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, AssetEve Array.Resize(ref buffer, bytesRead); _logger.LogInformation("Received data from event with name {0} on asset with name {1}. Forwarding this data to the MQTT broker.", assetEvent.Name, args.AssetName); - await args.AssetClient.ForwardReceivedEventAsync(assetEvent, buffer, null, cancellationToken); + await args.AssetClient.ForwardReceivedEventAsync(eventGroupName, assetEvent, buffer, null, cancellationToken); + + // Report status of the asset once the first event has been received and forwarded + AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); + await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); } } catch (Exception e) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs index f50eaff858..b855088f39 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs @@ -51,7 +51,7 @@ internal AssetAvailableEventArgs(string deviceName, Device device, string inboun AssetName = assetName; Asset = asset; LeaderElectionClient = leaderElectionClient; - AssetClient = new(adrClient, deviceName, inboundEndpointName, assetName, connector); + AssetClient = new(adrClient, deviceName, inboundEndpointName, assetName, asset, connector); } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs index 07f13dd8ab..80f8e47fa4 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs @@ -15,13 +15,15 @@ public class AssetClient private readonly string _deviceName; private readonly string _inboundEndpointName; private readonly string _assetName; + private readonly Asset _asset; - internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inboundEndpointName, string assetName, ConnectorWorker connector) + internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inboundEndpointName, string assetName, Asset asset, ConnectorWorker connector) { _adrClient = adrClient; _deviceName = deviceName; _inboundEndpointName = inboundEndpointName; _assetName = assetName; + _asset = asset; _connector = connector; } @@ -49,6 +51,58 @@ public async Task UpdateAssetStatusAsync( cancellationToken); } + public AssetStatus BuildOkayStatus() + { + AssetStatus status = new() + { + Config = ConfigStatus.Okay(), + }; + + if (_asset.Datasets != null) + { + status.Datasets = new List(); + foreach (var dataset in _asset.Datasets) + { + status.Datasets.Add(new AssetDatasetEventStreamStatus() + { + Name = dataset.Name, + Error = null, + MessageSchemaReference = _connector.GetRegisteredDatasetMessageSchema(_deviceName, _inboundEndpointName, _assetName, dataset.Name) + }); + } + } + + if (_asset.EventGroups != null) + { + status.EventGroups = new List(); + foreach (var eventGroup in _asset.EventGroups) + { + var eventGroupStatus = new AssetEventGroupStatus() + { + Name = eventGroup.Name, + }; + + if (eventGroup.Events != null) + { + eventGroupStatus.Events = new List(); + foreach (var assetEvent in eventGroup.Events) + { + eventGroupStatus.Events.Add(new AssetDatasetEventStreamStatus() + { + Name = assetEvent.Name, + Error = null, + MessageSchemaReference = _connector.GetRegisteredEventMessageSchema(_deviceName, _inboundEndpointName, _assetName, eventGroup.Name, assetEvent.Name) + }); + } + } + + status.EventGroups.Add(eventGroupStatus); + } + } + + return status; + } + /// /// Push a sampled dataset to the configured destinations. /// @@ -64,13 +118,14 @@ public async Task ForwardSampledDatasetAsync(AssetDataset dataset, byte[] serial /// /// Push a received event payload to the configured destinations. /// + /// The name of the event group that this event belongs to. /// The event. /// The payload to push to the configured destinations. /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. /// Cancellation token. - public async Task ForwardReceivedEventAsync(AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + public async Task ForwardReceivedEventAsync(string eventGroupName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) { - await _connector.ForwardReceivedEventAsync(_deviceName, _inboundEndpointName, _assetName, assetEvent, serializedPayload, userData, cancellationToken); + await _connector.ForwardReceivedEventAsync(_deviceName, _inboundEndpointName, _assetName, eventGroupName, assetEvent, serializedPayload, userData, cancellationToken); } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs index 8b00cd3abb..37c69cc83e 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs @@ -46,7 +46,8 @@ public class ConnectorWorker : ConnectorBackgroundService // keys are "{composite device name}_{asset name}_{dataset name}. The value is the message schema registered for that device's asset's dataset private readonly ConcurrentDictionary _registeredDatasetMessageSchemas = new(); - // keys are "{composite device name}_{asset name}_{event name}. The value is the message schema registered for that device's asset's event + //TODO event group name relevant here? + // keys are "{composite device name}_{asset name}_{event group name}_{event name}. The value is the message schema registered for that device's asset's event private readonly ConcurrentDictionary _registeredEventMessageSchemas = new(); /// @@ -245,6 +246,36 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) await _mqttClient.DisconnectAsync(null, CancellationToken.None); } + public MessageSchemaReference? GetRegisteredDatasetMessageSchema(string deviceName, string inboundEndpointName, string assetName, string datasetName) + { + if (_registeredDatasetMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{datasetName}", out Schema? schema)) + { + return new MessageSchemaReference() + { + SchemaName = schema.Name, + SchemaRegistryNamespace = schema.Namespace, + SchemaVersion = schema.Version, + }; + } + + return null; + } + + public MessageSchemaReference? GetRegisteredEventMessageSchema(string deviceName, string inboundEndpointName, string assetName, string eventGroupName, string eventName) + { + if (_registeredEventMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{eventGroupName}_{eventName}", out Schema? schema)) + { + return new MessageSchemaReference() + { + SchemaName = schema.Name, + SchemaRegistryNamespace = schema.Namespace, + SchemaVersion = schema.Version, + }; + } + + return null; + } + // Called by AssetClient instances internal async Task ForwardSampledDatasetAsync(string deviceName, string inboundEndpointName, string assetName, AssetDataset dataset, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) { @@ -341,11 +372,11 @@ await telemetrySender.SendTelemetryAsync( } // Called by AssetClient instances - internal async Task ForwardReceivedEventAsync(string deviceName, string inboundEndpointName, string assetName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + internal async Task ForwardReceivedEventAsync(string deviceName, string inboundEndpointName, string assetName, string eventGroupName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) { ObjectDisposedException.ThrowIf(_isDisposed, this); - _logger.LogInformation($"Received event with name {assetEvent.Name} in asset with name {assetName}. Now publishing it to MQTT broker."); + _logger.LogInformation($"Received event with name {assetEvent.Name} in event group with name {eventGroupName} in asset with name {assetName}. Now publishing it to MQTT broker."); if (assetEvent.Destinations == null) { @@ -354,7 +385,7 @@ internal async Task ForwardReceivedEventAsync(string deviceName, string inboundE } CloudEvent? cloudEvent = null; - if (_registeredEventMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{assetEvent.Name}", out var registeredEventMessageSchema)) + if (_registeredEventMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{eventGroupName}_{assetEvent}", out var registeredEventMessageSchema)) { if (Uri.IsWellFormedUriString(inboundEndpointName, UriKind.RelativeOrAbsolute)) { @@ -643,7 +674,7 @@ private async Task AssetAvailableAsync(string deviceName, string inboundEndpoint { try { - _logger.LogInformation($"Registering message schema for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); + _logger.LogInformation($"Registering message schema for event with name {assetEvent.Name} in event group with name {assetEventGroup.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient); var registeredEventSchema = await schemaRegistryClient.PutAsync( eventMessageSchema.SchemaContent, @@ -654,7 +685,7 @@ private async Task AssetAvailableAsync(string deviceName, string inboundEndpoint _logger.LogInformation($"Registered message schema for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}."); - _registeredEventMessageSchemas.TryAdd($"{deviceName}_{inboundEndpointName}_{assetName}_{assetEvent.Name}", registeredEventSchema); + _registeredEventMessageSchemas.TryAdd($"{deviceName}_{inboundEndpointName}_{assetName}_{assetEventGroup.Name}_{assetEvent.Name}", registeredEventSchema); } catch (Exception ex) { diff --git a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs index 84ed78edf8..a87d1c9790 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs @@ -1,11 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; using Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; namespace Azure.Iot.Operations.Connector @@ -36,5 +31,26 @@ public async Task UpdateDeviceStatusAsync( commandTimeout, cancellationToken); } + + public DeviceStatus BuildOkayStatus() + { + return new() + { + Config = ConfigStatus.Okay(), + Endpoints = new() //TODO multiple endpoints support? Do we need to report a complete status, or just a patch? + { + Inbound = new() + { + { + _inboundEndpointName, + new DeviceStatusInboundEndpointSchemaMapValue() + { + Error = null + } + } + } + } + }; + } } } diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs index 1ef14c2ce8..28558de31c 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs @@ -24,5 +24,14 @@ public partial class ConfigStatus /// public ulong? Version { get; set; } = default; + public static ConfigStatus Okay() + { + return new() + { + Error = null, + LastTransitionTime = DateTime.UtcNow, + Version = null, //TODO do we need to report this? + }; + } } } From 8c5e62f2db0cda182e41c486dc16afa350d3dbe9 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Fri, 7 Nov 2025 15:00:07 -0800 Subject: [PATCH 03/21] negative status --- ...EventDrivenTcpThermostatConnectorWorker.cs | 31 ++++++++++++++++--- .../AssetAvailableEventArgs.cs | 3 ++ .../Models/DeviceStatus.cs | 7 +++++ 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index 28ca280b94..a415fd12e2 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -13,6 +13,8 @@ public class EventDrivenTcpThermostatConnectorWorker : BackgroundService, IDispo private readonly ILogger _logger; private readonly ConnectorWorker _connector; + private const string InboundEndpointName = "my_tcp_endpoint"; + public EventDrivenTcpThermostatConnectorWorker(ApplicationContext applicationContext, ILogger logger, ILogger connectorLogger, IMqttClient mqttClient, IMessageSchemaProvider datasetSamplerFactory, IAdrClientWrapperProvider adrClientFactory, IConnectorLeaderElectionConfigurationProvider leaderElectionConfigurationProvider) { _logger = logger; @@ -36,14 +38,18 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel if (args.Asset.EventGroups == null || args.Asset.EventGroups.Count != 1) { - _logger.LogError("Asset with name {0} does not have the expected event group", args.AssetName); + _logger.LogWarning("Asset with name {0} does not have the expected event group. No events will be received.", args.AssetName); + AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); + await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); return; } var eventGroup = args.Asset.EventGroups.First(); if (eventGroup.Events == null || eventGroup.Events.Count != 1) { - _logger.LogError("Asset with name {0} does not have the expected event within its event group", args.AssetName); + _logger.LogWarning("Asset with name {0} does not have the expected event within its event group. No events will be received.", args.AssetName); + AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); + await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); return; } @@ -53,7 +59,14 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel if (assetEvent.DataSource == null || !int.TryParse(assetEvent.DataSource, out int port)) { // If the asset's has no event doesn't specify a port, then do nothing - _logger.LogInformation("Asset with name {0} has an event, but the event didn't configure a port, so the connector won't handle these events", args.AssetName); + _logger.LogError("Asset with name {0} has an event, but the event didn't configure a port, so the connector won't handle these events", args.AssetName); + AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); + assetStatus.EventGroups!.First().Events!.First().Error = new ConfigError() + { + Message = "The configured event was either missing the expected port or had a non-integer value for the port", + }; + + await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); return; } @@ -74,7 +87,7 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e return; } - string host = args.Device.Endpoints.Inbound["my_tcp_endpoint"].Address.Split(":")[0]; + string host = args.Device.Endpoints.Inbound[InboundEndpointName].Address.Split(":")[0]; _logger.LogInformation("Attempting to open TCP client with address {0} and port {1}", host, port); using TcpClient client = new(); await client.ConnectAsync(host, port, cancellationToken); @@ -106,6 +119,16 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e { _logger.LogError(e, "Failed to open TCP connection to asset"); } + + DeviceStatus deviceStatus = args.DeviceEndpointClient.BuildOkayStatus(); + deviceStatus.SetEndpointError( + InboundEndpointName, + new ConfigError() + { + Message = "Unable to connect to the TCP endpoint. The connector will retry to connect." + }); + + await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs index b855088f39..aad85c1cec 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs @@ -43,6 +43,8 @@ public class AssetAvailableEventArgs : EventArgs public AssetClient AssetClient { get; } + public DeviceEndpointClient DeviceEndpointClient { get; } + internal AssetAvailableEventArgs(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, ILeaderElectionClient? leaderElectionClient, IAdrClientWrapper adrClient, ConnectorWorker connector) { DeviceName = deviceName; @@ -52,6 +54,7 @@ internal AssetAvailableEventArgs(string deviceName, Device device, string inboun Asset = asset; LeaderElectionClient = leaderElectionClient; AssetClient = new(adrClient, deviceName, inboundEndpointName, assetName, asset, connector); + DeviceEndpointClient = new(adrClient, deviceName, inboundEndpointName); } } } diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs index 244461c34b..d95368d1f5 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs @@ -14,4 +14,11 @@ public record DeviceStatus /// The statuses of each of the endpoints that belong to the device /// public DeviceStatusEndpoint? Endpoints { get; set; } + + public void SetEndpointError(string inboundEndpointName, ConfigError endpointError) + { + Endpoints ??= new(); + Endpoints.Inbound ??= new(); + Endpoints.Inbound[inboundEndpointName].Error = endpointError; + } } From 019f7c86e250540459312cd3cd5d55f6d46e97a4 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Fri, 7 Nov 2025 15:14:30 -0800 Subject: [PATCH 04/21] it was --- dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs index 37c69cc83e..d947b7dff8 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs @@ -46,7 +46,6 @@ public class ConnectorWorker : ConnectorBackgroundService // keys are "{composite device name}_{asset name}_{dataset name}. The value is the message schema registered for that device's asset's dataset private readonly ConcurrentDictionary _registeredDatasetMessageSchemas = new(); - //TODO event group name relevant here? // keys are "{composite device name}_{asset name}_{event group name}_{event name}. The value is the message schema registered for that device's asset's event private readonly ConcurrentDictionary _registeredEventMessageSchemas = new(); From 5fe4651ace2ab90686d26950b22cc94b91d01791 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Fri, 7 Nov 2025 15:43:36 -0800 Subject: [PATCH 05/21] documentation, clarifications --- .../AssetClient.cs | 16 +++++++++-- .../DeviceEndpointClient.cs | 26 +++++++++++++++-- .../IAdrClientWrapper.cs | 28 +++++++++++++------ 3 files changed, 56 insertions(+), 14 deletions(-) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs index 80f8e47fa4..3743419b19 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs @@ -28,12 +28,16 @@ internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inbo } /// - /// Report the status of this asset to the Azure Device Registry service + /// Update the status of this asset in the Azure Device Registry service /// /// The status of this asset and its datasets/event groups/streams/management groups /// The timeout for this RPC command invocation. /// The cancellation token. /// The status returned by the Azure Device Registry service + /// + /// This update behaves like a 'put' in that it will replace all current state for this asset in the Azure + /// Device Registry service with what is provided. + /// public async Task UpdateAssetStatusAsync( AssetStatus status, TimeSpan? commandTimeout = null, @@ -51,7 +55,15 @@ public async Task UpdateAssetStatusAsync( cancellationToken); } - public AssetStatus BuildOkayStatus() + /// + /// Build an instance of where this asset and all of its datasets/events/streams/management groups + /// have no errors. + /// + /// + /// An instance of where this asset and all of its datasets/events/streams/management groups + /// have no errors. + /// + public AssetStatus BuildOkayStatus() //TODO want some cached version of the last known status to avoid 'put' calls overwriting? { AssetStatus status = new() { diff --git a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs index a87d1c9790..7e884e8441 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs @@ -5,6 +5,9 @@ namespace Azure.Iot.Operations.Connector { + /// + /// A client for reporting the status of this device and its endpoint + /// public class DeviceEndpointClient { private readonly IAdrClientWrapper _adrClient; @@ -18,9 +21,22 @@ internal DeviceEndpointClient(IAdrClientWrapper adrClient, string deviceName, st _inboundEndpointName = inboundEndpointName; } - //TODO must include endpoint status always, correct? + /// + /// Update the status of a specific device in the Azure Device Registry service + /// + /// The name of the device. + /// The name of the inbound endpoint. + /// The new status of the device. + /// Optional timeout for the command. + /// Optional cancellation token. + /// The status returned by the Azure Device Registry service + /// + /// This update call will act as a 'patch' for all endpoint-level statuses, but will act as a 'put' for the device-level status. + /// That means that, for devices with multiple endpoints, you can safely call this method when each endpoint has a status to + /// report without needing to include the existing status of previously reported endpoints. + /// public async Task UpdateDeviceStatusAsync( - DeviceStatus status, + DeviceStatus status, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) { @@ -32,12 +48,16 @@ public async Task UpdateDeviceStatusAsync( cancellationToken); } + /// + /// Build an instance of where this device and this endpoint have no errors. + /// + /// An instance of where this device and this endpoint have no errors. public DeviceStatus BuildOkayStatus() { return new() { Config = ConfigStatus.Okay(), - Endpoints = new() //TODO multiple endpoints support? Do we need to report a complete status, or just a patch? + Endpoints = new() { Inbound = new() { diff --git a/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs b/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs index 5a4755bb07..66dd6c2a0f 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs @@ -94,15 +94,20 @@ public interface IAdrClientWrapper : IAsyncDisposable /// The names of all available devices IEnumerable GetDeviceNames(); + /// - /// Updates the status of a specific asset. + /// Update the status of a specific asset in the Azure Device Registry service /// - /// The name of the device. - /// The name of the inbound endpoint. - /// The request containing asset status update parameters. - /// Optional timeout for the command. - /// Optional cancellation token. - /// A task that represents the asynchronous operation, containing the updated asset details. + /// The name of the device the asset belongs to. + /// The name of the inbound endpoint the asset belongs to. + /// The status of this asset and its datasets/event groups/streams/management groups. + /// The timeout for this RPC command invocation. + /// The cancellation token. + /// The status returned by the Azure Device Registry service. + /// + /// This update behaves like a 'put' in that it will replace all current state for this asset in the Azure + /// Device Registry service with what is provided. + /// Task UpdateAssetStatusAsync( string deviceName, string inboundEndpointName, @@ -111,14 +116,19 @@ Task UpdateAssetStatusAsync( CancellationToken cancellationToken = default); /// - /// Updates the status of a specific device. + /// Update the status of a specific device in the Azure Device Registry service /// /// The name of the device. /// The name of the inbound endpoint. /// The new status of the device. /// Optional timeout for the command. /// Optional cancellation token. - /// A task that represents the asynchronous operation, containing the updated device details. + /// The status returned by the Azure Device Registry service + /// + /// This update call will act as a patch for all endpoint level statuses, but will act as a put for the device-level status. + /// That means that, for devices with multiple endpoints, you can safely call this method when each endpoint has a status to + /// report without needing to include the existing status of previously reported endpoints. + /// Task UpdateDeviceStatusAsync( string deviceName, string inboundEndpointName, From e7bec22a39107446c1558b2e5ee41f1d4b5a4aa3 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Fri, 7 Nov 2025 15:48:52 -0800 Subject: [PATCH 06/21] documentation --- .../AssetAvailableEventArgs.cs | 15 +++++++++++++++ .../DeviceAvailableEventArgs.cs | 13 +++++++++++++ 2 files changed, 28 insertions(+) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs index aad85c1cec..488f733d47 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs @@ -11,10 +11,19 @@ namespace Azure.Iot.Operations.Connector /// public class AssetAvailableEventArgs : EventArgs { + /// + /// The name of the device that this asset belongs to. + /// public string DeviceName { get; } + /// + /// The device that this asset belongs to. + /// public Device Device { get; } + /// + /// The name of the endpoint that this asset belongs to. + /// public string InboundEndpointName { get; } /// @@ -41,8 +50,14 @@ public class AssetAvailableEventArgs : EventArgs /// public ILeaderElectionClient? LeaderElectionClient { get; } + /// + /// The client to use to send status updates for assets on and to use to forward sampled datasets/received events with. + /// public AssetClient AssetClient { get; } + /// + /// The client to use to send status updates for this asset's device on. + /// public DeviceEndpointClient DeviceEndpointClient { get; } internal AssetAvailableEventArgs(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, ILeaderElectionClient? leaderElectionClient, IAdrClientWrapper adrClient, ConnectorWorker connector) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs b/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs index 439d165d44..b7db776e57 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs @@ -8,9 +8,19 @@ namespace Azure.Iot.Operations.Connector { public class DeviceAvailableEventArgs : EventArgs { + /// + /// The name of this device. + /// public string DeviceName { get; } + + /// + /// This device. + /// public Device Device { get; } + /// + /// The name of the endpoint that became available on this device. + /// public string InboundEndpointName { get; } /// @@ -27,6 +37,9 @@ public class DeviceAvailableEventArgs : EventArgs /// public ILeaderElectionClient? LeaderElectionClient { get; } + /// + /// The client to use to send status updates for this device with. + /// public DeviceEndpointClient DeviceEndpointClient { get; } internal DeviceAvailableEventArgs(string deviceName, Device device, string inboundEndpointName, ILeaderElectionClient? leaderElectionClient, IAdrClientWrapper adrclient) From 2a531cc439e98b8f3883b8f019c619354b684ee5 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Fri, 7 Nov 2025 15:58:30 -0800 Subject: [PATCH 07/21] logs --- .../EventDrivenTcpThermostatConnectorWorker.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index a415fd12e2..7222af2a6a 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -26,7 +26,9 @@ public EventDrivenTcpThermostatConnectorWorker(ApplicationContext applicationCon } private async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, CancellationToken cancellationToken) - { + { + _logger.LogInformation("Device with name {0} is now available", args.DeviceName); + DeviceStatus deviceStatus = args.DeviceEndpointClient.BuildOkayStatus(); await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); } From f306675fa3279e3a1b008fedddc2ec6db04a6f6c Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Fri, 7 Nov 2025 16:01:39 -0800 Subject: [PATCH 08/21] ? --- .../PollingTelemetryConnectorWorker.cs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs index 2a5aebf960..08f9c36328 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs @@ -55,6 +55,18 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell } catch (Exception e) { + DeviceStatus deviceStatus = args.DeviceEndpointClient.BuildOkayStatus(); + deviceStatus.Config = new ConfigStatus() + { + Error = new ConfigError() + { + Message = $"Unable to sample the device. Error message: {e.Message}", + } + }; + + //TODO not really specific enough. Move all this class to user code so they can be more specific? + // Or expect advanced users to just copy this class anyways? + await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus); _logger.LogError(e, "Failed to sample the dataset"); } }, null, TimeSpan.FromSeconds(0), samplingInterval); From 79c551a4af6b5b9bb9c5486ace23925ff0a9fde3 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Mon, 10 Nov 2025 10:44:52 -0800 Subject: [PATCH 09/21] wrap --- ...EventDrivenTcpThermostatConnectorWorker.cs | 54 ++++++++++++++++--- .../PollingTelemetryConnectorWorker.cs | 38 +++++++++++-- 2 files changed, 83 insertions(+), 9 deletions(-) diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index 7222af2a6a..efb4e60be7 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -30,7 +30,14 @@ private async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, Canc _logger.LogInformation("Device with name {0} is now available", args.DeviceName); DeviceStatus deviceStatus = args.DeviceEndpointClient.BuildOkayStatus(); - await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); + try + { + await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + } } private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, CancellationToken cancellationToken) @@ -42,7 +49,14 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel { _logger.LogWarning("Asset with name {0} does not have the expected event group. No events will be received.", args.AssetName); AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); - await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); + try + { + await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + } return; } @@ -51,7 +65,14 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel { _logger.LogWarning("Asset with name {0} does not have the expected event within its event group. No events will be received.", args.AssetName); AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); - await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); + try + { + await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + } return; } @@ -68,7 +89,14 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel Message = "The configured event was either missing the expected port or had a non-integer value for the port", }; - await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); + try + { + await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + } return; } @@ -108,7 +136,14 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e // Report status of the asset once the first event has been received and forwarded AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); - await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); + try + { + await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + } } } catch (Exception e) @@ -130,7 +165,14 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e Message = "Unable to connect to the TCP endpoint. The connector will retry to connect." }); - await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); + try + { + await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs index 08f9c36328..60ba1cd086 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs @@ -15,10 +15,25 @@ public class PollingTelemetryConnectorWorker : ConnectorWorker public PollingTelemetryConnectorWorker(ApplicationContext applicationContext, ILogger logger, IMqttClient mqttClient, IDatasetSamplerFactory datasetSamplerFactory, IMessageSchemaProvider messageSchemaFactory, IAdrClientWrapperProvider adrClientFactory, IConnectorLeaderElectionConfigurationProvider? leaderElectionConfigurationProvider = null) : base(applicationContext, logger, mqttClient, messageSchemaFactory, adrClientFactory, leaderElectionConfigurationProvider) { + base.WhileDeviceIsAvailable = WhileDeviceAvailableAsync; base.WhileAssetIsAvailable = WhileAssetAvailableAsync; _datasetSamplerFactory = datasetSamplerFactory; } + public async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, CancellationToken cancellationToken) + { + DeviceStatus deviceStatus = args.DeviceEndpointClient.BuildOkayStatus(); + try + { + // Report device status is okay + await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); + } + } + public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); @@ -52,6 +67,17 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell { byte[] sampledData = await datasetSampler.SampleDatasetAsync(dataset); await args.AssetClient.ForwardSampledDatasetAsync(dataset, sampledData); + + AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); + try + { + // The dataset was sampled as expected, so report the asset status as okay + await args.AssetClient.UpdateAssetStatusAsync(assetStatus); + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report asset status to Azure Device Registry service"); + } } catch (Exception e) { @@ -64,10 +90,16 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell } }; - //TODO not really specific enough. Move all this class to user code so they can be more specific? - // Or expect advanced users to just copy this class anyways? - await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus); _logger.LogError(e, "Failed to sample the dataset"); + + try + { + await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus); + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + } } }, null, TimeSpan.FromSeconds(0), samplingInterval); From 13118d441d962076c90db7c2d258946756b8b011 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Mon, 10 Nov 2025 10:57:27 -0800 Subject: [PATCH 10/21] logs --- .../EventDrivenTcpThermostatConnectorWorker.cs | 2 ++ .../PollingTelemetryConnectorWorker.cs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index efb4e60be7..20173d1dc5 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -32,6 +32,7 @@ private async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, Canc DeviceStatus deviceStatus = args.DeviceEndpointClient.BuildOkayStatus(); try { + _logger.LogInformation("Reporting device status as okay to Azure Device Registry service..."); await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); } catch (Exception e2) @@ -91,6 +92,7 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel try { + _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); } catch (Exception e2) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs index 60ba1cd086..21d72aee26 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs @@ -26,6 +26,7 @@ public async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, Cance try { // Report device status is okay + _logger.LogInformation("Reporting device status as okay to Azure Device Registry service..."); await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus); } catch (Exception e) @@ -72,6 +73,7 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell try { // The dataset was sampled as expected, so report the asset status as okay + _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); await args.AssetClient.UpdateAssetStatusAsync(assetStatus); } catch (Exception e2) From 2eaffb4e6e256efe398eaad65a9e817490d057c2 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Mon, 10 Nov 2025 11:09:58 -0800 Subject: [PATCH 11/21] documentation --- .../AssetAndDeviceRegistry/Models/AssetStatus.cs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs index d376ba05f3..2c53b9d332 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs @@ -17,21 +17,32 @@ public record AssetStatus /// /// The status of all datasets associated with this asset (if it has any datasets). /// + /// + /// Each dataset should only report its latest status. + /// public List? Datasets { get; set; } /// /// The status of all event groups associated with this asset (if it has any event groups). /// + /// + /// Each event group should only report its latest status. + /// public List? EventGroups { get; set; } = default; /// /// The status of all management groups associated with this asset (if it has any management groups). /// - + /// + /// Each management group should only report its latest status. + /// public List? ManagementGroups { get; set; } /// /// The status of all streams associated with this asset (if it has any streams). /// + /// + /// Each stream should only report its latest status. + /// public List? Streams { get; set; } } From 670f9fddaf4b91214f4dff57135aeb5207ef92a5 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Mon, 10 Nov 2025 11:43:01 -0800 Subject: [PATCH 12/21] get device/asset status APIs --- .../AdrClientWrapper.cs | 12 ++++++++ .../IAdrClientWrapper.cs | 29 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AdrClientWrapper.cs b/dotnet/src/Azure.Iot.Operations.Connector/AdrClientWrapper.cs index 4f9817f6cb..8305a9b959 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AdrClientWrapper.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AdrClientWrapper.cs @@ -171,6 +171,18 @@ public Task CreateOrUpdateDiscove return _client.CreateOrUpdateDiscoveredDeviceAsync(request, inboundEndpointType, commandTimeout, cancellationToken); } + /// + public Task GetDeviceStatusAsync(string deviceName, string inboundEndpointName, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + return _client.GetDeviceStatusAsync(deviceName, inboundEndpointName, commandTimeout, cancellationToken); + } + + /// + public Task GetAssetStatusAsync(string deviceName, string inboundEndpointName, string assetName, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + return _client.GetAssetStatusAsync(deviceName, inboundEndpointName, assetName, commandTimeout, cancellationToken); + } + /// public ValueTask DisposeAsync() { diff --git a/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs b/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs index 66dd6c2a0f..592660ecfa 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs @@ -164,5 +164,34 @@ Task CreateOrUpdateDiscoveredDevi string inboundEndpointType, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default); + + /// + /// Retrieves the status of a specific device. + /// + /// The name of the device. + /// The name of the inbound endpoint. + /// Optional timeout for the command. + /// Optional cancellation token. + /// A task that represents the asynchronous operation, containing the device status. + Task GetDeviceStatusAsync( + string deviceName, + string inboundEndpointName, + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default); + + /// + /// Retrieves the status of a specific asset. + /// + /// The name of the device. + /// The name of the inbound endpoint. + /// The name of the asset. + /// Optional timeout for the command. + /// Optional cancellation token. + /// A task that represents the asynchronous operation, containing the asset status. + Task GetAssetStatusAsync(string deviceName, + string inboundEndpointName, + string assetName, + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default); } } From 1b47f63a1292ca26e3c2d5dd40adeac21bb06b21 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Mon, 10 Nov 2025 11:53:52 -0800 Subject: [PATCH 13/21] Just get status before making any changes --- ...EventDrivenTcpThermostatConnectorWorker.cs | 12 +-- .../AssetAvailableEventArgs.cs | 2 +- .../AssetClient.cs | 75 ++++--------------- .../DeviceEndpointClient.cs | 37 +++------ .../PollingTelemetryConnectorWorker.cs | 6 +- .../MockAdrClientWrapper.cs | 10 +++ 6 files changed, 45 insertions(+), 97 deletions(-) diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index 20173d1dc5..dd004d26cd 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -29,7 +29,7 @@ private async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, Canc { _logger.LogInformation("Device with name {0} is now available", args.DeviceName); - DeviceStatus deviceStatus = args.DeviceEndpointClient.BuildOkayStatus(); + DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); try { _logger.LogInformation("Reporting device status as okay to Azure Device Registry service..."); @@ -49,7 +49,7 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel if (args.Asset.EventGroups == null || args.Asset.EventGroups.Count != 1) { _logger.LogWarning("Asset with name {0} does not have the expected event group. No events will be received.", args.AssetName); - AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); + AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); try { await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); @@ -65,7 +65,7 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel if (eventGroup.Events == null || eventGroup.Events.Count != 1) { _logger.LogWarning("Asset with name {0} does not have the expected event within its event group. No events will be received.", args.AssetName); - AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); + AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); try { await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); @@ -84,7 +84,7 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel { // If the asset's has no event doesn't specify a port, then do nothing _logger.LogError("Asset with name {0} has an event, but the event didn't configure a port, so the connector won't handle these events", args.AssetName); - AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); + AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); assetStatus.EventGroups!.First().Events!.First().Error = new ConfigError() { Message = "The configured event was either missing the expected port or had a non-integer value for the port", @@ -137,7 +137,7 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e await args.AssetClient.ForwardReceivedEventAsync(eventGroupName, assetEvent, buffer, null, cancellationToken); // Report status of the asset once the first event has been received and forwarded - AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); + AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); try { await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); @@ -159,7 +159,7 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e _logger.LogError(e, "Failed to open TCP connection to asset"); } - DeviceStatus deviceStatus = args.DeviceEndpointClient.BuildOkayStatus(); + DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); deviceStatus.SetEndpointError( InboundEndpointName, new ConfigError() diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs index 488f733d47..4df9258863 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs @@ -68,7 +68,7 @@ internal AssetAvailableEventArgs(string deviceName, Device device, string inboun AssetName = assetName; Asset = asset; LeaderElectionClient = leaderElectionClient; - AssetClient = new(adrClient, deviceName, inboundEndpointName, assetName, asset, connector); + AssetClient = new(adrClient, deviceName, inboundEndpointName, assetName, connector); DeviceEndpointClient = new(adrClient, deviceName, inboundEndpointName); } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs index 3743419b19..f87ad61d05 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs @@ -15,15 +15,13 @@ public class AssetClient private readonly string _deviceName; private readonly string _inboundEndpointName; private readonly string _assetName; - private readonly Asset _asset; - internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inboundEndpointName, string assetName, Asset asset, ConnectorWorker connector) + internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inboundEndpointName, string assetName, ConnectorWorker connector) { _adrClient = adrClient; _deviceName = deviceName; _inboundEndpointName = inboundEndpointName; _assetName = assetName; - _asset = asset; _connector = connector; } @@ -38,12 +36,13 @@ internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inbo /// This update behaves like a 'put' in that it will replace all current state for this asset in the Azure /// Device Registry service with what is provided. /// - public async Task UpdateAssetStatusAsync( + public Task UpdateAssetStatusAsync( AssetStatus status, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) { - return await _adrClient.UpdateAssetStatusAsync( + //TODO update lastUpdatetimeUtc + return _adrClient.UpdateAssetStatusAsync( _deviceName, _inboundEndpointName, new UpdateAssetStatusRequest() @@ -55,64 +54,16 @@ public async Task UpdateAssetStatusAsync( cancellationToken); } - /// - /// Build an instance of where this asset and all of its datasets/events/streams/management groups - /// have no errors. - /// - /// - /// An instance of where this asset and all of its datasets/events/streams/management groups - /// have no errors. - /// - public AssetStatus BuildOkayStatus() //TODO want some cached version of the last known status to avoid 'put' calls overwriting? + public Task GetAssetStatusAsync( + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default) { - AssetStatus status = new() - { - Config = ConfigStatus.Okay(), - }; - - if (_asset.Datasets != null) - { - status.Datasets = new List(); - foreach (var dataset in _asset.Datasets) - { - status.Datasets.Add(new AssetDatasetEventStreamStatus() - { - Name = dataset.Name, - Error = null, - MessageSchemaReference = _connector.GetRegisteredDatasetMessageSchema(_deviceName, _inboundEndpointName, _assetName, dataset.Name) - }); - } - } - - if (_asset.EventGroups != null) - { - status.EventGroups = new List(); - foreach (var eventGroup in _asset.EventGroups) - { - var eventGroupStatus = new AssetEventGroupStatus() - { - Name = eventGroup.Name, - }; - - if (eventGroup.Events != null) - { - eventGroupStatus.Events = new List(); - foreach (var assetEvent in eventGroup.Events) - { - eventGroupStatus.Events.Add(new AssetDatasetEventStreamStatus() - { - Name = assetEvent.Name, - Error = null, - MessageSchemaReference = _connector.GetRegisteredEventMessageSchema(_deviceName, _inboundEndpointName, _assetName, eventGroup.Name, assetEvent.Name) - }); - } - } - - status.EventGroups.Add(eventGroupStatus); - } - } - - return status; + return _adrClient.GetAssetStatusAsync( + _deviceName, + _inboundEndpointName, + _assetName, + commandTimeout, + cancellationToken); } /// diff --git a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs index 7e884e8441..29e56aa504 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs @@ -35,12 +35,13 @@ internal DeviceEndpointClient(IAdrClientWrapper adrClient, string deviceName, st /// That means that, for devices with multiple endpoints, you can safely call this method when each endpoint has a status to /// report without needing to include the existing status of previously reported endpoints. /// - public async Task UpdateDeviceStatusAsync( - DeviceStatus status, + public Task UpdateDeviceStatusAsync( + DeviceStatus status, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) { - return await _adrClient.UpdateDeviceStatusAsync( + //TODO update lastUpdatetimeUtc + return _adrClient.UpdateDeviceStatusAsync( _deviceName, _inboundEndpointName, status, @@ -48,29 +49,15 @@ public async Task UpdateDeviceStatusAsync( cancellationToken); } - /// - /// Build an instance of where this device and this endpoint have no errors. - /// - /// An instance of where this device and this endpoint have no errors. - public DeviceStatus BuildOkayStatus() + public Task GetDeviceStatusAsync( + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default) { - return new() - { - Config = ConfigStatus.Okay(), - Endpoints = new() - { - Inbound = new() - { - { - _inboundEndpointName, - new DeviceStatusInboundEndpointSchemaMapValue() - { - Error = null - } - } - } - } - }; + return _adrClient.GetDeviceStatusAsync( + _deviceName, + _inboundEndpointName, + commandTimeout, + cancellationToken); } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs index 21d72aee26..ded5c26647 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs @@ -22,7 +22,7 @@ public PollingTelemetryConnectorWorker(ApplicationContext applicationContext, IL public async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, CancellationToken cancellationToken) { - DeviceStatus deviceStatus = args.DeviceEndpointClient.BuildOkayStatus(); + DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); try { // Report device status is okay @@ -69,7 +69,7 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell byte[] sampledData = await datasetSampler.SampleDatasetAsync(dataset); await args.AssetClient.ForwardSampledDatasetAsync(dataset, sampledData); - AssetStatus assetStatus = args.AssetClient.BuildOkayStatus(); + AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); try { // The dataset was sampled as expected, so report the asset status as okay @@ -83,7 +83,7 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell } catch (Exception e) { - DeviceStatus deviceStatus = args.DeviceEndpointClient.BuildOkayStatus(); + DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); deviceStatus.Config = new ConfigStatus() { Error = new ConfigError() diff --git a/dotnet/test/Azure.Iot.Operations.Connector.UnitTests/MockAdrClientWrapper.cs b/dotnet/test/Azure.Iot.Operations.Connector.UnitTests/MockAdrClientWrapper.cs index 0bb59f1ffc..93c5d2ffb0 100644 --- a/dotnet/test/Azure.Iot.Operations.Connector.UnitTests/MockAdrClientWrapper.cs +++ b/dotnet/test/Azure.Iot.Operations.Connector.UnitTests/MockAdrClientWrapper.cs @@ -89,6 +89,16 @@ public Task CreateOrUpdateDiscove return mockClientWrapper.Object.CreateOrUpdateDiscoveredDeviceAsync(request, inboundEndpointType, commandTimeout, cancellationToken); } + public Task GetDeviceStatusAsync(string deviceName, string inboundEndpointName, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + return mockClientWrapper.Object.GetDeviceStatusAsync(deviceName, inboundEndpointName, commandTimeout, cancellationToken); + } + + public Task GetAssetStatusAsync(string deviceName, string inboundEndpointName, string assetName, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + return mockClientWrapper.Object.GetAssetStatusAsync(deviceName, inboundEndpointName, assetName, commandTimeout, cancellationToken); + } + public ValueTask DisposeAsync() { return ValueTask.CompletedTask; From 5af3e911beab6f82f240665ca8e96ffe42f5f2b8 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Mon, 10 Nov 2025 15:40:19 -0800 Subject: [PATCH 14/21] all in one? --- dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs index f87ad61d05..feae59059d 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs @@ -54,6 +54,14 @@ public Task UpdateAssetStatusAsync( cancellationToken); } + public Task UpdateAssetStatusAsync( + Func status, + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default) + { + + } + public Task GetAssetStatusAsync( TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) From 6ee7df8da34debb08da1b1fbf8605c0bbc527183 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Tue, 11 Nov 2025 11:59:32 -0800 Subject: [PATCH 15/21] more --- ...EventDrivenTcpThermostatConnectorWorker.cs | 65 ++++++--- .../AssetClient.cs | 8 -- ...cs => AssetManagementGroupActionStatus.cs} | 2 +- .../Models/AssetManagementGroupStatus.cs | 8 ++ ...AssetManagementGroupStatusSchemaElement.cs | 8 -- .../Models/AssetStatus.cs | 129 +++++++++++++++++- .../Models/ModelsConverter.cs | 8 +- .../Models/ProtocolConverter.cs | 4 +- 8 files changed, 190 insertions(+), 42 deletions(-) rename dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/{AssetManagementGroupActionStatusSchemaElement.cs => AssetManagementGroupActionStatus.cs} (84%) create mode 100644 dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatus.cs delete mode 100644 dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatusSchemaElement.cs diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index dd004d26cd..d1fd8afaac 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -29,9 +29,16 @@ private async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, Canc { _logger.LogInformation("Device with name {0} is now available", args.DeviceName); - DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); try { + DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); + deviceStatus.Config ??= new(); + deviceStatus.Config.LastTransitionTime = DateTime.UtcNow; + + deviceStatus.Endpoints ??= new(); + deviceStatus.Endpoints.Inbound ??= new(); + deviceStatus.Endpoints.Inbound[args.InboundEndpointName] ??= new(); + _logger.LogInformation("Reporting device status as okay to Azure Device Registry service..."); await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); } @@ -49,9 +56,14 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel if (args.Asset.EventGroups == null || args.Asset.EventGroups.Count != 1) { _logger.LogWarning("Asset with name {0} does not have the expected event group. No events will be received.", args.AssetName); - AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); + try { + AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); + assetStatus.Config ??= new(); + assetStatus.Config.LastTransitionTime = DateTime.UtcNow; + assetStatus.EventGroups ??= new(); + assetStatus.EventGroups.Clear(); await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); } catch (Exception e2) @@ -65,9 +77,14 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel if (eventGroup.Events == null || eventGroup.Events.Count != 1) { _logger.LogWarning("Asset with name {0} does not have the expected event within its event group. No events will be received.", args.AssetName); - AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); + try { + AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); + assetStatus.Config ??= new(); + assetStatus.Config.LastTransitionTime = DateTime.UtcNow; + assetStatus.EventGroups ??= new(); + assetStatus.ClearEventGroupStatus(eventGroup.Name); await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); } catch (Exception e2) @@ -84,14 +101,22 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel { // If the asset's has no event doesn't specify a port, then do nothing _logger.LogError("Asset with name {0} has an event, but the event didn't configure a port, so the connector won't handle these events", args.AssetName); - AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); - assetStatus.EventGroups!.First().Events!.First().Error = new ConfigError() - { - Message = "The configured event was either missing the expected port or had a non-integer value for the port", - }; try { + AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); + assetStatus.Config ??= new(); + assetStatus.Config.LastTransitionTime = DateTime.UtcNow; + + assetStatus.UpdateEventStatus(eventGroup.Name, new() + { + Name = assetEvent.Name, + Error = new ConfigError() + { + Message = "The configured event was either missing the expected port or had a non-integer value for the port", + } + }); + _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); } @@ -136,10 +161,12 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e _logger.LogInformation("Received data from event with name {0} on asset with name {1}. Forwarding this data to the MQTT broker.", assetEvent.Name, args.AssetName); await args.AssetClient.ForwardReceivedEventAsync(eventGroupName, assetEvent, buffer, null, cancellationToken); - // Report status of the asset once the first event has been received and forwarded - AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); try { + // Report status of the asset once the first event has been received and forwarded + AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); + assetStatus.Config ??= new(); + assetStatus.Config.LastTransitionTime = DateTime.UtcNow; await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); } catch (Exception e2) @@ -159,16 +186,18 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e _logger.LogError(e, "Failed to open TCP connection to asset"); } - DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); - deviceStatus.SetEndpointError( - InboundEndpointName, - new ConfigError() - { - Message = "Unable to connect to the TCP endpoint. The connector will retry to connect." - }); - try { + DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); + deviceStatus.Config ??= new(); + deviceStatus.Config.LastTransitionTime = DateTime.UtcNow; + deviceStatus.SetEndpointError( + InboundEndpointName, + new ConfigError() + { + Message = "Unable to connect to the TCP endpoint. The connector will retry to connect." + }); + await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); } catch (Exception e2) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs index feae59059d..f87ad61d05 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs @@ -54,14 +54,6 @@ public Task UpdateAssetStatusAsync( cancellationToken); } - public Task UpdateAssetStatusAsync( - Func status, - TimeSpan? commandTimeout = null, - CancellationToken cancellationToken = default) - { - - } - public Task GetAssetStatusAsync( TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatusSchemaElement.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatus.cs similarity index 84% rename from dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatusSchemaElement.cs rename to dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatus.cs index e799b5ba82..c764c775e3 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatusSchemaElement.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatus.cs @@ -1,6 +1,6 @@ namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; -public record AssetManagementGroupActionStatusSchemaElement +public record AssetManagementGroupActionStatus { public ConfigError? Error { get; set; } public required string Name { get; set; } diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatus.cs new file mode 100644 index 0000000000..e0da076e37 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatus.cs @@ -0,0 +1,8 @@ +namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; + +public record AssetManagementGroupStatus +{ + public List? Actions { get; set; } + + public required string Name { get; set; } +} diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatusSchemaElement.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatusSchemaElement.cs deleted file mode 100644 index 4a681b7a71..0000000000 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatusSchemaElement.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; - -public record AssetManagementGroupStatusSchemaElement -{ - public List? Actions { get; set; } - - public required string Name { get; set; } -} diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs index 2c53b9d332..d5f8186101 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs @@ -36,7 +36,7 @@ public record AssetStatus /// /// Each management group should only report its latest status. /// - public List? ManagementGroups { get; set; } + public List? ManagementGroups { get; set; } /// /// The status of all streams associated with this asset (if it has any streams). @@ -45,4 +45,131 @@ public record AssetStatus /// Each stream should only report its latest status. /// public List? Streams { get; set; } + + public void UpdateDatasetStatus(AssetDatasetEventStreamStatus newStatus) + { + Datasets ??= new(); + + foreach (AssetDatasetEventStreamStatus currentStatus in Datasets) + { + if (currentStatus.Name.Equals(newStatus.Name)) + { + // If the dataset status is already present in the list, update it in place + currentStatus.Error = newStatus.Error; + currentStatus.MessageSchemaReference = newStatus.MessageSchemaReference; + return; + } + } + + // If the dataset status did not exist in the list, just add it + Datasets.Add(newStatus); + } + + public void UpdateStreamStatus(AssetDatasetEventStreamStatus newStatus) + { + Streams ??= new(); + + foreach (AssetDatasetEventStreamStatus currentStatus in Streams) + { + if (currentStatus.Name.Equals(newStatus.Name)) + { + // If the dataset status is already present in the list, update it in place + currentStatus.Error = newStatus.Error; + currentStatus.MessageSchemaReference = newStatus.MessageSchemaReference; + return; + } + } + + // If the dataset status did not exist in the list, just add it + Streams.Add(newStatus); + } + + public void ClearEventGroupStatus(string eventGroupName) + { + if (EventGroups != null) + { + AssetEventGroupStatus? eventGroupStatusToRemove = null; + foreach (AssetEventGroupStatus eventGroupStatus in EventGroups) + { + if (eventGroupStatus.Name.Equals(eventGroupName)) + { + eventGroupStatusToRemove = eventGroupStatus; + } + } + + if (eventGroupStatusToRemove != null) + { + EventGroups.Remove(eventGroupStatusToRemove); + } + } + } + + public void UpdateEventStatus(string eventGroupName, AssetDatasetEventStreamStatus eventNewStatus) + { + EventGroups ??= new(); + + foreach (AssetEventGroupStatus eventGroupStatus in EventGroups) + { + if (eventGroupStatus.Name.Equals(eventGroupName)) + { + eventGroupStatus.Events ??= new(); + foreach (AssetDatasetEventStreamStatus eventStatus in eventGroupStatus.Events) + { + if (eventStatus.Name.Equals(eventNewStatus.Name)) + { + // If the event group status exists and it contains this event's status, update it in place + eventStatus.Error = eventNewStatus.Error; + eventStatus.MessageSchemaReference = eventNewStatus.MessageSchemaReference; + return; + } + } + + // If the event group status exists, but no status was found for this event, just add it. + eventGroupStatus.Events.Add(eventNewStatus); + return; + } + } + + // If the event group status did not exist, just add it + EventGroups.Add(new AssetEventGroupStatus() + { + Name = eventGroupName, + Events = new List { eventNewStatus } + }); + } + + public void UpdateManagementGroupStatus(string managementGroupName, AssetManagementGroupActionStatus actionNewStatus) + { + ManagementGroups ??= new(); + + foreach (AssetManagementGroupStatus managementGroupStatus in ManagementGroups) + { + if (managementGroupStatus.Name.Equals(managementGroupName)) + { + managementGroupStatus.Actions ??= new(); + foreach (AssetManagementGroupActionStatus actionStatus in managementGroupStatus.Actions) + { + if (actionStatus.Name.Equals(actionNewStatus.Name)) + { + // If the management group status exists and it contains this action's status, update it in place + actionStatus.Error = actionNewStatus.Error; + actionStatus.RequestMessageSchemaReference = actionNewStatus.RequestMessageSchemaReference; + actionStatus.ResponseMessageSchemaReference = actionNewStatus.ResponseMessageSchemaReference; + return; + } + } + + // If the management group status exists, but no status was found for this action, just add it. + managementGroupStatus.Actions.Add(actionNewStatus); + return; + } + } + + // If the management group status did not exist, just add it + ManagementGroups.Add(new AssetManagementGroupStatus() + { + Name = managementGroupName, + Actions = new List { actionNewStatus } + }); + } } diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ModelsConverter.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ModelsConverter.cs index d2f085ec2c..81d4868a6f 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ModelsConverter.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ModelsConverter.cs @@ -461,18 +461,18 @@ internal static AssetDatasetEventStreamStatus ToModel(this AdrBaseService.AssetD }; } - internal static AssetManagementGroupStatusSchemaElement ToModel(this AssetManagementGroupStatusSchemaElementSchema source) + internal static AssetManagementGroupStatus ToModel(this AssetManagementGroupStatusSchemaElementSchema source) { - return new AssetManagementGroupStatusSchemaElement + return new AssetManagementGroupStatus { Name = source.Name, Actions = source.Actions?.Select(x => x.ToModel()).ToList(), }; } - internal static AssetManagementGroupActionStatusSchemaElement ToModel(this AssetManagementGroupActionStatusSchemaElementSchema source) + internal static AssetManagementGroupActionStatus ToModel(this AssetManagementGroupActionStatusSchemaElementSchema source) { - return new AssetManagementGroupActionStatusSchemaElement + return new AssetManagementGroupActionStatus { Error = source.Error?.ToModel(), Name = source.Name, diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ProtocolConverter.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ProtocolConverter.cs index e7e7e03447..47555aaf51 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ProtocolConverter.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ProtocolConverter.cs @@ -325,7 +325,7 @@ internal static AdrBaseService.AssetEventGroupStatusSchemaElementSchema ToProtoc }; } - internal static AssetManagementGroupStatusSchemaElementSchema ToProtocol(this AssetManagementGroupStatusSchemaElement source) + internal static AssetManagementGroupStatusSchemaElementSchema ToProtocol(this AssetManagementGroupStatus source) { return new AssetManagementGroupStatusSchemaElementSchema { @@ -334,7 +334,7 @@ internal static AssetManagementGroupStatusSchemaElementSchema ToProtocol(this As }; } - internal static AssetManagementGroupActionStatusSchemaElementSchema ToProtocol(this AssetManagementGroupActionStatusSchemaElement source) + internal static AssetManagementGroupActionStatusSchemaElementSchema ToProtocol(this AssetManagementGroupActionStatus source) { return new AssetManagementGroupActionStatusSchemaElementSchema { From 23252f57a705563181573f01037d571c59adcb4e Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 12 Nov 2025 10:39:09 -0800 Subject: [PATCH 16/21] reviewing --- ...EventDrivenTcpThermostatConnectorWorker.cs | 20 +++++------ .../AssetClient.cs | 7 +++- .../DeviceEndpointClient.cs | 7 +++- .../Models/AssetStatus.cs | 36 +++++++++++++++++-- 4 files changed, 55 insertions(+), 15 deletions(-) diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index d1fd8afaac..20de16d9e3 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -42,9 +42,9 @@ private async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, Canc _logger.LogInformation("Reporting device status as okay to Azure Device Registry service..."); await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); } - catch (Exception e2) + catch (Exception e) { - _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); } } @@ -66,9 +66,9 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel assetStatus.EventGroups.Clear(); await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); } - catch (Exception e2) + catch (Exception e) { - _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); } return; } @@ -87,9 +87,9 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel assetStatus.ClearEventGroupStatus(eventGroup.Name); await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); } - catch (Exception e2) + catch (Exception e) { - _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); } return; } @@ -120,9 +120,9 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); } - catch (Exception e2) + catch (Exception e) { - _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); } return; } @@ -200,9 +200,9 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); } - catch (Exception e2) + catch (Exception e) { - _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs index f87ad61d05..fd0df7847c 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs @@ -41,7 +41,6 @@ public Task UpdateAssetStatusAsync( TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) { - //TODO update lastUpdatetimeUtc return _adrClient.UpdateAssetStatusAsync( _deviceName, _inboundEndpointName, @@ -54,6 +53,12 @@ public Task UpdateAssetStatusAsync( cancellationToken); } + /// + /// Get the status of this asset from the Azure Device Registry service + /// + /// The timeout for this RPC command invocation. + /// The cancellation token. + /// The status returned by the Azure Device Registry service public Task GetAssetStatusAsync( TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs index 29e56aa504..fe15c22cb4 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs @@ -40,7 +40,6 @@ public Task UpdateDeviceStatusAsync( TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) { - //TODO update lastUpdatetimeUtc return _adrClient.UpdateDeviceStatusAsync( _deviceName, _inboundEndpointName, @@ -49,6 +48,12 @@ public Task UpdateDeviceStatusAsync( cancellationToken); } + /// + /// Get the status of this device from the Azure Device Registry service + /// + /// The timeout for this RPC command invocation. + /// The cancellation token. + /// The status returned by the Azure Device Registry service public Task GetDeviceStatusAsync( TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs index d5f8186101..dcc919c231 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs @@ -46,6 +46,14 @@ public record AssetStatus /// public List? Streams { get; set; } + /// + /// Update to replace any existing status for the dataset named in . + /// + /// The new status of the dataset. + /// + /// If the dataset has no status in yet, will be added. If the + /// dataset does have status in already, that status will be replaced entirely by . + /// public void UpdateDatasetStatus(AssetDatasetEventStreamStatus newStatus) { Datasets ??= new(); @@ -65,6 +73,14 @@ public void UpdateDatasetStatus(AssetDatasetEventStreamStatus newStatus) Datasets.Add(newStatus); } + /// + /// Update to replace any existing status for the stream named in . + /// + /// The new status of the stream. + /// + /// If the stream has no status in yet, will be added. If the + /// stream does have status in already, that status will be replaced entirely by . + /// public void UpdateStreamStatus(AssetDatasetEventStreamStatus newStatus) { Streams ??= new(); @@ -84,26 +100,35 @@ public void UpdateStreamStatus(AssetDatasetEventStreamStatus newStatus) Streams.Add(newStatus); } + /// + /// Remove any statuses related to the provided event group name from . + /// + /// The name of the event group to clear all statuses from. public void ClearEventGroupStatus(string eventGroupName) { if (EventGroups != null) { - AssetEventGroupStatus? eventGroupStatusToRemove = null; + List eventGroupStatusesToRemove = new(); foreach (AssetEventGroupStatus eventGroupStatus in EventGroups) { if (eventGroupStatus.Name.Equals(eventGroupName)) { - eventGroupStatusToRemove = eventGroupStatus; + eventGroupStatusesToRemove.Add(eventGroupStatus); } } - if (eventGroupStatusToRemove != null) + foreach (AssetEventGroupStatus eventGroupStatusToRemove in eventGroupStatusesToRemove) { EventGroups.Remove(eventGroupStatusToRemove); } } } + /// + /// Update to replace any existing status for the provided event group's event's status. + /// + /// The name of the event group that this event belongs to. + /// The new status of the event within this event group. public void UpdateEventStatus(string eventGroupName, AssetDatasetEventStreamStatus eventNewStatus) { EventGroups ??= new(); @@ -138,6 +163,11 @@ public void UpdateEventStatus(string eventGroupName, AssetDatasetEventStreamStat }); } + /// + /// Update to replace any existing status for the provided management group action. + /// + /// The name of the management group that this action belongs to. + /// The new status of this action. public void UpdateManagementGroupStatus(string managementGroupName, AssetManagementGroupActionStatus actionNewStatus) { ManagementGroups ??= new(); From d06c883c58e1bac48c24ce77f3d37d6083b404e4 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 12 Nov 2025 10:44:17 -0800 Subject: [PATCH 17/21] not used --- .../AssetAndDeviceRegistry/Models/ConfigStatus.cs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs index 28558de31c..a9366ce912 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs @@ -23,15 +23,5 @@ public partial class ConfigStatus /// A read only incremental counter indicating the number of times the configuration has been modified from the perspective of the current actual (Edge) state of the CRD. Edge would be the only writer of this value and would sync back up to the cloud. In steady state, this should equal version. /// public ulong? Version { get; set; } = default; - - public static ConfigStatus Okay() - { - return new() - { - Error = null, - LastTransitionTime = DateTime.UtcNow, - Version = null, //TODO do we need to report this? - }; - } } } From 75d91c6ddb67229910e7f29c82eceaef15e2e76f Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Wed, 12 Nov 2025 11:20:16 -0800 Subject: [PATCH 18/21] log --- .../EventDrivenTcpThermostatConnectorWorker.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index 179209b401..2360d45b49 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -137,7 +137,7 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel } }); - _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); + _logger.LogWarning("Reporting asset status error to Azure Device Registry service..."); await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); } catch (Exception e) @@ -184,6 +184,7 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e try { // Report status of the asset once the first event has been received and forwarded + _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); assetStatus.Config ??= new(); assetStatus.Config.LastTransitionTime = DateTime.UtcNow; From ec0414cbaadae45b47f310f0483ee0d8d131cded Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Thu, 13 Nov 2025 12:06:24 -0800 Subject: [PATCH 19/21] just getAndUpdate APIs, defer message schema registration --- ...EventDrivenTcpThermostatConnectorWorker.cs | 129 +++++++------- .../AssetAvailableEventArgs.cs | 9 +- .../AssetClient.cs | 127 ++++++++++---- .../ConnectorWorker.cs | 161 ++++++++---------- .../DeviceEndpointClient.cs | 45 ++++- .../PollingTelemetryConnectorWorker.cs | 87 +++++----- 6 files changed, 336 insertions(+), 222 deletions(-) diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index 2360d45b49..eac3de5efd 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -31,16 +31,15 @@ private async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, Canc try { - DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); - deviceStatus.Config ??= new(); - deviceStatus.Config.LastTransitionTime = DateTime.UtcNow; - - deviceStatus.Endpoints ??= new(); - deviceStatus.Endpoints.Inbound ??= new(); - deviceStatus.Endpoints.Inbound[args.InboundEndpointName] ??= new(); - _logger.LogInformation("Reporting device status as okay to Azure Device Registry service..."); - await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); + await args.DeviceEndpointClient.GetAndUpdateDeviceStatusAsync((currentDeviceStatus) => { + currentDeviceStatus.Config ??= new(); + currentDeviceStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentDeviceStatus.Endpoints ??= new(); + currentDeviceStatus.Endpoints.Inbound ??= new(); + currentDeviceStatus.Endpoints.Inbound[args.InboundEndpointName] ??= new(); + return currentDeviceStatus; + }, null, cancellationToken); } catch (Exception e) { @@ -79,12 +78,13 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel try { - AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); - assetStatus.Config ??= new(); - assetStatus.Config.LastTransitionTime = DateTime.UtcNow; - assetStatus.EventGroups ??= new(); - assetStatus.EventGroups.Clear(); - await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); + await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => { + currentAssetStatus.Config ??= new(); + currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentAssetStatus.EventGroups ??= new(); + currentAssetStatus.EventGroups.Clear(); + return currentAssetStatus; + }, null, cancellationToken); } catch (Exception e) { @@ -100,12 +100,13 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel try { - AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); - assetStatus.Config ??= new(); - assetStatus.Config.LastTransitionTime = DateTime.UtcNow; - assetStatus.EventGroups ??= new(); - assetStatus.ClearEventGroupStatus(eventGroup.Name); - await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); + await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => { + currentAssetStatus.Config ??= new(); + currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentAssetStatus.EventGroups ??= new(); + currentAssetStatus.ClearEventGroupStatus(eventGroup.Name); + return currentAssetStatus; + }, null, cancellationToken); } catch (Exception e) { @@ -124,21 +125,20 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel try { - AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); - assetStatus.Config ??= new(); - assetStatus.Config.LastTransitionTime = DateTime.UtcNow; - - assetStatus.UpdateEventStatus(eventGroup.Name, new() - { - Name = assetEvent.Name, - Error = new ConfigError() - { - Message = "The configured event was either missing the expected port or had a non-integer value for the port", - } - }); - _logger.LogWarning("Reporting asset status error to Azure Device Registry service..."); - await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); + await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => { + currentAssetStatus.Config ??= new(); + currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentAssetStatus.UpdateEventStatus(eventGroup.Name, new() + { + Name = assetEvent.Name, + Error = new ConfigError() + { + Message = "The configured event was either missing the expected port or had a non-integer value for the port", + } + }); + return currentAssetStatus; + }, null, cancellationToken); } catch (Exception e) { @@ -170,6 +170,8 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e await client.ConnectAsync(host, port, cancellationToken); await using NetworkStream stream = client.GetStream(); + bool alreadyReportedAssetStatus = false; + try { while (!cancellationToken.IsCancellationRequested) @@ -181,18 +183,31 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e _logger.LogInformation("Received data from event with name {0} on asset with name {1}. Forwarding this data to the MQTT broker.", assetEvent.Name, args.AssetName); await args.AssetClient.ForwardReceivedEventAsync(eventGroupName, assetEvent, buffer, null, cancellationToken); - try + if (!alreadyReportedAssetStatus) { - // Report status of the asset once the first event has been received and forwarded - _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); - AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); - assetStatus.Config ??= new(); - assetStatus.Config.LastTransitionTime = DateTime.UtcNow; - await args.AssetClient.UpdateAssetStatusAsync(assetStatus, null, cancellationToken); - } - catch (Exception e2) - { - _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + try + { + // Report status of the asset once the first event has been received and forwarded + _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); + + await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => { + currentAssetStatus.Config ??= new(); + currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentAssetStatus.UpdateEventStatus(eventGroupName, new() + { + Name = assetEvent.Name, + Error = null, + MessageSchemaReference = args.AssetClient.GetRegisteredEventMessageSchema(eventGroupName, assetEvent.Name) + }); + return currentAssetStatus; + }, null, cancellationToken); + + alreadyReportedAssetStatus = true; + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + } } } } @@ -209,17 +224,17 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string e try { - DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); - deviceStatus.Config ??= new(); - deviceStatus.Config.LastTransitionTime = DateTime.UtcNow; - deviceStatus.SetEndpointError( - InboundEndpointName, - new ConfigError() - { - Message = "Unable to connect to the TCP endpoint. The connector will retry to connect." - }); - - await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus, null, cancellationToken); + await args.DeviceEndpointClient.GetAndUpdateDeviceStatusAsync((currentDeviceStatus) => { + currentDeviceStatus.Config ??= new(); + currentDeviceStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentDeviceStatus.SetEndpointError( + InboundEndpointName, + new ConfigError() + { + Message = "Unable to connect to the TCP endpoint. The connector will retry to connect." + }); + return currentDeviceStatus; + }, null, cancellationToken); } catch (Exception e) { diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs index 4df9258863..67028f1ff3 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs @@ -9,7 +9,7 @@ namespace Azure.Iot.Operations.Connector /// /// The event args for when an asset becomes available to sample. /// - public class AssetAvailableEventArgs : EventArgs + public class AssetAvailableEventArgs : EventArgs, IDisposable { /// /// The name of the device that this asset belongs to. @@ -68,8 +68,13 @@ internal AssetAvailableEventArgs(string deviceName, Device device, string inboun AssetName = assetName; Asset = asset; LeaderElectionClient = leaderElectionClient; - AssetClient = new(adrClient, deviceName, inboundEndpointName, assetName, connector); + AssetClient = new(adrClient, deviceName, inboundEndpointName, assetName, connector, device, asset); DeviceEndpointClient = new(adrClient, deviceName, inboundEndpointName); } + + public void Dispose() + { + AssetClient.Dispose(); + } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs index fd0df7847c..cd59702c18 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs @@ -8,21 +8,111 @@ namespace Azure.Iot.Operations.Connector /// /// A client for updating the status of an asset and for forwarding received events and/or sampled datasets. /// - public class AssetClient + /// + /// + /// + public class AssetClient : IDisposable { private readonly IAdrClientWrapper _adrClient; private readonly ConnectorWorker _connector; private readonly string _deviceName; private readonly string _inboundEndpointName; private readonly string _assetName; + private readonly Device _device; + private readonly Asset _asset; - internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inboundEndpointName, string assetName, ConnectorWorker connector) + // Used to make getAndUpdate calls behave atomically. Also respected by get and update methods so that a user + // does not accidentally update an asset while another thread is in the middle of a getAndUpdate call. + private readonly SemaphoreSlim _semaphore = new(0, 1); + + internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inboundEndpointName, string assetName, ConnectorWorker connector, Device device, Asset asset) { _adrClient = adrClient; _deviceName = deviceName; _inboundEndpointName = inboundEndpointName; _assetName = assetName; _connector = connector; + _device = device; + _asset = asset; + } + + /// + /// Get the current status of this asset and then optionally update it. + /// + /// The function that determines the new asset status when given the current asset status. + /// The timeout for each of the 'get' and 'update' commands. + /// Cancellation token. + /// The latest asset status after this operation. + /// + /// If after retrieving the current status, you don't want to send any updates, should return null. + /// If this happens, this function will return the latest asset status without trying to update it. + /// + public async Task GetAndUpdateAssetStatusAsync(Func handler, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + await _semaphore.WaitAsync(cancellationToken); + try + { + AssetStatus currentStatus = await GetAssetStatusAsync(commandTimeout, cancellationToken); + AssetStatus? desiredStatus = handler.Invoke(currentStatus); + if (desiredStatus != null) + { + return await UpdateAssetStatusAsync(desiredStatus, commandTimeout, cancellationToken); + } + + return currentStatus; + } + finally + { + _semaphore.Release(); + } + } + + /// + /// Push a sampled dataset to the configured destinations. + /// + /// The dataset that was sampled. + /// The payload to push to the configured destinations. + /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. + /// Cancellation token. + public async Task ForwardSampledDatasetAsync(AssetDataset dataset, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + { + await _connector.ForwardSampledDatasetAsync(_deviceName, _device, _inboundEndpointName, _assetName, _asset, dataset, serializedPayload, userData, cancellationToken); + } + + /// + /// Push a received event payload to the configured destinations. + /// + /// The name of the event group that this event belongs to. + /// The event. + /// The payload to push to the configured destinations. + /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. + /// Cancellation token. + public async Task ForwardReceivedEventAsync(string eventGroupName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + { + await _connector.ForwardReceivedEventAsync(_deviceName, _device, _inboundEndpointName, _assetName, _asset, eventGroupName, assetEvent, serializedPayload, userData, cancellationToken); + } + + public MessageSchemaReference? GetRegisteredDatasetMessageSchema(string datasetName) + { + return _connector.GetRegisteredDatasetMessageSchema(_deviceName, _inboundEndpointName, _assetName, datasetName); + } + + public MessageSchemaReference? GetRegisteredEventMessageSchema(string eventGroupName, string eventName) + { + return _connector.GetRegisteredEventMessageSchema(_deviceName, _inboundEndpointName, _assetName, eventGroupName, eventName); + } + + + public void Dispose() + { + try + { + _semaphore.Dispose(); + } + catch (ObjectDisposedException) + { + // It's fine if this sempahore is already disposed. + } } /// @@ -36,12 +126,12 @@ internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inbo /// This update behaves like a 'put' in that it will replace all current state for this asset in the Azure /// Device Registry service with what is provided. /// - public Task UpdateAssetStatusAsync( + private async Task UpdateAssetStatusAsync( AssetStatus status, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) { - return _adrClient.UpdateAssetStatusAsync( + return await _adrClient.UpdateAssetStatusAsync( _deviceName, _inboundEndpointName, new UpdateAssetStatusRequest() @@ -59,41 +149,16 @@ public Task UpdateAssetStatusAsync( /// The timeout for this RPC command invocation. /// The cancellation token. /// The status returned by the Azure Device Registry service - public Task GetAssetStatusAsync( + private async Task GetAssetStatusAsync( TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) { - return _adrClient.GetAssetStatusAsync( + return await _adrClient.GetAssetStatusAsync( _deviceName, _inboundEndpointName, _assetName, commandTimeout, cancellationToken); } - - /// - /// Push a sampled dataset to the configured destinations. - /// - /// The dataset that was sampled. - /// The payload to push to the configured destinations. - /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. - /// Cancellation token. - public async Task ForwardSampledDatasetAsync(AssetDataset dataset, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) - { - await _connector.ForwardSampledDatasetAsync(_deviceName, _inboundEndpointName, _assetName, dataset, serializedPayload, userData, cancellationToken); - } - - /// - /// Push a received event payload to the configured destinations. - /// - /// The name of the event group that this event belongs to. - /// The event. - /// The payload to push to the configured destinations. - /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. - /// Cancellation token. - public async Task ForwardReceivedEventAsync(string eventGroupName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) - { - await _connector.ForwardReceivedEventAsync(_deviceName, _inboundEndpointName, _assetName, eventGroupName, assetEvent, serializedPayload, userData, cancellationToken); - } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs index 29e9699ead..a735e72162 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs @@ -3,7 +3,6 @@ using System.Collections.Concurrent; using System.Data; -using System.Collections.Generic; using System.Text; using Azure.Iot.Operations.Connector.ConnectorConfigurations; using Azure.Iot.Operations.Connector.Exceptions; @@ -288,12 +287,45 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) } // Called by AssetClient instances - internal async Task ForwardSampledDatasetAsync(string deviceName, string inboundEndpointName, string assetName, AssetDataset dataset, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + internal async Task ForwardSampledDatasetAsync(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, AssetDataset dataset, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) { ObjectDisposedException.ThrowIf(_isDisposed, this); CloudEvent? cloudEvent = null; - if (_registeredDatasetMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{dataset.Name}", out var registeredDatasetMessageSchema)) + Schema? registeredDatasetMessageSchema = null; + if (!_registeredDatasetMessageSchemas.ContainsKey($"{deviceName}_{inboundEndpointName}_{assetName}_{dataset.Name}")) + { + // This may register a message schema that has already been uploaded, but the schema registry service is idempotent + var datasetMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(device, asset, dataset.Name!, dataset); + if (datasetMessageSchema != null) + { + try + { + _logger.LogInformation($"Registering message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); + await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient); + registeredDatasetMessageSchema = await schemaRegistryClient.PutAsync( + datasetMessageSchema.SchemaContent, + datasetMessageSchema.SchemaFormat, + datasetMessageSchema.SchemaType, + datasetMessageSchema.Version ?? "1", + datasetMessageSchema.Tags); + + _logger.LogInformation($"Registered message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}."); + + _registeredDatasetMessageSchemas.TryAdd($"{deviceName}_{inboundEndpointName}_{assetName}_{dataset.Name}", registeredDatasetMessageSchema); + } + catch (Exception ex) + { + _logger.LogError($"Failed to register message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}. Error: {ex.Message}"); + } + } + else + { + _logger.LogInformation($"No message schema will be registered for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); + } + } + + if (_registeredDatasetMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{dataset.Name}", out registeredDatasetMessageSchema)) { if (Uri.IsWellFormedUriString(inboundEndpointName, UriKind.RelativeOrAbsolute)) { @@ -383,7 +415,7 @@ await telemetrySender.SendTelemetryAsync( } // Called by AssetClient instances - internal async Task ForwardReceivedEventAsync(string deviceName, string inboundEndpointName, string assetName, string eventGroupName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + internal async Task ForwardReceivedEventAsync(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, string eventGroupName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) { ObjectDisposedException.ThrowIf(_isDisposed, this); @@ -395,8 +427,41 @@ internal async Task ForwardReceivedEventAsync(string deviceName, string inboundE return; } + Schema? registeredEventMessageSchema = null; + if (!_registeredEventMessageSchemas.ContainsKey($"{deviceName}_{inboundEndpointName}_{assetName}_{eventGroupName}_{assetEvent}")) + { + // This may register a message schema that has already been uploaded, but the schema registry service is idempotent + var eventMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(device, asset, assetEvent.Name, assetEvent); + if (eventMessageSchema != null) + { + try + { + _logger.LogInformation($"Registering message schema for event with name {assetEvent.Name} in event group with name {eventGroupName} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); + await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient); + registeredEventMessageSchema = await schemaRegistryClient.PutAsync( + eventMessageSchema.SchemaContent, + eventMessageSchema.SchemaFormat, + eventMessageSchema.SchemaType, + eventMessageSchema.Version ?? "1", + eventMessageSchema.Tags); + + _logger.LogInformation($"Registered message schema for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}."); + + _registeredEventMessageSchemas.TryAdd($"{deviceName}_{inboundEndpointName}_{assetName}_{eventGroupName}_{assetEvent.Name}", registeredEventMessageSchema); + } + catch (Exception ex) + { + _logger.LogError($"Failed to register message schema for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}. Error: {ex.Message}"); + } + } + else + { + _logger.LogInformation($"No message schema will be registered for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); + } + } + CloudEvent? cloudEvent = null; - if (_registeredEventMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{eventGroupName}_{assetEvent}", out var registeredEventMessageSchema)) + if (_registeredEventMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{eventGroupName}_{assetEvent}", out registeredEventMessageSchema)) { if (Uri.IsWellFormedUriString(inboundEndpointName, UriKind.RelativeOrAbsolute)) { @@ -493,7 +558,7 @@ private async void OnAssetChanged(object? _, AssetChangedEventArgs args) if (args.ChangeType == ChangeType.Created) { _logger.LogInformation("Asset with name {0} created on endpoint with name {1} on device with name {2}", args.AssetName, args.InboundEndpointName, args.DeviceName); - await AssetAvailableAsync(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName); + AssetAvailable(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName); _adrClient!.ObserveAssets(args.DeviceName, args.InboundEndpointName); } else if (args.ChangeType == ChangeType.Deleted) @@ -509,7 +574,7 @@ private async void OnAssetChanged(object? _, AssetChangedEventArgs args) { _logger.LogInformation("Asset with name {0} updated on endpoint with name {1} on device with name {2}", args.AssetName, args.InboundEndpointName, args.DeviceName); await AssetUnavailableAsync(args.DeviceName, args.InboundEndpointName, args.AssetName, true); - await AssetAvailableAsync(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName); + AssetAvailable(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName); } } @@ -597,7 +662,7 @@ private async Task DeviceUnavailableAsync(DeviceChangedEventArgs args, string co } } - private async Task AssetAvailableAsync(string deviceName, string inboundEndpointName, Asset? asset, string assetName) + private void AssetAvailable(string deviceName, string inboundEndpointName, Asset? asset, string assetName) { string compoundDeviceName = $"{deviceName}_{inboundEndpointName}"; @@ -628,88 +693,11 @@ private async Task AssetAvailableAsync(string deviceName, string inboundEndpoint { _logger.LogInformation($"Asset with name {assetName} has no datasets to sample"); } - else - { - foreach (var dataset in asset.Datasets) - { - // This may register a message schema that has already been uploaded, but the schema registry service is idempotent - var datasetMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(device, asset, dataset.Name!, dataset); - if (datasetMessageSchema != null) - { - try - { - _logger.LogInformation($"Registering message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); - await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient); - var registeredDatasetMessageSchema = await schemaRegistryClient.PutAsync( - datasetMessageSchema.SchemaContent, - datasetMessageSchema.SchemaFormat, - datasetMessageSchema.SchemaType, - datasetMessageSchema.Version ?? "1", - datasetMessageSchema.Tags); - - _logger.LogInformation($"Registered message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}."); - - _registeredDatasetMessageSchemas.TryAdd($"{deviceName}_{inboundEndpointName}_{assetName}_{dataset.Name}", registeredDatasetMessageSchema); - } - catch (Exception ex) - { - _logger.LogError($"Failed to register message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}. Error: {ex.Message}"); - } - } - else - { - _logger.LogInformation($"No message schema will be registered for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); - } - } - } if (asset.EventGroups == null) { _logger.LogInformation($"Asset with name {assetName} has no events to listen for"); } - else - { - foreach (var assetEventGroup in asset.EventGroups) - { - if (assetEventGroup.Events == null) - { - _logger.LogInformation($"Event group with name {assetEventGroup.Name} has no events to register message schemas for"); - continue; - } - - foreach (var assetEvent in assetEventGroup.Events) - { - // This may register a message schema that has already been uploaded, but the schema registry service is idempotent - var eventMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(device, asset, assetEvent.Name, assetEvent); - if (eventMessageSchema != null) - { - try - { - _logger.LogInformation($"Registering message schema for event with name {assetEvent.Name} in event group with name {assetEventGroup.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); - await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient); - var registeredEventSchema = await schemaRegistryClient.PutAsync( - eventMessageSchema.SchemaContent, - eventMessageSchema.SchemaFormat, - eventMessageSchema.SchemaType, - eventMessageSchema.Version ?? "1", - eventMessageSchema.Tags); - - _logger.LogInformation($"Registered message schema for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}."); - - _registeredEventMessageSchemas.TryAdd($"{deviceName}_{inboundEndpointName}_{assetName}_{assetEventGroup.Name}_{assetEvent.Name}", registeredEventSchema); - } - catch (Exception ex) - { - _logger.LogError($"Failed to register message schema for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}. Error: {ex.Message}"); - } - } - else - { - _logger.LogInformation($"No message schema will be registered for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); - } - } - } - } if (WhileAssetIsAvailable != null) { @@ -720,7 +708,8 @@ private async Task AssetAvailableAsync(string deviceName, string inboundEndpoint { try { - await WhileAssetIsAvailable.Invoke(new(deviceName, device, inboundEndpointName, assetName, asset, _leaderElectionClient, _adrClient!, this), assetTaskCancellationTokenSource.Token); + using AssetAvailableEventArgs args = new(deviceName, device, inboundEndpointName, assetName, asset, _leaderElectionClient, _adrClient!, this); + await WhileAssetIsAvailable.Invoke(args, assetTaskCancellationTokenSource.Token); } catch (OperationCanceledException) { diff --git a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs index fe15c22cb4..4684432231 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Threading; using Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; namespace Azure.Iot.Operations.Connector @@ -14,6 +15,10 @@ public class DeviceEndpointClient private readonly string _deviceName; private readonly string _inboundEndpointName; + // Used to make getAndUpdate calls behave atomically. Also respected by get and update methods so that a user + // does not accidentally update a device while another thread is in the middle of a getAndUpdate call. + private readonly SemaphoreSlim _semaphore = new(0, 1); + internal DeviceEndpointClient(IAdrClientWrapper adrClient, string deviceName, string inboundEndpointName) { _adrClient = adrClient; @@ -21,6 +26,38 @@ internal DeviceEndpointClient(IAdrClientWrapper adrClient, string deviceName, st _inboundEndpointName = inboundEndpointName; } + /// + /// Get the current status of this device and then optionally update it. + /// + /// The function that determines the new device status when given the current device status. + /// Cancellation token. + /// The timeout for each of the 'get' and 'update' commands. + /// The latest device status after this operation. + /// + /// If after retrieving the current status, you don't want to send any updates, should return null. + /// If this happens, this function will return the latest asset status without trying to update it. + /// + public async Task GetAndUpdateDeviceStatusAsync(Func handler, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + await _semaphore.WaitAsync(cancellationToken); + try + { + DeviceStatus currentStatus = await GetDeviceStatusAsync(commandTimeout, cancellationToken); + DeviceStatus? desiredStatus = handler.Invoke(currentStatus); + + if (desiredStatus != null) + { + return await UpdateDeviceStatusAsync(desiredStatus, commandTimeout, cancellationToken); + } + + return currentStatus; + } + finally + { + _semaphore.Release(); + } + } + /// /// Update the status of a specific device in the Azure Device Registry service /// @@ -35,12 +72,12 @@ internal DeviceEndpointClient(IAdrClientWrapper adrClient, string deviceName, st /// That means that, for devices with multiple endpoints, you can safely call this method when each endpoint has a status to /// report without needing to include the existing status of previously reported endpoints. /// - public Task UpdateDeviceStatusAsync( + private async Task UpdateDeviceStatusAsync( DeviceStatus status, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) { - return _adrClient.UpdateDeviceStatusAsync( + return await _adrClient.UpdateDeviceStatusAsync( _deviceName, _inboundEndpointName, status, @@ -54,11 +91,11 @@ public Task UpdateDeviceStatusAsync( /// The timeout for this RPC command invocation. /// The cancellation token. /// The status returned by the Azure Device Registry service - public Task GetDeviceStatusAsync( + private async Task GetDeviceStatusAsync( TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) { - return _adrClient.GetDeviceStatusAsync( + return await _adrClient.GetDeviceStatusAsync( _deviceName, _inboundEndpointName, commandTimeout, diff --git a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs index 74f1f1914e..71fc64b6f5 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs @@ -22,12 +22,15 @@ public PollingTelemetryConnectorWorker(ApplicationContext applicationContext, IL public async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, CancellationToken cancellationToken) { - DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); try { // Report device status is okay _logger.LogInformation("Reporting device status as okay to Azure Device Registry service..."); - await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus); + await args.DeviceEndpointClient.GetAndUpdateDeviceStatusAsync((currentDeviceStatus) => { + currentDeviceStatus.Config ??= new(); + currentDeviceStatus.Config.LastTransitionTime = DateTime.UtcNow; + return currentDeviceStatus; + }, null, cancellationToken); } catch (Exception e) { @@ -39,26 +42,6 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell { cancellationToken.ThrowIfCancellationRequested(); - // Skip sampling if the device is explicitly disabled (Enabled is false). Undefined (null) value is treated as enabled. - if (args.Device.Enabled != true && args.Device.Enabled != null) - { - _logger.LogWarning("Device {0} is disabled. Skipping asset {1} sampling until device is enabled.", args.DeviceName, args.AssetName); - // Note: When the device is updated, ConnectorWorker will automatically cancel this handler - // and reinvoke it with the updated device information if it becomes enabled. - return; - } - - // Skip sampling if the device is explicitly disabled (Enabled is false). Undefined (null) value is treated as enabled. - if (args.Device.Enabled != true && args.Device.Enabled != null) - { - _logger.LogWarning("Asset {0} is disabled. Skipping sampling until asset is enabled.", args.AssetName); - // Note: When the asset is updated, ConnectorWorker will automatically cancel this handler - // and reinvoke it with the updated asset information if it becomes enabled. - return; - } - - _logger.LogInformation("Starting to sample enabled asset {0} on enabled device {1}", args.AssetName, args.DeviceName); - if (args.Asset.Datasets == null) { return; @@ -66,8 +49,10 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell Dictionary datasetsTimers = new(); _assetsSamplingTimers[args.AssetName] = datasetsTimers; - foreach (AssetDataset dataset in args.Asset.Datasets!) + Dictionary okayStatusReportedByDataset = new(); + foreach (AssetDataset dataset in args.Asset.Datasets!) //TODO only send status once per dataset { + okayStatusReportedByDataset.Add(dataset.Name, false); EndpointCredentials? credentials = null; if (args.Device.Endpoints != null && args.Device.Endpoints.Inbound != null @@ -89,34 +74,52 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell byte[] sampledData = await datasetSampler.SampleDatasetAsync(dataset); await args.AssetClient.ForwardSampledDatasetAsync(dataset, sampledData); - AssetStatus assetStatus = await args.AssetClient.GetAssetStatusAsync(); - try + // Only report a status for this dataset if it hasn't been reported yet already + if (!okayStatusReportedByDataset[dataset.Name]) { - // The dataset was sampled as expected, so report the asset status as okay - _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); - await args.AssetClient.UpdateAssetStatusAsync(assetStatus); - } - catch (Exception e2) - { - _logger.LogError(e2, "Failed to report asset status to Azure Device Registry service"); + try + { + // The dataset was sampled as expected, so report the asset status as okay + _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); + await args.AssetClient.GetAndUpdateAssetStatusAsync( + (currentAssetStatus) => + { + currentAssetStatus.Config ??= new(); + currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentAssetStatus.UpdateDatasetStatus(new AssetDatasetEventStreamStatus() + { + Name = dataset.Name, + MessageSchemaReference = args.AssetClient.GetRegisteredDatasetMessageSchema(dataset.Name), + Error = null + }); + return currentAssetStatus; + }, + null, + cancellationToken); + okayStatusReportedByDataset[dataset.Name] = true; + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report asset status to Azure Device Registry service"); + } } } catch (Exception e) { - DeviceStatus deviceStatus = await args.DeviceEndpointClient.GetDeviceStatusAsync(); - deviceStatus.Config = new ConfigStatus() - { - Error = new ConfigError() - { - Message = $"Unable to sample the device. Error message: {e.Message}", - } - }; - _logger.LogError(e, "Failed to sample the dataset"); try { - await args.DeviceEndpointClient.UpdateDeviceStatusAsync(deviceStatus); + await args.DeviceEndpointClient.GetAndUpdateDeviceStatusAsync((currentDeviceStatus) => { + currentDeviceStatus.Config ??= new ConfigStatus(); + currentDeviceStatus.Config.Error = + new ConfigError() + { + Message = $"Unable to sample the device. Error message: {e.Message}", + }; + currentDeviceStatus.Config.LastTransitionTime = DateTime.UtcNow; + return currentDeviceStatus; + }, null, cancellationToken); } catch (Exception e2) { From e47767b197335b4e3e3c6d4e1000d7fcd8c1f8ac Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Thu, 13 Nov 2025 12:08:28 -0800 Subject: [PATCH 20/21] documentation --- dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs | 4 ++++ .../Azure.Iot.Operations.Connector/DeviceEndpointClient.cs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs index cd59702c18..8b2ee52a08 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs @@ -46,6 +46,10 @@ internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inbo /// /// If after retrieving the current status, you don't want to send any updates, should return null. /// If this happens, this function will return the latest asset status without trying to update it. + /// + /// This method uses a semaphore to ensure that this same client doesn't accidentally update the asset status while + /// another thread is in the middle of updating the same asset. This ensures that the current device status provided in + /// stays accurate while any updating occurs. /// public async Task GetAndUpdateAssetStatusAsync(Func handler, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) { diff --git a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs index 4684432231..7a348257e9 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs @@ -36,6 +36,10 @@ internal DeviceEndpointClient(IAdrClientWrapper adrClient, string deviceName, st /// /// If after retrieving the current status, you don't want to send any updates, should return null. /// If this happens, this function will return the latest asset status without trying to update it. + /// + /// This method uses a semaphore to ensure that this same client doesn't accidentally update the device status while + /// another thread is in the middle of updating the same device. This ensures that the current device status provided in + /// stays accurate while any updating occurs. /// public async Task GetAndUpdateDeviceStatusAsync(Func handler, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) { From 1ece6ccc60b9ab09de1cd5a3b00f63a2bd329991 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Thu, 13 Nov 2025 14:17:32 -0800 Subject: [PATCH 21/21] fix --- .../Files/AssetFileMonitor.cs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/dotnet/src/Azure.Iot.Operations.Connector/Files/AssetFileMonitor.cs b/dotnet/src/Azure.Iot.Operations.Connector/Files/AssetFileMonitor.cs index dcb0a1c86f..f21c5a7d61 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/Files/AssetFileMonitor.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/Files/AssetFileMonitor.cs @@ -190,11 +190,8 @@ public void ObserveDevices() /// public void UnobserveDevices() { - if (_deviceDirectoryMonitor != null) - { - _deviceDirectoryMonitor.Stop(); - _deviceDirectoryMonitor = null; - } + _deviceDirectoryMonitor?.Stop(); + _deviceDirectoryMonitor = null; } ///