diff --git a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs index ae27c3c030..eac3de5efd 100644 --- a/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs +++ b/dotnet/samples/Connectors/EventDrivenTcpThermostatConnector/EventDrivenTcpThermostatConnectorWorker.cs @@ -13,15 +13,40 @@ public class EventDrivenTcpThermostatConnectorWorker : BackgroundService, IDispo private readonly ILogger _logger; private readonly ConnectorWorker _connector; + private const string InboundEndpointName = "my_tcp_endpoint"; + public EventDrivenTcpThermostatConnectorWorker(ApplicationContext applicationContext, ILogger logger, ILogger connectorLogger, IMqttClient mqttClient, IMessageSchemaProvider datasetSamplerFactory, IAdrClientWrapperProvider adrClientFactory, IConnectorLeaderElectionConfigurationProvider leaderElectionConfigurationProvider) { _logger = logger; _connector = new(applicationContext, connectorLogger, mqttClient, datasetSamplerFactory, adrClientFactory, leaderElectionConfigurationProvider) { - WhileAssetIsAvailable = WhileAssetAvailableAsync + WhileAssetIsAvailable = WhileAssetAvailableAsync, + WhileDeviceIsAvailable = WhileDeviceAvailableAsync, }; } + private async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, CancellationToken cancellationToken) + { + _logger.LogInformation("Device with name {0} is now available", args.DeviceName); + + try + { + _logger.LogInformation("Reporting device status as okay to Azure Device Registry service..."); + await args.DeviceEndpointClient.GetAndUpdateDeviceStatusAsync((currentDeviceStatus) => { + currentDeviceStatus.Config ??= new(); + currentDeviceStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentDeviceStatus.Endpoints ??= new(); + currentDeviceStatus.Endpoints.Inbound ??= new(); + currentDeviceStatus.Endpoints.Inbound[args.InboundEndpointName] ??= new(); + return currentDeviceStatus; + }, null, cancellationToken); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); + } + } + private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, CancellationToken cancellationToken) { _logger.LogInformation("Asset with name {0} is now sampleable", args.AssetName); @@ -49,14 +74,44 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel if (args.Asset.EventGroups == null || args.Asset.EventGroups.Count != 1) { - _logger.LogError("Asset with name {0} does not have the expected event group", args.AssetName); + _logger.LogWarning("Asset with name {0} does not have the expected event group. No events will be received.", args.AssetName); + + try + { + await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => { + currentAssetStatus.Config ??= new(); + currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentAssetStatus.EventGroups ??= new(); + currentAssetStatus.EventGroups.Clear(); + return currentAssetStatus; + }, null, cancellationToken); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); + } return; } var eventGroup = args.Asset.EventGroups.First(); if (eventGroup.Events == null || eventGroup.Events.Count != 1) { - _logger.LogError("Asset with name {0} does not have the expected event within its event group", args.AssetName); + _logger.LogWarning("Asset with name {0} does not have the expected event within its event group. No events will be received.", args.AssetName); + + try + { + await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => { + currentAssetStatus.Config ??= new(); + currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentAssetStatus.EventGroups ??= new(); + currentAssetStatus.ClearEventGroupStatus(eventGroup.Name); + return currentAssetStatus; + }, null, cancellationToken); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); + } return; } @@ -66,14 +121,36 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel if (assetEvent.DataSource == null || !int.TryParse(assetEvent.DataSource, out int port)) { // If the asset's has no event doesn't specify a port, then do nothing - _logger.LogInformation("Asset with name {0} has an event, but the event didn't configure a port, so the connector won't handle these events", args.AssetName); + _logger.LogError("Asset with name {0} has an event, but the event didn't configure a port, so the connector won't handle these events", args.AssetName); + + try + { + _logger.LogWarning("Reporting asset status error to Azure Device Registry service..."); + await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => { + currentAssetStatus.Config ??= new(); + currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentAssetStatus.UpdateEventStatus(eventGroup.Name, new() + { + Name = assetEvent.Name, + Error = new ConfigError() + { + Message = "The configured event was either missing the expected port or had a non-integer value for the port", + } + }); + return currentAssetStatus; + }, null, cancellationToken); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); + } return; } - await OpenTcpConnectionAsync(args, assetEvent, port, cancellationToken); + await OpenTcpConnectionAsync(args, args.Asset.EventGroups.First().Name, assetEvent, port, cancellationToken); } - private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, AssetEvent assetEvent, int port, CancellationToken cancellationToken) + private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string eventGroupName, AssetEvent assetEvent, int port, CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { @@ -87,12 +164,14 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, AssetEve return; } - string host = args.Device.Endpoints.Inbound["my_tcp_endpoint"].Address.Split(":")[0]; + string host = args.Device.Endpoints.Inbound[InboundEndpointName].Address.Split(":")[0]; _logger.LogInformation("Attempting to open TCP client with address {0} and port {1}", host, port); using TcpClient client = new(); await client.ConnectAsync(host, port, cancellationToken); await using NetworkStream stream = client.GetStream(); + bool alreadyReportedAssetStatus = false; + try { while (!cancellationToken.IsCancellationRequested) @@ -102,7 +181,34 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, AssetEve Array.Resize(ref buffer, bytesRead); _logger.LogInformation("Received data from event with name {0} on asset with name {1}. Forwarding this data to the MQTT broker.", assetEvent.Name, args.AssetName); - await _connector.ForwardReceivedEventAsync(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName, assetEvent, buffer, null, cancellationToken); + await args.AssetClient.ForwardReceivedEventAsync(eventGroupName, assetEvent, buffer, null, cancellationToken); + + if (!alreadyReportedAssetStatus) + { + try + { + // Report status of the asset once the first event has been received and forwarded + _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); + + await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => { + currentAssetStatus.Config ??= new(); + currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentAssetStatus.UpdateEventStatus(eventGroupName, new() + { + Name = assetEvent.Name, + Error = null, + MessageSchemaReference = args.AssetClient.GetRegisteredEventMessageSchema(eventGroupName, assetEvent.Name) + }); + return currentAssetStatus; + }, null, cancellationToken); + + alreadyReportedAssetStatus = true; + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + } + } } } catch (Exception e) @@ -115,6 +221,25 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, AssetEve { _logger.LogError(e, "Failed to open TCP connection to asset"); } + + try + { + await args.DeviceEndpointClient.GetAndUpdateDeviceStatusAsync((currentDeviceStatus) => { + currentDeviceStatus.Config ??= new(); + currentDeviceStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentDeviceStatus.SetEndpointError( + InboundEndpointName, + new ConfigError() + { + Message = "Unable to connect to the TCP endpoint. The connector will retry to connect." + }); + return currentDeviceStatus; + }, null, cancellationToken); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); + } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AdrClientWrapper.cs b/dotnet/src/Azure.Iot.Operations.Connector/AdrClientWrapper.cs index 4f9817f6cb..8305a9b959 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AdrClientWrapper.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AdrClientWrapper.cs @@ -171,6 +171,18 @@ public Task CreateOrUpdateDiscove return _client.CreateOrUpdateDiscoveredDeviceAsync(request, inboundEndpointType, commandTimeout, cancellationToken); } + /// + public Task GetDeviceStatusAsync(string deviceName, string inboundEndpointName, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + return _client.GetDeviceStatusAsync(deviceName, inboundEndpointName, commandTimeout, cancellationToken); + } + + /// + public Task GetAssetStatusAsync(string deviceName, string inboundEndpointName, string assetName, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + return _client.GetAssetStatusAsync(deviceName, inboundEndpointName, assetName, commandTimeout, cancellationToken); + } + /// public ValueTask DisposeAsync() { diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs index 0d84d0edc8..67028f1ff3 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetAvailableEventArgs.cs @@ -9,12 +9,21 @@ namespace Azure.Iot.Operations.Connector /// /// The event args for when an asset becomes available to sample. /// - public class AssetAvailableEventArgs : EventArgs + public class AssetAvailableEventArgs : EventArgs, IDisposable { + /// + /// The name of the device that this asset belongs to. + /// public string DeviceName { get; } + /// + /// The device that this asset belongs to. + /// public Device Device { get; } + /// + /// The name of the endpoint that this asset belongs to. + /// public string InboundEndpointName { get; } /// @@ -41,7 +50,17 @@ public class AssetAvailableEventArgs : EventArgs /// public ILeaderElectionClient? LeaderElectionClient { get; } - internal AssetAvailableEventArgs(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, ILeaderElectionClient? leaderElectionClient) + /// + /// The client to use to send status updates for assets on and to use to forward sampled datasets/received events with. + /// + public AssetClient AssetClient { get; } + + /// + /// The client to use to send status updates for this asset's device on. + /// + public DeviceEndpointClient DeviceEndpointClient { get; } + + internal AssetAvailableEventArgs(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, ILeaderElectionClient? leaderElectionClient, IAdrClientWrapper adrClient, ConnectorWorker connector) { DeviceName = deviceName; Device = device; @@ -49,6 +68,13 @@ internal AssetAvailableEventArgs(string deviceName, Device device, string inboun AssetName = assetName; Asset = asset; LeaderElectionClient = leaderElectionClient; + AssetClient = new(adrClient, deviceName, inboundEndpointName, assetName, connector, device, asset); + DeviceEndpointClient = new(adrClient, deviceName, inboundEndpointName); + } + + public void Dispose() + { + AssetClient.Dispose(); } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs new file mode 100644 index 0000000000..8b2ee52a08 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Connector/AssetClient.cs @@ -0,0 +1,168 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; + +namespace Azure.Iot.Operations.Connector +{ + /// + /// A client for updating the status of an asset and for forwarding received events and/or sampled datasets. + /// + /// + /// + /// + public class AssetClient : IDisposable + { + private readonly IAdrClientWrapper _adrClient; + private readonly ConnectorWorker _connector; + private readonly string _deviceName; + private readonly string _inboundEndpointName; + private readonly string _assetName; + private readonly Device _device; + private readonly Asset _asset; + + // Used to make getAndUpdate calls behave atomically. Also respected by get and update methods so that a user + // does not accidentally update an asset while another thread is in the middle of a getAndUpdate call. + private readonly SemaphoreSlim _semaphore = new(0, 1); + + internal AssetClient(IAdrClientWrapper adrClient, string deviceName, string inboundEndpointName, string assetName, ConnectorWorker connector, Device device, Asset asset) + { + _adrClient = adrClient; + _deviceName = deviceName; + _inboundEndpointName = inboundEndpointName; + _assetName = assetName; + _connector = connector; + _device = device; + _asset = asset; + } + + /// + /// Get the current status of this asset and then optionally update it. + /// + /// The function that determines the new asset status when given the current asset status. + /// The timeout for each of the 'get' and 'update' commands. + /// Cancellation token. + /// The latest asset status after this operation. + /// + /// If after retrieving the current status, you don't want to send any updates, should return null. + /// If this happens, this function will return the latest asset status without trying to update it. + /// + /// This method uses a semaphore to ensure that this same client doesn't accidentally update the asset status while + /// another thread is in the middle of updating the same asset. This ensures that the current device status provided in + /// stays accurate while any updating occurs. + /// + public async Task GetAndUpdateAssetStatusAsync(Func handler, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + await _semaphore.WaitAsync(cancellationToken); + try + { + AssetStatus currentStatus = await GetAssetStatusAsync(commandTimeout, cancellationToken); + AssetStatus? desiredStatus = handler.Invoke(currentStatus); + if (desiredStatus != null) + { + return await UpdateAssetStatusAsync(desiredStatus, commandTimeout, cancellationToken); + } + + return currentStatus; + } + finally + { + _semaphore.Release(); + } + } + + /// + /// Push a sampled dataset to the configured destinations. + /// + /// The dataset that was sampled. + /// The payload to push to the configured destinations. + /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. + /// Cancellation token. + public async Task ForwardSampledDatasetAsync(AssetDataset dataset, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + { + await _connector.ForwardSampledDatasetAsync(_deviceName, _device, _inboundEndpointName, _assetName, _asset, dataset, serializedPayload, userData, cancellationToken); + } + + /// + /// Push a received event payload to the configured destinations. + /// + /// The name of the event group that this event belongs to. + /// The event. + /// The payload to push to the configured destinations. + /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. + /// Cancellation token. + public async Task ForwardReceivedEventAsync(string eventGroupName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + { + await _connector.ForwardReceivedEventAsync(_deviceName, _device, _inboundEndpointName, _assetName, _asset, eventGroupName, assetEvent, serializedPayload, userData, cancellationToken); + } + + public MessageSchemaReference? GetRegisteredDatasetMessageSchema(string datasetName) + { + return _connector.GetRegisteredDatasetMessageSchema(_deviceName, _inboundEndpointName, _assetName, datasetName); + } + + public MessageSchemaReference? GetRegisteredEventMessageSchema(string eventGroupName, string eventName) + { + return _connector.GetRegisteredEventMessageSchema(_deviceName, _inboundEndpointName, _assetName, eventGroupName, eventName); + } + + + public void Dispose() + { + try + { + _semaphore.Dispose(); + } + catch (ObjectDisposedException) + { + // It's fine if this sempahore is already disposed. + } + } + + /// + /// Update the status of this asset in the Azure Device Registry service + /// + /// The status of this asset and its datasets/event groups/streams/management groups + /// The timeout for this RPC command invocation. + /// The cancellation token. + /// The status returned by the Azure Device Registry service + /// + /// This update behaves like a 'put' in that it will replace all current state for this asset in the Azure + /// Device Registry service with what is provided. + /// + private async Task UpdateAssetStatusAsync( + AssetStatus status, + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default) + { + return await _adrClient.UpdateAssetStatusAsync( + _deviceName, + _inboundEndpointName, + new UpdateAssetStatusRequest() + { + AssetName = _assetName, + AssetStatus = status, + }, + commandTimeout, + cancellationToken); + } + + /// + /// Get the status of this asset from the Azure Device Registry service + /// + /// The timeout for this RPC command invocation. + /// The cancellation token. + /// The status returned by the Azure Device Registry service + private async Task GetAssetStatusAsync( + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default) + { + return await _adrClient.GetAssetStatusAsync( + _deviceName, + _inboundEndpointName, + _assetName, + commandTimeout, + cancellationToken); + } + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs index d171e300da..a735e72162 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/ConnectorWorker.cs @@ -3,7 +3,6 @@ using System.Collections.Concurrent; using System.Data; -using System.Collections.Generic; using System.Text; using Azure.Iot.Operations.Connector.ConnectorConfigurations; using Azure.Iot.Operations.Connector.Exceptions; @@ -46,7 +45,7 @@ public class ConnectorWorker : ConnectorBackgroundService // keys are "{composite device name}_{asset name}_{dataset name}. The value is the message schema registered for that device's asset's dataset private readonly ConcurrentDictionary _registeredDatasetMessageSchemas = new(); - // keys are "{composite device name}_{asset name}_{event name}. The value is the message schema registered for that device's asset's event + // keys are "{composite device name}_{asset name}_{event group name}_{event name}. The value is the message schema registered for that device's asset's event private readonly ConcurrentDictionary _registeredEventMessageSchemas = new(); /// @@ -257,23 +256,76 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) await _mqttClient.DisconnectAsync(null, CancellationToken.None); } - /// - /// Push a sampled dataset to the configured destinations. - /// - /// The name of the device that this dataset belongs to - /// The name of the inbound endpoint that this dataset belongs to - /// The asset that the dataset belongs to. - /// The name of the asset that the dataset belongs to - /// The dataset that was sampled. - /// The payload to push to the configured destinations. - /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. - /// Cancellation token. - public async Task ForwardSampledDatasetAsync(string deviceName, string inboundEndpointName, Asset asset, string assetName, AssetDataset dataset, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + public MessageSchemaReference? GetRegisteredDatasetMessageSchema(string deviceName, string inboundEndpointName, string assetName, string datasetName) + { + if (_registeredDatasetMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{datasetName}", out Schema? schema)) + { + return new MessageSchemaReference() + { + SchemaName = schema.Name, + SchemaRegistryNamespace = schema.Namespace, + SchemaVersion = schema.Version, + }; + } + + return null; + } + + public MessageSchemaReference? GetRegisteredEventMessageSchema(string deviceName, string inboundEndpointName, string assetName, string eventGroupName, string eventName) + { + if (_registeredEventMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{eventGroupName}_{eventName}", out Schema? schema)) + { + return new MessageSchemaReference() + { + SchemaName = schema.Name, + SchemaRegistryNamespace = schema.Namespace, + SchemaVersion = schema.Version, + }; + } + + return null; + } + + // Called by AssetClient instances + internal async Task ForwardSampledDatasetAsync(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, AssetDataset dataset, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) { ObjectDisposedException.ThrowIf(_isDisposed, this); CloudEvent? cloudEvent = null; - if (_registeredDatasetMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{dataset.Name}", out var registeredDatasetMessageSchema)) + Schema? registeredDatasetMessageSchema = null; + if (!_registeredDatasetMessageSchemas.ContainsKey($"{deviceName}_{inboundEndpointName}_{assetName}_{dataset.Name}")) + { + // This may register a message schema that has already been uploaded, but the schema registry service is idempotent + var datasetMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(device, asset, dataset.Name!, dataset); + if (datasetMessageSchema != null) + { + try + { + _logger.LogInformation($"Registering message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); + await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient); + registeredDatasetMessageSchema = await schemaRegistryClient.PutAsync( + datasetMessageSchema.SchemaContent, + datasetMessageSchema.SchemaFormat, + datasetMessageSchema.SchemaType, + datasetMessageSchema.Version ?? "1", + datasetMessageSchema.Tags); + + _logger.LogInformation($"Registered message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}."); + + _registeredDatasetMessageSchemas.TryAdd($"{deviceName}_{inboundEndpointName}_{assetName}_{dataset.Name}", registeredDatasetMessageSchema); + } + catch (Exception ex) + { + _logger.LogError($"Failed to register message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}. Error: {ex.Message}"); + } + } + else + { + _logger.LogInformation($"No message schema will be registered for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); + } + } + + if (_registeredDatasetMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{dataset.Name}", out registeredDatasetMessageSchema)) { if (Uri.IsWellFormedUriString(inboundEndpointName, UriKind.RelativeOrAbsolute)) { @@ -285,7 +337,7 @@ public async Task ForwardSampledDatasetAsync(string deviceName, string inboundEn } } - _logger.LogInformation($"Received sampled payload from dataset with name {dataset.Name} in asset with name {asset.DisplayName}. Now publishing it to MQTT broker: {Encoding.UTF8.GetString(serializedPayload)}"); + _logger.LogInformation($"Received sampled payload from dataset with name {dataset.Name} in asset with name {assetName}. Now publishing it to MQTT broker: {Encoding.UTF8.GetString(serializedPayload)}"); if (dataset.Destinations == null) { @@ -299,7 +351,7 @@ public async Task ForwardSampledDatasetAsync(string deviceName, string inboundEn { var topic = destination.Configuration.Topic ?? throw new AssetConfigurationException( - $"Dataset with name {dataset.Name} in asset with name {asset.DisplayName} has no configured MQTT topic to publish to. Data won't be forwarded for this dataset."); + $"Dataset with name {dataset.Name} in asset with name {assetName} has no configured MQTT topic to publish to. Data won't be forwarded for this dataset."); var messageMetadata = new OutgoingTelemetryMetadata { @@ -362,22 +414,12 @@ await telemetrySender.SendTelemetryAsync( } } - /// - /// Push a received event payload to the configured destinations. - /// - /// The name of the device that this event belongs to - /// The name of the inbound endpoint that this event belongs to - /// The asset that this event came from. - /// The name of the asset that this event belongs to. - /// The event. - /// The payload to push to the configured destinations. - /// Optional headers to include in the telemetry. Only applicable for datasets with a destination of the MQTT broker. - /// Cancellation token. - public async Task ForwardReceivedEventAsync(string deviceName, string inboundEndpointName, Asset asset, string assetName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) + // Called by AssetClient instances + internal async Task ForwardReceivedEventAsync(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, string eventGroupName, AssetEvent assetEvent, byte[] serializedPayload, Dictionary? userData = null, CancellationToken cancellationToken = default) { ObjectDisposedException.ThrowIf(_isDisposed, this); - _logger.LogInformation($"Received event with name {assetEvent.Name} in asset with name {asset.DisplayName}. Now publishing it to MQTT broker."); + _logger.LogInformation($"Received event with name {assetEvent.Name} in event group with name {eventGroupName} in asset with name {assetName}. Now publishing it to MQTT broker."); if (assetEvent.Destinations == null) { @@ -385,8 +427,41 @@ public async Task ForwardReceivedEventAsync(string deviceName, string inboundEnd return; } + Schema? registeredEventMessageSchema = null; + if (!_registeredEventMessageSchemas.ContainsKey($"{deviceName}_{inboundEndpointName}_{assetName}_{eventGroupName}_{assetEvent}")) + { + // This may register a message schema that has already been uploaded, but the schema registry service is idempotent + var eventMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(device, asset, assetEvent.Name, assetEvent); + if (eventMessageSchema != null) + { + try + { + _logger.LogInformation($"Registering message schema for event with name {assetEvent.Name} in event group with name {eventGroupName} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); + await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient); + registeredEventMessageSchema = await schemaRegistryClient.PutAsync( + eventMessageSchema.SchemaContent, + eventMessageSchema.SchemaFormat, + eventMessageSchema.SchemaType, + eventMessageSchema.Version ?? "1", + eventMessageSchema.Tags); + + _logger.LogInformation($"Registered message schema for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}."); + + _registeredEventMessageSchemas.TryAdd($"{deviceName}_{inboundEndpointName}_{assetName}_{eventGroupName}_{assetEvent.Name}", registeredEventMessageSchema); + } + catch (Exception ex) + { + _logger.LogError($"Failed to register message schema for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}. Error: {ex.Message}"); + } + } + else + { + _logger.LogInformation($"No message schema will be registered for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); + } + } + CloudEvent? cloudEvent = null; - if (_registeredEventMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{assetEvent.Name}", out var registeredEventMessageSchema)) + if (_registeredEventMessageSchemas.TryGetValue($"{deviceName}_{inboundEndpointName}_{assetName}_{eventGroupName}_{assetEvent}", out registeredEventMessageSchema)) { if (Uri.IsWellFormedUriString(inboundEndpointName, UriKind.RelativeOrAbsolute)) { @@ -404,7 +479,7 @@ public async Task ForwardReceivedEventAsync(string deviceName, string inboundEnd { string topic = destination.Configuration.Topic ?? throw new AssetConfigurationException( - $"Dataset with name {assetEvent.Name} in asset with name {asset.DisplayName} has no configured MQTT topic to publish to. Data won't be forwarded for this dataset."); + $"Dataset with name {assetEvent.Name} in asset with name {assetName} has no configured MQTT topic to publish to. Data won't be forwarded for this dataset."); var messageMetadata = new OutgoingTelemetryMetadata { @@ -483,7 +558,7 @@ private async void OnAssetChanged(object? _, AssetChangedEventArgs args) if (args.ChangeType == ChangeType.Created) { _logger.LogInformation("Asset with name {0} created on endpoint with name {1} on device with name {2}", args.AssetName, args.InboundEndpointName, args.DeviceName); - await AssetAvailableAsync(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName); + AssetAvailable(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName); _adrClient!.ObserveAssets(args.DeviceName, args.InboundEndpointName); } else if (args.ChangeType == ChangeType.Deleted) @@ -499,7 +574,7 @@ private async void OnAssetChanged(object? _, AssetChangedEventArgs args) { _logger.LogInformation("Asset with name {0} updated on endpoint with name {1} on device with name {2}", args.AssetName, args.InboundEndpointName, args.DeviceName); await AssetUnavailableAsync(args.DeviceName, args.InboundEndpointName, args.AssetName, true); - await AssetAvailableAsync(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName); + AssetAvailable(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName); } } @@ -521,7 +596,7 @@ private async void OnDeviceChanged(object? _, DeviceChangedEventArgs args) { try { - await WhileDeviceIsAvailable.Invoke(new(args.Device, args.InboundEndpointName, _leaderElectionClient), deviceTaskCancellationTokenSource.Token); + await WhileDeviceIsAvailable.Invoke(new(args.DeviceName, args.Device, args.InboundEndpointName, _leaderElectionClient, _adrClient!), deviceTaskCancellationTokenSource.Token); } catch (OperationCanceledException) { @@ -587,7 +662,7 @@ private async Task DeviceUnavailableAsync(DeviceChangedEventArgs args, string co } } - private async Task AssetAvailableAsync(string deviceName, string inboundEndpointName, Asset? asset, string assetName) + private void AssetAvailable(string deviceName, string inboundEndpointName, Asset? asset, string assetName) { string compoundDeviceName = $"{deviceName}_{inboundEndpointName}"; @@ -618,88 +693,11 @@ private async Task AssetAvailableAsync(string deviceName, string inboundEndpoint { _logger.LogInformation($"Asset with name {assetName} has no datasets to sample"); } - else - { - foreach (var dataset in asset.Datasets) - { - // This may register a message schema that has already been uploaded, but the schema registry service is idempotent - var datasetMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(device, asset, dataset.Name!, dataset); - if (datasetMessageSchema != null) - { - try - { - _logger.LogInformation($"Registering message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); - await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient); - var registeredDatasetMessageSchema = await schemaRegistryClient.PutAsync( - datasetMessageSchema.SchemaContent, - datasetMessageSchema.SchemaFormat, - datasetMessageSchema.SchemaType, - datasetMessageSchema.Version ?? "1", - datasetMessageSchema.Tags); - - _logger.LogInformation($"Registered message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}."); - - _registeredDatasetMessageSchemas.TryAdd($"{deviceName}_{inboundEndpointName}_{assetName}_{dataset.Name}", registeredDatasetMessageSchema); - } - catch (Exception ex) - { - _logger.LogError($"Failed to register message schema for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}. Error: {ex.Message}"); - } - } - else - { - _logger.LogInformation($"No message schema will be registered for dataset with name {dataset.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); - } - } - } if (asset.EventGroups == null) { _logger.LogInformation($"Asset with name {assetName} has no events to listen for"); } - else - { - foreach (var assetEventGroup in asset.EventGroups) - { - if (assetEventGroup.Events == null) - { - _logger.LogInformation($"Event group with name {assetEventGroup.Name} has no events to register message schemas for"); - continue; - } - - foreach (var assetEvent in assetEventGroup.Events) - { - // This may register a message schema that has already been uploaded, but the schema registry service is idempotent - var eventMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(device, asset, assetEvent.Name, assetEvent); - if (eventMessageSchema != null) - { - try - { - _logger.LogInformation($"Registering message schema for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); - await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient); - var registeredEventSchema = await schemaRegistryClient.PutAsync( - eventMessageSchema.SchemaContent, - eventMessageSchema.SchemaFormat, - eventMessageSchema.SchemaType, - eventMessageSchema.Version ?? "1", - eventMessageSchema.Tags); - - _logger.LogInformation($"Registered message schema for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}."); - - _registeredEventMessageSchemas.TryAdd($"{deviceName}_{inboundEndpointName}_{assetName}_{assetEvent.Name}", registeredEventSchema); - } - catch (Exception ex) - { - _logger.LogError($"Failed to register message schema for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}. Error: {ex.Message}"); - } - } - else - { - _logger.LogInformation($"No message schema will be registered for event with name {assetEvent.Name} on asset with name {assetName} associated with device with name {deviceName} and inbound endpoint name {inboundEndpointName}"); - } - } - } - } if (WhileAssetIsAvailable != null) { @@ -710,7 +708,8 @@ private async Task AssetAvailableAsync(string deviceName, string inboundEndpoint { try { - await WhileAssetIsAvailable.Invoke(new(deviceName, device, inboundEndpointName, assetName, asset, _leaderElectionClient), assetTaskCancellationTokenSource.Token); + using AssetAvailableEventArgs args = new(deviceName, device, inboundEndpointName, assetName, asset, _leaderElectionClient, _adrClient!, this); + await WhileAssetIsAvailable.Invoke(args, assetTaskCancellationTokenSource.Token); } catch (OperationCanceledException) { diff --git a/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs b/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs index e228824741..b7db776e57 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/DeviceAvailableEventArgs.cs @@ -8,8 +8,19 @@ namespace Azure.Iot.Operations.Connector { public class DeviceAvailableEventArgs : EventArgs { + /// + /// The name of this device. + /// + public string DeviceName { get; } + + /// + /// This device. + /// public Device Device { get; } + /// + /// The name of the endpoint that became available on this device. + /// public string InboundEndpointName { get; } /// @@ -26,11 +37,18 @@ public class DeviceAvailableEventArgs : EventArgs /// public ILeaderElectionClient? LeaderElectionClient { get; } - internal DeviceAvailableEventArgs(Device device, string inboundEndpointName, ILeaderElectionClient? leaderElectionClient) + /// + /// The client to use to send status updates for this device with. + /// + public DeviceEndpointClient DeviceEndpointClient { get; } + + internal DeviceAvailableEventArgs(string deviceName, Device device, string inboundEndpointName, ILeaderElectionClient? leaderElectionClient, IAdrClientWrapper adrclient) { + DeviceName = deviceName; Device = device; InboundEndpointName = inboundEndpointName; LeaderElectionClient = leaderElectionClient; + DeviceEndpointClient = new(adrclient, deviceName, inboundEndpointName); } } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs new file mode 100644 index 0000000000..7a348257e9 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Connector/DeviceEndpointClient.cs @@ -0,0 +1,109 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Threading; +using Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; + +namespace Azure.Iot.Operations.Connector +{ + /// + /// A client for reporting the status of this device and its endpoint + /// + public class DeviceEndpointClient + { + private readonly IAdrClientWrapper _adrClient; + private readonly string _deviceName; + private readonly string _inboundEndpointName; + + // Used to make getAndUpdate calls behave atomically. Also respected by get and update methods so that a user + // does not accidentally update a device while another thread is in the middle of a getAndUpdate call. + private readonly SemaphoreSlim _semaphore = new(0, 1); + + internal DeviceEndpointClient(IAdrClientWrapper adrClient, string deviceName, string inboundEndpointName) + { + _adrClient = adrClient; + _deviceName = deviceName; + _inboundEndpointName = inboundEndpointName; + } + + /// + /// Get the current status of this device and then optionally update it. + /// + /// The function that determines the new device status when given the current device status. + /// Cancellation token. + /// The timeout for each of the 'get' and 'update' commands. + /// The latest device status after this operation. + /// + /// If after retrieving the current status, you don't want to send any updates, should return null. + /// If this happens, this function will return the latest asset status without trying to update it. + /// + /// This method uses a semaphore to ensure that this same client doesn't accidentally update the device status while + /// another thread is in the middle of updating the same device. This ensures that the current device status provided in + /// stays accurate while any updating occurs. + /// + public async Task GetAndUpdateDeviceStatusAsync(Func handler, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + await _semaphore.WaitAsync(cancellationToken); + try + { + DeviceStatus currentStatus = await GetDeviceStatusAsync(commandTimeout, cancellationToken); + DeviceStatus? desiredStatus = handler.Invoke(currentStatus); + + if (desiredStatus != null) + { + return await UpdateDeviceStatusAsync(desiredStatus, commandTimeout, cancellationToken); + } + + return currentStatus; + } + finally + { + _semaphore.Release(); + } + } + + /// + /// Update the status of a specific device in the Azure Device Registry service + /// + /// The name of the device. + /// The name of the inbound endpoint. + /// The new status of the device. + /// Optional timeout for the command. + /// Optional cancellation token. + /// The status returned by the Azure Device Registry service + /// + /// This update call will act as a 'patch' for all endpoint-level statuses, but will act as a 'put' for the device-level status. + /// That means that, for devices with multiple endpoints, you can safely call this method when each endpoint has a status to + /// report without needing to include the existing status of previously reported endpoints. + /// + private async Task UpdateDeviceStatusAsync( + DeviceStatus status, + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default) + { + return await _adrClient.UpdateDeviceStatusAsync( + _deviceName, + _inboundEndpointName, + status, + commandTimeout, + cancellationToken); + } + + /// + /// Get the status of this device from the Azure Device Registry service + /// + /// The timeout for this RPC command invocation. + /// The cancellation token. + /// The status returned by the Azure Device Registry service + private async Task GetDeviceStatusAsync( + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default) + { + return await _adrClient.GetDeviceStatusAsync( + _deviceName, + _inboundEndpointName, + commandTimeout, + cancellationToken); + } + } +} diff --git a/dotnet/src/Azure.Iot.Operations.Connector/Files/AssetFileMonitor.cs b/dotnet/src/Azure.Iot.Operations.Connector/Files/AssetFileMonitor.cs index dcb0a1c86f..f21c5a7d61 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/Files/AssetFileMonitor.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/Files/AssetFileMonitor.cs @@ -190,11 +190,8 @@ public void ObserveDevices() /// public void UnobserveDevices() { - if (_deviceDirectoryMonitor != null) - { - _deviceDirectoryMonitor.Stop(); - _deviceDirectoryMonitor = null; - } + _deviceDirectoryMonitor?.Stop(); + _deviceDirectoryMonitor = null; } /// diff --git a/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs b/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs index 5a4755bb07..592660ecfa 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/IAdrClientWrapper.cs @@ -94,15 +94,20 @@ public interface IAdrClientWrapper : IAsyncDisposable /// The names of all available devices IEnumerable GetDeviceNames(); + /// - /// Updates the status of a specific asset. + /// Update the status of a specific asset in the Azure Device Registry service /// - /// The name of the device. - /// The name of the inbound endpoint. - /// The request containing asset status update parameters. - /// Optional timeout for the command. - /// Optional cancellation token. - /// A task that represents the asynchronous operation, containing the updated asset details. + /// The name of the device the asset belongs to. + /// The name of the inbound endpoint the asset belongs to. + /// The status of this asset and its datasets/event groups/streams/management groups. + /// The timeout for this RPC command invocation. + /// The cancellation token. + /// The status returned by the Azure Device Registry service. + /// + /// This update behaves like a 'put' in that it will replace all current state for this asset in the Azure + /// Device Registry service with what is provided. + /// Task UpdateAssetStatusAsync( string deviceName, string inboundEndpointName, @@ -111,14 +116,19 @@ Task UpdateAssetStatusAsync( CancellationToken cancellationToken = default); /// - /// Updates the status of a specific device. + /// Update the status of a specific device in the Azure Device Registry service /// /// The name of the device. /// The name of the inbound endpoint. /// The new status of the device. /// Optional timeout for the command. /// Optional cancellation token. - /// A task that represents the asynchronous operation, containing the updated device details. + /// The status returned by the Azure Device Registry service + /// + /// This update call will act as a patch for all endpoint level statuses, but will act as a put for the device-level status. + /// That means that, for devices with multiple endpoints, you can safely call this method when each endpoint has a status to + /// report without needing to include the existing status of previously reported endpoints. + /// Task UpdateDeviceStatusAsync( string deviceName, string inboundEndpointName, @@ -154,5 +164,34 @@ Task CreateOrUpdateDiscoveredDevi string inboundEndpointType, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default); + + /// + /// Retrieves the status of a specific device. + /// + /// The name of the device. + /// The name of the inbound endpoint. + /// Optional timeout for the command. + /// Optional cancellation token. + /// A task that represents the asynchronous operation, containing the device status. + Task GetDeviceStatusAsync( + string deviceName, + string inboundEndpointName, + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default); + + /// + /// Retrieves the status of a specific asset. + /// + /// The name of the device. + /// The name of the inbound endpoint. + /// The name of the asset. + /// Optional timeout for the command. + /// Optional cancellation token. + /// A task that represents the asynchronous operation, containing the asset status. + Task GetAssetStatusAsync(string deviceName, + string inboundEndpointName, + string assetName, + TimeSpan? commandTimeout = null, + CancellationToken cancellationToken = default); } } diff --git a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs index c3376fb0ea..71fc64b6f5 100644 --- a/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs +++ b/dotnet/src/Azure.Iot.Operations.Connector/PollingTelemetryConnectorWorker.cs @@ -15,33 +15,32 @@ public class PollingTelemetryConnectorWorker : ConnectorWorker public PollingTelemetryConnectorWorker(ApplicationContext applicationContext, ILogger logger, IMqttClient mqttClient, IDatasetSamplerFactory datasetSamplerFactory, IMessageSchemaProvider messageSchemaFactory, IAdrClientWrapperProvider adrClientFactory, IConnectorLeaderElectionConfigurationProvider? leaderElectionConfigurationProvider = null) : base(applicationContext, logger, mqttClient, messageSchemaFactory, adrClientFactory, leaderElectionConfigurationProvider) { + base.WhileDeviceIsAvailable = WhileDeviceAvailableAsync; base.WhileAssetIsAvailable = WhileAssetAvailableAsync; _datasetSamplerFactory = datasetSamplerFactory; } - public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, CancellationToken cancellationToken) + public async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, CancellationToken cancellationToken) { - cancellationToken.ThrowIfCancellationRequested(); - - // Skip sampling if the device is explicitly disabled (Enabled is false). Undefined (null) value is treated as enabled. - if (args.Device.Enabled != true && args.Device.Enabled != null) + try { - _logger.LogWarning("Device {0} is disabled. Skipping asset {1} sampling until device is enabled.", args.DeviceName, args.AssetName); - // Note: When the device is updated, ConnectorWorker will automatically cancel this handler - // and reinvoke it with the updated device information if it becomes enabled. - return; + // Report device status is okay + _logger.LogInformation("Reporting device status as okay to Azure Device Registry service..."); + await args.DeviceEndpointClient.GetAndUpdateDeviceStatusAsync((currentDeviceStatus) => { + currentDeviceStatus.Config ??= new(); + currentDeviceStatus.Config.LastTransitionTime = DateTime.UtcNow; + return currentDeviceStatus; + }, null, cancellationToken); } - - // Skip sampling if the device is explicitly disabled (Enabled is false). Undefined (null) value is treated as enabled. - if (args.Device.Enabled != true && args.Device.Enabled != null) + catch (Exception e) { - _logger.LogWarning("Asset {0} is disabled. Skipping sampling until asset is enabled.", args.AssetName); - // Note: When the asset is updated, ConnectorWorker will automatically cancel this handler - // and reinvoke it with the updated asset information if it becomes enabled. - return; + _logger.LogError(e, "Failed to report device status to Azure Device Registry service"); } + } - _logger.LogInformation("Starting to sample enabled asset {0} on enabled device {1}", args.AssetName, args.DeviceName); + public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); if (args.Asset.Datasets == null) { @@ -50,8 +49,10 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell Dictionary datasetsTimers = new(); _assetsSamplingTimers[args.AssetName] = datasetsTimers; - foreach (AssetDataset dataset in args.Asset.Datasets!) + Dictionary okayStatusReportedByDataset = new(); + foreach (AssetDataset dataset in args.Asset.Datasets!) //TODO only send status once per dataset { + okayStatusReportedByDataset.Add(dataset.Name, false); EndpointCredentials? credentials = null; if (args.Device.Endpoints != null && args.Device.Endpoints.Inbound != null @@ -71,11 +72,59 @@ public async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancell try { byte[] sampledData = await datasetSampler.SampleDatasetAsync(dataset); - await ForwardSampledDatasetAsync(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName, dataset, sampledData); + await args.AssetClient.ForwardSampledDatasetAsync(dataset, sampledData); + + // Only report a status for this dataset if it hasn't been reported yet already + if (!okayStatusReportedByDataset[dataset.Name]) + { + try + { + // The dataset was sampled as expected, so report the asset status as okay + _logger.LogInformation("Reporting asset status as okay to Azure Device Registry service..."); + await args.AssetClient.GetAndUpdateAssetStatusAsync( + (currentAssetStatus) => + { + currentAssetStatus.Config ??= new(); + currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow; + currentAssetStatus.UpdateDatasetStatus(new AssetDatasetEventStreamStatus() + { + Name = dataset.Name, + MessageSchemaReference = args.AssetClient.GetRegisteredDatasetMessageSchema(dataset.Name), + Error = null + }); + return currentAssetStatus; + }, + null, + cancellationToken); + okayStatusReportedByDataset[dataset.Name] = true; + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report asset status to Azure Device Registry service"); + } + } } catch (Exception e) { _logger.LogError(e, "Failed to sample the dataset"); + + try + { + await args.DeviceEndpointClient.GetAndUpdateDeviceStatusAsync((currentDeviceStatus) => { + currentDeviceStatus.Config ??= new ConfigStatus(); + currentDeviceStatus.Config.Error = + new ConfigError() + { + Message = $"Unable to sample the device. Error message: {e.Message}", + }; + currentDeviceStatus.Config.LastTransitionTime = DateTime.UtcNow; + return currentDeviceStatus; + }, null, cancellationToken); + } + catch (Exception e2) + { + _logger.LogError(e2, "Failed to report device status to Azure Device Registry service"); + } } }, null, TimeSpan.FromSeconds(0), samplingInterval); diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatusSchemaElement.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatus.cs similarity index 84% rename from dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatusSchemaElement.cs rename to dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatus.cs index e799b5ba82..c764c775e3 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatusSchemaElement.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupActionStatus.cs @@ -1,6 +1,6 @@ namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; -public record AssetManagementGroupActionStatusSchemaElement +public record AssetManagementGroupActionStatus { public ConfigError? Error { get; set; } public required string Name { get; set; } diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatus.cs new file mode 100644 index 0000000000..e0da076e37 --- /dev/null +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatus.cs @@ -0,0 +1,8 @@ +namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; + +public record AssetManagementGroupStatus +{ + public List? Actions { get; set; } + + public required string Name { get; set; } +} diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatusSchemaElement.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatusSchemaElement.cs deleted file mode 100644 index 4a681b7a71..0000000000 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetManagementGroupStatusSchemaElement.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; - -public record AssetManagementGroupStatusSchemaElement -{ - public List? Actions { get; set; } - - public required string Name { get; set; } -} diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs index 03f9234976..dcc919c231 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/AssetStatus.cs @@ -5,13 +5,201 @@ namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; public record AssetStatus { + /// + /// The status of the asset + /// + /// + /// This status is independent from the status of any nested event groups/datasets/streams. That means that, + /// even if a dataset has a config error, the asset status may still be okay. + /// public ConfigStatus? Config { get; set; } + /// + /// The status of all datasets associated with this asset (if it has any datasets). + /// + /// + /// Each dataset should only report its latest status. + /// public List? Datasets { get; set; } + /// + /// The status of all event groups associated with this asset (if it has any event groups). + /// + /// + /// Each event group should only report its latest status. + /// public List? EventGroups { get; set; } = default; - public List? ManagementGroups { get; set; } + /// + /// The status of all management groups associated with this asset (if it has any management groups). + /// + /// + /// Each management group should only report its latest status. + /// + public List? ManagementGroups { get; set; } + /// + /// The status of all streams associated with this asset (if it has any streams). + /// + /// + /// Each stream should only report its latest status. + /// public List? Streams { get; set; } + + /// + /// Update to replace any existing status for the dataset named in . + /// + /// The new status of the dataset. + /// + /// If the dataset has no status in yet, will be added. If the + /// dataset does have status in already, that status will be replaced entirely by . + /// + public void UpdateDatasetStatus(AssetDatasetEventStreamStatus newStatus) + { + Datasets ??= new(); + + foreach (AssetDatasetEventStreamStatus currentStatus in Datasets) + { + if (currentStatus.Name.Equals(newStatus.Name)) + { + // If the dataset status is already present in the list, update it in place + currentStatus.Error = newStatus.Error; + currentStatus.MessageSchemaReference = newStatus.MessageSchemaReference; + return; + } + } + + // If the dataset status did not exist in the list, just add it + Datasets.Add(newStatus); + } + + /// + /// Update to replace any existing status for the stream named in . + /// + /// The new status of the stream. + /// + /// If the stream has no status in yet, will be added. If the + /// stream does have status in already, that status will be replaced entirely by . + /// + public void UpdateStreamStatus(AssetDatasetEventStreamStatus newStatus) + { + Streams ??= new(); + + foreach (AssetDatasetEventStreamStatus currentStatus in Streams) + { + if (currentStatus.Name.Equals(newStatus.Name)) + { + // If the dataset status is already present in the list, update it in place + currentStatus.Error = newStatus.Error; + currentStatus.MessageSchemaReference = newStatus.MessageSchemaReference; + return; + } + } + + // If the dataset status did not exist in the list, just add it + Streams.Add(newStatus); + } + + /// + /// Remove any statuses related to the provided event group name from . + /// + /// The name of the event group to clear all statuses from. + public void ClearEventGroupStatus(string eventGroupName) + { + if (EventGroups != null) + { + List eventGroupStatusesToRemove = new(); + foreach (AssetEventGroupStatus eventGroupStatus in EventGroups) + { + if (eventGroupStatus.Name.Equals(eventGroupName)) + { + eventGroupStatusesToRemove.Add(eventGroupStatus); + } + } + + foreach (AssetEventGroupStatus eventGroupStatusToRemove in eventGroupStatusesToRemove) + { + EventGroups.Remove(eventGroupStatusToRemove); + } + } + } + + /// + /// Update to replace any existing status for the provided event group's event's status. + /// + /// The name of the event group that this event belongs to. + /// The new status of the event within this event group. + public void UpdateEventStatus(string eventGroupName, AssetDatasetEventStreamStatus eventNewStatus) + { + EventGroups ??= new(); + + foreach (AssetEventGroupStatus eventGroupStatus in EventGroups) + { + if (eventGroupStatus.Name.Equals(eventGroupName)) + { + eventGroupStatus.Events ??= new(); + foreach (AssetDatasetEventStreamStatus eventStatus in eventGroupStatus.Events) + { + if (eventStatus.Name.Equals(eventNewStatus.Name)) + { + // If the event group status exists and it contains this event's status, update it in place + eventStatus.Error = eventNewStatus.Error; + eventStatus.MessageSchemaReference = eventNewStatus.MessageSchemaReference; + return; + } + } + + // If the event group status exists, but no status was found for this event, just add it. + eventGroupStatus.Events.Add(eventNewStatus); + return; + } + } + + // If the event group status did not exist, just add it + EventGroups.Add(new AssetEventGroupStatus() + { + Name = eventGroupName, + Events = new List { eventNewStatus } + }); + } + + /// + /// Update to replace any existing status for the provided management group action. + /// + /// The name of the management group that this action belongs to. + /// The new status of this action. + public void UpdateManagementGroupStatus(string managementGroupName, AssetManagementGroupActionStatus actionNewStatus) + { + ManagementGroups ??= new(); + + foreach (AssetManagementGroupStatus managementGroupStatus in ManagementGroups) + { + if (managementGroupStatus.Name.Equals(managementGroupName)) + { + managementGroupStatus.Actions ??= new(); + foreach (AssetManagementGroupActionStatus actionStatus in managementGroupStatus.Actions) + { + if (actionStatus.Name.Equals(actionNewStatus.Name)) + { + // If the management group status exists and it contains this action's status, update it in place + actionStatus.Error = actionNewStatus.Error; + actionStatus.RequestMessageSchemaReference = actionNewStatus.RequestMessageSchemaReference; + actionStatus.ResponseMessageSchemaReference = actionNewStatus.ResponseMessageSchemaReference; + return; + } + } + + // If the management group status exists, but no status was found for this action, just add it. + managementGroupStatus.Actions.Add(actionNewStatus); + return; + } + } + + // If the management group status did not exist, just add it + ManagementGroups.Add(new AssetManagementGroupStatus() + { + Name = managementGroupName, + Actions = new List { actionNewStatus } + }); + } } diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs index 625381bd29..a9366ce912 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ConfigStatus.cs @@ -7,25 +7,21 @@ namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models public partial class ConfigStatus { /// - /// The 'error' Field. + /// The error that this entity encountered, if any error was encountered. /// - [JsonPropertyName("error")] - [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + /// + /// If a device/endpoint/asset/dataset/etc has no errors to report, this value should be null. + /// public ConfigError? Error { get; set; } = default; /// /// A read only timestamp indicating the last time the configuration has been modified from the perspective of the current actual (Edge) state of the CRD. Edge would be the only writer of this value and would sync back up to the cloud. /// - [JsonPropertyName("lastTransitionTime")] - [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public DateTime? LastTransitionTime { get; set; } = default; /// /// A read only incremental counter indicating the number of times the configuration has been modified from the perspective of the current actual (Edge) state of the CRD. Edge would be the only writer of this value and would sync back up to the cloud. In steady state, this should equal version. /// - [JsonPropertyName("version")] - [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public ulong? Version { get; set; } = default; - } } diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs index a4f689ea81..d95368d1f5 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/DeviceStatus.cs @@ -5,7 +5,20 @@ namespace Azure.Iot.Operations.Services.AssetAndDeviceRegistry.Models; public record DeviceStatus { + /// + /// The status of the device + /// public ConfigStatus? Config { get; set; } + /// + /// The statuses of each of the endpoints that belong to the device + /// public DeviceStatusEndpoint? Endpoints { get; set; } + + public void SetEndpointError(string inboundEndpointName, ConfigError endpointError) + { + Endpoints ??= new(); + Endpoints.Inbound ??= new(); + Endpoints.Inbound[inboundEndpointName].Error = endpointError; + } } diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ModelsConverter.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ModelsConverter.cs index d2f085ec2c..81d4868a6f 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ModelsConverter.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ModelsConverter.cs @@ -461,18 +461,18 @@ internal static AssetDatasetEventStreamStatus ToModel(this AdrBaseService.AssetD }; } - internal static AssetManagementGroupStatusSchemaElement ToModel(this AssetManagementGroupStatusSchemaElementSchema source) + internal static AssetManagementGroupStatus ToModel(this AssetManagementGroupStatusSchemaElementSchema source) { - return new AssetManagementGroupStatusSchemaElement + return new AssetManagementGroupStatus { Name = source.Name, Actions = source.Actions?.Select(x => x.ToModel()).ToList(), }; } - internal static AssetManagementGroupActionStatusSchemaElement ToModel(this AssetManagementGroupActionStatusSchemaElementSchema source) + internal static AssetManagementGroupActionStatus ToModel(this AssetManagementGroupActionStatusSchemaElementSchema source) { - return new AssetManagementGroupActionStatusSchemaElement + return new AssetManagementGroupActionStatus { Error = source.Error?.ToModel(), Name = source.Name, diff --git a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ProtocolConverter.cs b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ProtocolConverter.cs index e7e7e03447..47555aaf51 100644 --- a/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ProtocolConverter.cs +++ b/dotnet/src/Azure.Iot.Operations.Services/AssetAndDeviceRegistry/Models/ProtocolConverter.cs @@ -325,7 +325,7 @@ internal static AdrBaseService.AssetEventGroupStatusSchemaElementSchema ToProtoc }; } - internal static AssetManagementGroupStatusSchemaElementSchema ToProtocol(this AssetManagementGroupStatusSchemaElement source) + internal static AssetManagementGroupStatusSchemaElementSchema ToProtocol(this AssetManagementGroupStatus source) { return new AssetManagementGroupStatusSchemaElementSchema { @@ -334,7 +334,7 @@ internal static AssetManagementGroupStatusSchemaElementSchema ToProtocol(this As }; } - internal static AssetManagementGroupActionStatusSchemaElementSchema ToProtocol(this AssetManagementGroupActionStatusSchemaElement source) + internal static AssetManagementGroupActionStatusSchemaElementSchema ToProtocol(this AssetManagementGroupActionStatus source) { return new AssetManagementGroupActionStatusSchemaElementSchema { diff --git a/dotnet/test/Azure.Iot.Operations.Connector.UnitTests/MockAdrClientWrapper.cs b/dotnet/test/Azure.Iot.Operations.Connector.UnitTests/MockAdrClientWrapper.cs index 0bb59f1ffc..93c5d2ffb0 100644 --- a/dotnet/test/Azure.Iot.Operations.Connector.UnitTests/MockAdrClientWrapper.cs +++ b/dotnet/test/Azure.Iot.Operations.Connector.UnitTests/MockAdrClientWrapper.cs @@ -89,6 +89,16 @@ public Task CreateOrUpdateDiscove return mockClientWrapper.Object.CreateOrUpdateDiscoveredDeviceAsync(request, inboundEndpointType, commandTimeout, cancellationToken); } + public Task GetDeviceStatusAsync(string deviceName, string inboundEndpointName, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + return mockClientWrapper.Object.GetDeviceStatusAsync(deviceName, inboundEndpointName, commandTimeout, cancellationToken); + } + + public Task GetAssetStatusAsync(string deviceName, string inboundEndpointName, string assetName, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default) + { + return mockClientWrapper.Object.GetAssetStatusAsync(deviceName, inboundEndpointName, assetName, commandTimeout, cancellationToken); + } + public ValueTask DisposeAsync() { return ValueTask.CompletedTask;