Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
<Content Include="..\..\..\eng\dtdl\aep-type-based-operations.json">
<Link>AssetAndDeviceRegistry\aep-type-based-operations.json</Link>
</Content>
<Content Include="..\..\..\eng\dtdl\akri-observability-metrics-operations.json">
<Link>Observability\akri-observability-metrics-operations.json</Link>
</Content>
<Content Include="..\..\..\eng\dtdl\device-name-based-operations.json">
<Link>AssetAndDeviceRegistry\device-name-based-operations.json</Link>
</Content>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.2.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Iot.Operations.Services", "Azure.Iot.Operations.Services.csproj", "{A98A1401-F7CD-3CAC-A3A0-931D8873FEF9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{A98A1401-F7CD-3CAC-A3A0-931D8873FEF9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A98A1401-F7CD-3CAC-A3A0-931D8873FEF9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A98A1401-F7CD-3CAC-A3A0-931D8873FEF9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A98A1401-F7CD-3CAC-A3A0-931D8873FEF9}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {5D7B8265-62A7-4E82-9C32-536AADFE007E}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */

#nullable enable

namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
{
using System.Runtime.Serialization;
using System.Text.Json.Serialization;

/// <summary>
/// Enumerates structured error types for failed metric operations.
/// </summary>
[JsonConverter(typeof(JsonStringEnumMemberConverter))]
[System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
public enum AkriMetricErrorKind
{
[EnumMember(Value = @"ConflictingMetricDefinition")]
ConflictingMetricDefinition = 0,
[EnumMember(Value = @"DuplicateOperationId")]
DuplicateOperationId = 1,
[EnumMember(Value = @"InternalError")]
InternalError = 2,
[EnumMember(Value = @"TooManyUniqueLabelSets")]
TooManyUniqueLabelSets = 3,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */

#nullable enable

namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
{
using System;
using System.Collections.Generic;
using System.Text.Json.Serialization;
using Azure.Iot.Operations.Services.Observability;

/// <summary>
/// Represents the result of a single metric operation, including structured error information if applicable.
/// </summary>
[System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
public partial class AkriMetricOperationResponse : IJsonOnDeserialized, IJsonOnSerializing
{
/// <summary>
/// Structured classification of the error type, included only if status is Error.
/// </summary>
[JsonPropertyName("errorKind")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public AkriMetricErrorKind? ErrorKind { get; set; } = default;

/// <summary>
/// Optional human-readable description of the error.
/// </summary>
[JsonPropertyName("errorMessage")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string? ErrorMessage { get; set; } = default;

/// <summary>
/// ID of the operation this response refers to.
/// </summary>
[JsonPropertyName("operationId")]
[JsonIgnore(Condition = JsonIgnoreCondition.Never)]
[JsonRequired]
public string OperationId { get; set; } = default!;

/// <summary>
/// Optional name of the field or conceptual field associated with the error.
/// </summary>
[JsonPropertyName("propertyName")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string? PropertyName { get; set; } = default;

/// <summary>
/// Status of the operation: either Success or Error.
/// </summary>
[JsonPropertyName("status")]
[JsonIgnore(Condition = JsonIgnoreCondition.Never)]
[JsonRequired]
public AkriMetricOperationResponseStatus Status { get; set; } = default!;

void IJsonOnDeserialized.OnDeserialized()
{
if (OperationId is null)
{
throw new ArgumentNullException("operationId field cannot be null");
}
}

void IJsonOnSerializing.OnSerializing()
{
if (OperationId is null)
{
throw new ArgumentNullException("operationId field cannot be null");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */

#nullable enable

namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
{
using System.Runtime.Serialization;
using System.Text.Json.Serialization;

/// <summary>
/// Defines possible statuses of a metric operation.
/// </summary>
[JsonConverter(typeof(JsonStringEnumMemberConverter))]
[System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
public enum AkriMetricOperationResponseStatus
{
[EnumMember(Value = @"Error")]
Error = 0,
[EnumMember(Value = @"Success")]
Success = 1,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */

#nullable enable

namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Iot.Operations.Protocol.Models;
using Azure.Iot.Operations.Protocol;
using Azure.Iot.Operations.Protocol.RPC;
using Azure.Iot.Operations.Protocol.Telemetry;
using Azure.Iot.Operations.Services.Observability;

[CommandTopic("akri/observability/{ex:connectorClientId}/metrics")]
[System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
public static partial class AkriObservabilityService
{
public abstract partial class Service : IAsyncDisposable
{
private ApplicationContext applicationContext;
private IMqttPubSubClient mqttClient;
private readonly PublishMetricsCommandExecutor publishMetricsCommandExecutor;

/// <summary>
/// Construct a new instance of this service.
/// </summary>
/// <param name="applicationContext">The shared context for your application.</param>
/// <param name="mqttClient">The MQTT client to use.</param>
/// <param name="topicTokenMap">
/// The topic token replacement map to use for all operations by default. Generally, this will include the token values
/// for topic tokens such as "modelId" which should be the same for the duration of this service's lifetime. Note that
/// additional topic tokens can be specified when starting the service with <see cref="StartAsync(Dictionary{string, string}?, int?, CancellationToken)"/> and
/// can be specified per-telemetry message.
/// </param>
public Service(ApplicationContext applicationContext, IMqttPubSubClient mqttClient, Dictionary<string, string>? topicTokenMap = null)
{
this.applicationContext = applicationContext;
this.mqttClient = mqttClient;

string? clientId = this.mqttClient.ClientId;
if (string.IsNullOrEmpty(clientId))
{
throw new InvalidOperationException("No MQTT client Id configured. Must connect to MQTT broker before invoking command.");
}

this.publishMetricsCommandExecutor = new PublishMetricsCommandExecutor(applicationContext, mqttClient) { OnCommandReceived = PublishMetricsInt };

if (topicTokenMap != null)
{
foreach (string topicTokenKey in topicTokenMap.Keys)
{
this.publishMetricsCommandExecutor.TopicTokenMap.TryAdd("ex:" + topicTokenKey, topicTokenMap[topicTokenKey]);
}
}

this.publishMetricsCommandExecutor.TopicTokenMap.TryAdd("executorId", clientId);
}

public PublishMetricsCommandExecutor PublishMetricsCommandExecutor { get => this.publishMetricsCommandExecutor; }

public abstract Task<ExtendedResponse<PublishMetricsResponsePayload>> PublishMetricsAsync(PublishMetricsRequestPayload request, CommandRequestMetadata requestMetadata, CancellationToken cancellationToken);

/// <summary>
/// Begin accepting command invocations for all command executors.
/// </summary>
/// <param name="additionalTopicTokenMap">
/// The topic token replacements to use in addition to any topic tokens specified in the constructor. If this map
/// contains any keys that topic tokens provided in the constructor also has, then values specified in this map will take precedence.
/// </param>
/// <param name="preferredDispatchConcurrency">The dispatch concurrency count for the command response cache to use.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <remarks>
/// Specifying custom topic tokens in <paramref name="additionalTopicTokenMap"/> allows you to make command executors only
/// accept commands over a specific topic.
///
/// Note that a given command executor can only be started with one set of topic token replacements. If you want a command executor
/// to only handle commands for several specific sets of topic token values (as opposed to all possible topic token values), then you will
/// instead need to create a command executor per topic token set.
/// </remarks>
public async Task StartAsync(int? preferredDispatchConcurrency = null, CancellationToken cancellationToken = default)
{
string? clientId = this.mqttClient.ClientId;
if (string.IsNullOrEmpty(clientId))
{
throw new InvalidOperationException("No MQTT client Id configured. Must connect to MQTT broker before starting service.");
}

await Task.WhenAll(
this.publishMetricsCommandExecutor.StartAsync(preferredDispatchConcurrency, cancellationToken)).ConfigureAwait(false);
}

public async Task StopAsync(CancellationToken cancellationToken = default)
{
await Task.WhenAll(
this.publishMetricsCommandExecutor.StopAsync(cancellationToken)).ConfigureAwait(false);
}

private async Task<ExtendedResponse<PublishMetricsResponsePayload>> PublishMetricsInt(ExtendedRequest<PublishMetricsRequestPayload> req, CancellationToken cancellationToken)
{
ExtendedResponse<PublishMetricsResponsePayload> extended = await this.PublishMetricsAsync(req.Request!, req.RequestMetadata!, cancellationToken);
return new ExtendedResponse<PublishMetricsResponsePayload> { Response = extended.Response, ResponseMetadata = extended.ResponseMetadata };
}

public async ValueTask DisposeAsync()
{
await this.publishMetricsCommandExecutor.DisposeAsync().ConfigureAwait(false);
}

public async ValueTask DisposeAsync(bool disposing)
{
await this.publishMetricsCommandExecutor.DisposeAsync(disposing).ConfigureAwait(false);
}
}

public abstract partial class Client : IAsyncDisposable
{
private ApplicationContext applicationContext;
private IMqttPubSubClient mqttClient;
private readonly PublishMetricsCommandInvoker publishMetricsCommandInvoker;

/// <summary>
/// Construct a new instance of this client.
/// </summary>
/// <param name="applicationContext">The shared context for your application.</param>
/// <param name="mqttClient">The MQTT client to use.</param>
/// <param name="topicTokenMap">
/// The topic token replacement map to use for all operations by default. Generally, this will include the token values
/// for topic tokens such as "modelId" which should be the same for the duration of this client's lifetime. Note that
/// additional topic tokens can be specified when starting the client with <see cref="StartAsync(Dictionary{string, string}?, int?, CancellationToken)"/>.
/// </param>
public Client(ApplicationContext applicationContext, IMqttPubSubClient mqttClient, Dictionary<string, string>? topicTokenMap = null)
{
this.applicationContext = applicationContext;
this.mqttClient = mqttClient;

this.publishMetricsCommandInvoker = new PublishMetricsCommandInvoker(applicationContext, mqttClient);
if (topicTokenMap != null)
{
foreach (string topicTokenKey in topicTokenMap.Keys)
{
this.publishMetricsCommandInvoker.TopicTokenMap.TryAdd("ex:" + topicTokenKey, topicTokenMap[topicTokenKey]);
}
}
}

public PublishMetricsCommandInvoker PublishMetricsCommandInvoker { get => this.publishMetricsCommandInvoker; }

/// <summary>
/// Invoke a command.
/// </summary>
/// <param name="requestMetadata">The metadata for this command request.</param>
/// <param name="additionalTopicTokenMap">
/// The topic token replacement map to use in addition to the topic tokens specified in the constructor. If this map
/// contains any keys that the topic tokens specified in the constructor also has, then values specified in this map will take precedence.
/// </param>
/// <param name="commandTimeout">How long the command will be available on the broker for an executor to receive.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The command response.</returns>
public RpcCallAsync<PublishMetricsResponsePayload> PublishMetricsAsync(PublishMetricsRequestPayload request, CommandRequestMetadata? requestMetadata = null, Dictionary<string, string>? additionalTopicTokenMap = null, TimeSpan? commandTimeout = default, CancellationToken cancellationToken = default)
{
string? clientId = this.mqttClient.ClientId;
if (string.IsNullOrEmpty(clientId))
{
throw new InvalidOperationException("No MQTT client Id configured. Must connect to MQTT broker before invoking command.");
}

CommandRequestMetadata metadata = requestMetadata ?? new CommandRequestMetadata();
additionalTopicTokenMap ??= new();

Dictionary<string, string> prefixedAdditionalTopicTokenMap = new();
foreach (string key in additionalTopicTokenMap.Keys)
{
prefixedAdditionalTopicTokenMap["ex:" + key] = additionalTopicTokenMap[key];
}

prefixedAdditionalTopicTokenMap["invokerClientId"] = clientId;

return new RpcCallAsync<PublishMetricsResponsePayload>(this.publishMetricsCommandInvoker.InvokeCommandAsync(request, metadata, prefixedAdditionalTopicTokenMap, commandTimeout, cancellationToken), metadata.CorrelationId);
}

public async ValueTask DisposeAsync()
{
await this.publishMetricsCommandInvoker.DisposeAsync().ConfigureAwait(false);
}

public async ValueTask DisposeAsync(bool disposing)
{
await this.publishMetricsCommandInvoker.DisposeAsync(disposing).ConfigureAwait(false);
}
}
}
}
Loading
Loading