Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,40 @@ public class EventDrivenTcpThermostatConnectorWorker : BackgroundService, IDispo
private readonly ILogger<EventDrivenTcpThermostatConnectorWorker> _logger;
private readonly ConnectorWorker _connector;

private const string InboundEndpointName = "my_tcp_endpoint";

public EventDrivenTcpThermostatConnectorWorker(ApplicationContext applicationContext, ILogger<EventDrivenTcpThermostatConnectorWorker> logger, ILogger<ConnectorWorker> connectorLogger, IMqttClient mqttClient, IMessageSchemaProvider datasetSamplerFactory, IAdrClientWrapperProvider adrClientFactory, IConnectorLeaderElectionConfigurationProvider leaderElectionConfigurationProvider)
{
_logger = logger;
_connector = new(applicationContext, connectorLogger, mqttClient, datasetSamplerFactory, adrClientFactory, leaderElectionConfigurationProvider)
{
WhileAssetIsAvailable = WhileAssetAvailableAsync
WhileAssetIsAvailable = WhileAssetAvailableAsync,
WhileDeviceIsAvailable = WhileDeviceAvailableAsync,
};
}

private async Task WhileDeviceAvailableAsync(DeviceAvailableEventArgs args, CancellationToken cancellationToken)
{
_logger.LogInformation("Device with name {0} is now available", args.DeviceName);

try
{
_logger.LogInformation("Reporting device status as okay to Azure Device Registry service...");
await args.DeviceEndpointClient.GetAndUpdateDeviceStatusAsync((currentDeviceStatus) => {
currentDeviceStatus.Config ??= new();
currentDeviceStatus.Config.LastTransitionTime = DateTime.UtcNow;
currentDeviceStatus.Endpoints ??= new();
currentDeviceStatus.Endpoints.Inbound ??= new();
currentDeviceStatus.Endpoints.Inbound[args.InboundEndpointName] ??= new();
return currentDeviceStatus;
}, null, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, "Failed to report device status to Azure Device Registry service");
}
}

private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, CancellationToken cancellationToken)
{
_logger.LogInformation("Asset with name {0} is now sampleable", args.AssetName);
Expand Down Expand Up @@ -49,14 +74,44 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel

if (args.Asset.EventGroups == null || args.Asset.EventGroups.Count != 1)
{
_logger.LogError("Asset with name {0} does not have the expected event group", args.AssetName);
_logger.LogWarning("Asset with name {0} does not have the expected event group. No events will be received.", args.AssetName);

try
{
await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => {
currentAssetStatus.Config ??= new();
currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow;
currentAssetStatus.EventGroups ??= new();
currentAssetStatus.EventGroups.Clear();
return currentAssetStatus;
}, null, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, "Failed to report device status to Azure Device Registry service");
}
return;
}

var eventGroup = args.Asset.EventGroups.First();
if (eventGroup.Events == null || eventGroup.Events.Count != 1)
{
_logger.LogError("Asset with name {0} does not have the expected event within its event group", args.AssetName);
_logger.LogWarning("Asset with name {0} does not have the expected event within its event group. No events will be received.", args.AssetName);

try
{
await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => {
currentAssetStatus.Config ??= new();
currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow;
currentAssetStatus.EventGroups ??= new();
currentAssetStatus.ClearEventGroupStatus(eventGroup.Name);
return currentAssetStatus;
}, null, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, "Failed to report device status to Azure Device Registry service");
}
return;
}

Expand All @@ -66,14 +121,36 @@ private async Task WhileAssetAvailableAsync(AssetAvailableEventArgs args, Cancel
if (assetEvent.DataSource == null || !int.TryParse(assetEvent.DataSource, out int port))
{
// If the asset's has no event doesn't specify a port, then do nothing
_logger.LogInformation("Asset with name {0} has an event, but the event didn't configure a port, so the connector won't handle these events", args.AssetName);
_logger.LogError("Asset with name {0} has an event, but the event didn't configure a port, so the connector won't handle these events", args.AssetName);

try
{
_logger.LogWarning("Reporting asset status error to Azure Device Registry service...");
await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => {
currentAssetStatus.Config ??= new();
currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow;
currentAssetStatus.UpdateEventStatus(eventGroup.Name, new()
{
Name = assetEvent.Name,
Error = new ConfigError()
{
Message = "The configured event was either missing the expected port or had a non-integer value for the port",
}
});
return currentAssetStatus;
}, null, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, "Failed to report device status to Azure Device Registry service");
}
return;
}

await OpenTcpConnectionAsync(args, assetEvent, port, cancellationToken);
await OpenTcpConnectionAsync(args, args.Asset.EventGroups.First().Name, assetEvent, port, cancellationToken);
}

private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, AssetEvent assetEvent, int port, CancellationToken cancellationToken)
private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, string eventGroupName, AssetEvent assetEvent, int port, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
Expand All @@ -87,12 +164,14 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, AssetEve
return;
}

string host = args.Device.Endpoints.Inbound["my_tcp_endpoint"].Address.Split(":")[0];
string host = args.Device.Endpoints.Inbound[InboundEndpointName].Address.Split(":")[0];
_logger.LogInformation("Attempting to open TCP client with address {0} and port {1}", host, port);
using TcpClient client = new();
await client.ConnectAsync(host, port, cancellationToken);
await using NetworkStream stream = client.GetStream();

bool alreadyReportedAssetStatus = false;

try
{
while (!cancellationToken.IsCancellationRequested)
Expand All @@ -102,7 +181,34 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, AssetEve
Array.Resize(ref buffer, bytesRead);

_logger.LogInformation("Received data from event with name {0} on asset with name {1}. Forwarding this data to the MQTT broker.", assetEvent.Name, args.AssetName);
await _connector.ForwardReceivedEventAsync(args.DeviceName, args.InboundEndpointName, args.Asset, args.AssetName, assetEvent, buffer, null, cancellationToken);
await args.AssetClient.ForwardReceivedEventAsync(eventGroupName, assetEvent, buffer, null, cancellationToken);

if (!alreadyReportedAssetStatus)
{
try
{
// Report status of the asset once the first event has been received and forwarded
_logger.LogInformation("Reporting asset status as okay to Azure Device Registry service...");

await args.AssetClient.GetAndUpdateAssetStatusAsync((currentAssetStatus) => {
currentAssetStatus.Config ??= new();
currentAssetStatus.Config.LastTransitionTime = DateTime.UtcNow;
currentAssetStatus.UpdateEventStatus(eventGroupName, new()
{
Name = assetEvent.Name,
Error = null,
MessageSchemaReference = args.AssetClient.GetRegisteredEventMessageSchema(eventGroupName, assetEvent.Name)
});
return currentAssetStatus;
}, null, cancellationToken);

alreadyReportedAssetStatus = true;
}
catch (Exception e2)
{
_logger.LogError(e2, "Failed to report device status to Azure Device Registry service");
}
}
}
}
catch (Exception e)
Expand All @@ -115,6 +221,25 @@ private async Task OpenTcpConnectionAsync(AssetAvailableEventArgs args, AssetEve
{
_logger.LogError(e, "Failed to open TCP connection to asset");
}

try
{
await args.DeviceEndpointClient.GetAndUpdateDeviceStatusAsync((currentDeviceStatus) => {
currentDeviceStatus.Config ??= new();
currentDeviceStatus.Config.LastTransitionTime = DateTime.UtcNow;
currentDeviceStatus.SetEndpointError(
InboundEndpointName,
new ConfigError()
{
Message = "Unable to connect to the TCP endpoint. The connector will retry to connect."
});
return currentDeviceStatus;
}, null, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, "Failed to report device status to Azure Device Registry service");
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions dotnet/src/Azure.Iot.Operations.Connector/AdrClientWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,18 @@ public Task<CreateOrUpdateDiscoveredDeviceResponsePayload> CreateOrUpdateDiscove
return _client.CreateOrUpdateDiscoveredDeviceAsync(request, inboundEndpointType, commandTimeout, cancellationToken);
}

/// <inheritdoc/>
public Task<DeviceStatus> GetDeviceStatusAsync(string deviceName, string inboundEndpointName, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default)
{
return _client.GetDeviceStatusAsync(deviceName, inboundEndpointName, commandTimeout, cancellationToken);
}

/// <inheritdoc/>
public Task<AssetStatus> GetAssetStatusAsync(string deviceName, string inboundEndpointName, string assetName, TimeSpan? commandTimeout = null, CancellationToken cancellationToken = default)
{
return _client.GetAssetStatusAsync(deviceName, inboundEndpointName, assetName, commandTimeout, cancellationToken);
}

/// <inheritdoc/>
public ValueTask DisposeAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@ namespace Azure.Iot.Operations.Connector
/// <summary>
/// The event args for when an asset becomes available to sample.
/// </summary>
public class AssetAvailableEventArgs : EventArgs
public class AssetAvailableEventArgs : EventArgs, IDisposable
{
/// <summary>
/// The name of the device that this asset belongs to.
/// </summary>
public string DeviceName { get; }

/// <summary>
/// The device that this asset belongs to.
/// </summary>
public Device Device { get; }

/// <summary>
/// The name of the endpoint that this asset belongs to.
/// </summary>
public string InboundEndpointName { get; }

/// <summary>
Expand All @@ -41,14 +50,31 @@ public class AssetAvailableEventArgs : EventArgs
/// </remarks>
public ILeaderElectionClient? LeaderElectionClient { get; }

internal AssetAvailableEventArgs(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, ILeaderElectionClient? leaderElectionClient)
/// <summary>
/// The client to use to send status updates for assets on and to use to forward sampled datasets/received events with.
/// </summary>
public AssetClient AssetClient { get; }

/// <summary>
/// The client to use to send status updates for this asset's device on.
/// </summary>
public DeviceEndpointClient DeviceEndpointClient { get; }

internal AssetAvailableEventArgs(string deviceName, Device device, string inboundEndpointName, string assetName, Asset asset, ILeaderElectionClient? leaderElectionClient, IAdrClientWrapper adrClient, ConnectorWorker connector)
{
DeviceName = deviceName;
Device = device;
InboundEndpointName = inboundEndpointName;
AssetName = assetName;
Asset = asset;
LeaderElectionClient = leaderElectionClient;
AssetClient = new(adrClient, deviceName, inboundEndpointName, assetName, connector, device, asset);
DeviceEndpointClient = new(adrClient, deviceName, inboundEndpointName);
}

public void Dispose()
{
AssetClient.Dispose();
}
}
}
Loading
Loading