diff --git a/dotnet/src/Azure.Iot.Operations.Services/Azure.Iot.Operations.Services.csproj b/dotnet/src/Azure.Iot.Operations.Services/Azure.Iot.Operations.Services.csproj
index 6612675028..bd5657ecac 100644
--- a/dotnet/src/Azure.Iot.Operations.Services/Azure.Iot.Operations.Services.csproj
+++ b/dotnet/src/Azure.Iot.Operations.Services/Azure.Iot.Operations.Services.csproj
@@ -31,6 +31,9 @@
AssetAndDeviceRegistry\aep-type-based-operations.json
+
+ Observability\akri-observability-metrics-operations.json
+
AssetAndDeviceRegistry\device-name-based-operations.json
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Azure.Iot.Operations.Services.sln b/dotnet/src/Azure.Iot.Operations.Services/Azure.Iot.Operations.Services.sln
new file mode 100644
index 0000000000..1f255cbefe
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Azure.Iot.Operations.Services.sln
@@ -0,0 +1,24 @@
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 17
+VisualStudioVersion = 17.5.2.0
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Iot.Operations.Services", "Azure.Iot.Operations.Services.csproj", "{A98A1401-F7CD-3CAC-A3A0-931D8873FEF9}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {A98A1401-F7CD-3CAC-A3A0-931D8873FEF9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A98A1401-F7CD-3CAC-A3A0-931D8873FEF9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A98A1401-F7CD-3CAC-A3A0-931D8873FEF9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A98A1401-F7CD-3CAC-A3A0-931D8873FEF9}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {5D7B8265-62A7-4E82-9C32-536AADFE007E}
+ EndGlobalSection
+EndGlobal
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriMetricErrorKind.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriMetricErrorKind.g.cs
new file mode 100644
index 0000000000..59e97c98dd
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriMetricErrorKind.g.cs
@@ -0,0 +1,26 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System.Runtime.Serialization;
+ using System.Text.Json.Serialization;
+
+ ///
+ /// Enumerates structured error types for failed metric operations.
+ ///
+ [JsonConverter(typeof(JsonStringEnumMemberConverter))]
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public enum AkriMetricErrorKind
+ {
+ [EnumMember(Value = @"ConflictingMetricDefinition")]
+ ConflictingMetricDefinition = 0,
+ [EnumMember(Value = @"DuplicateOperationId")]
+ DuplicateOperationId = 1,
+ [EnumMember(Value = @"InternalError")]
+ InternalError = 2,
+ [EnumMember(Value = @"TooManyUniqueLabelSets")]
+ TooManyUniqueLabelSets = 3,
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriMetricOperationResponse.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriMetricOperationResponse.g.cs
new file mode 100644
index 0000000000..dcde75c210
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriMetricOperationResponse.g.cs
@@ -0,0 +1,71 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Services.Observability;
+
+ ///
+ /// Represents the result of a single metric operation, including structured error information if applicable.
+ ///
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public partial class AkriMetricOperationResponse : IJsonOnDeserialized, IJsonOnSerializing
+ {
+ ///
+ /// Structured classification of the error type, included only if status is Error.
+ ///
+ [JsonPropertyName("errorKind")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public AkriMetricErrorKind? ErrorKind { get; set; } = default;
+
+ ///
+ /// Optional human-readable description of the error.
+ ///
+ [JsonPropertyName("errorMessage")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public string? ErrorMessage { get; set; } = default;
+
+ ///
+ /// ID of the operation this response refers to.
+ ///
+ [JsonPropertyName("operationId")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public string OperationId { get; set; } = default!;
+
+ ///
+ /// Optional name of the field or conceptual field associated with the error.
+ ///
+ [JsonPropertyName("propertyName")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public string? PropertyName { get; set; } = default;
+
+ ///
+ /// Status of the operation: either Success or Error.
+ ///
+ [JsonPropertyName("status")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public AkriMetricOperationResponseStatus Status { get; set; } = default!;
+
+ void IJsonOnDeserialized.OnDeserialized()
+ {
+ if (OperationId is null)
+ {
+ throw new ArgumentNullException("operationId field cannot be null");
+ }
+ }
+
+ void IJsonOnSerializing.OnSerializing()
+ {
+ if (OperationId is null)
+ {
+ throw new ArgumentNullException("operationId field cannot be null");
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriMetricOperationResponseStatus.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriMetricOperationResponseStatus.g.cs
new file mode 100644
index 0000000000..d9f7f299e9
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriMetricOperationResponseStatus.g.cs
@@ -0,0 +1,22 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System.Runtime.Serialization;
+ using System.Text.Json.Serialization;
+
+ ///
+ /// Defines possible statuses of a metric operation.
+ ///
+ [JsonConverter(typeof(JsonStringEnumMemberConverter))]
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public enum AkriMetricOperationResponseStatus
+ {
+ [EnumMember(Value = @"Error")]
+ Error = 0,
+ [EnumMember(Value = @"Success")]
+ Success = 1,
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriObservabilityService.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriObservabilityService.g.cs
new file mode 100644
index 0000000000..7af9c01e2e
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/AkriObservabilityService.g.cs
@@ -0,0 +1,196 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Azure.Iot.Operations.Protocol.Models;
+ using Azure.Iot.Operations.Protocol;
+ using Azure.Iot.Operations.Protocol.RPC;
+ using Azure.Iot.Operations.Protocol.Telemetry;
+ using Azure.Iot.Operations.Services.Observability;
+
+ [CommandTopic("akri/observability/{ex:connectorClientId}/metrics")]
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public static partial class AkriObservabilityService
+ {
+ public abstract partial class Service : IAsyncDisposable
+ {
+ private ApplicationContext applicationContext;
+ private IMqttPubSubClient mqttClient;
+ private readonly PublishMetricsCommandExecutor publishMetricsCommandExecutor;
+
+ ///
+ /// Construct a new instance of this service.
+ ///
+ /// The shared context for your application.
+ /// The MQTT client to use.
+ ///
+ /// The topic token replacement map to use for all operations by default. Generally, this will include the token values
+ /// for topic tokens such as "modelId" which should be the same for the duration of this service's lifetime. Note that
+ /// additional topic tokens can be specified when starting the service with and
+ /// can be specified per-telemetry message.
+ ///
+ public Service(ApplicationContext applicationContext, IMqttPubSubClient mqttClient, Dictionary? topicTokenMap = null)
+ {
+ this.applicationContext = applicationContext;
+ this.mqttClient = mqttClient;
+
+ string? clientId = this.mqttClient.ClientId;
+ if (string.IsNullOrEmpty(clientId))
+ {
+ throw new InvalidOperationException("No MQTT client Id configured. Must connect to MQTT broker before invoking command.");
+ }
+
+ this.publishMetricsCommandExecutor = new PublishMetricsCommandExecutor(applicationContext, mqttClient) { OnCommandReceived = PublishMetricsInt };
+
+ if (topicTokenMap != null)
+ {
+ foreach (string topicTokenKey in topicTokenMap.Keys)
+ {
+ this.publishMetricsCommandExecutor.TopicTokenMap.TryAdd("ex:" + topicTokenKey, topicTokenMap[topicTokenKey]);
+ }
+ }
+
+ this.publishMetricsCommandExecutor.TopicTokenMap.TryAdd("executorId", clientId);
+ }
+
+ public PublishMetricsCommandExecutor PublishMetricsCommandExecutor { get => this.publishMetricsCommandExecutor; }
+
+ public abstract Task> PublishMetricsAsync(PublishMetricsRequestPayload request, CommandRequestMetadata requestMetadata, CancellationToken cancellationToken);
+
+ ///
+ /// Begin accepting command invocations for all command executors.
+ ///
+ ///
+ /// The topic token replacements to use in addition to any topic tokens specified in the constructor. If this map
+ /// contains any keys that topic tokens provided in the constructor also has, then values specified in this map will take precedence.
+ ///
+ /// The dispatch concurrency count for the command response cache to use.
+ /// Cancellation token.
+ ///
+ /// Specifying custom topic tokens in allows you to make command executors only
+ /// accept commands over a specific topic.
+ ///
+ /// Note that a given command executor can only be started with one set of topic token replacements. If you want a command executor
+ /// to only handle commands for several specific sets of topic token values (as opposed to all possible topic token values), then you will
+ /// instead need to create a command executor per topic token set.
+ ///
+ public async Task StartAsync(int? preferredDispatchConcurrency = null, CancellationToken cancellationToken = default)
+ {
+ string? clientId = this.mqttClient.ClientId;
+ if (string.IsNullOrEmpty(clientId))
+ {
+ throw new InvalidOperationException("No MQTT client Id configured. Must connect to MQTT broker before starting service.");
+ }
+
+ await Task.WhenAll(
+ this.publishMetricsCommandExecutor.StartAsync(preferredDispatchConcurrency, cancellationToken)).ConfigureAwait(false);
+ }
+
+ public async Task StopAsync(CancellationToken cancellationToken = default)
+ {
+ await Task.WhenAll(
+ this.publishMetricsCommandExecutor.StopAsync(cancellationToken)).ConfigureAwait(false);
+ }
+
+ private async Task> PublishMetricsInt(ExtendedRequest req, CancellationToken cancellationToken)
+ {
+ ExtendedResponse extended = await this.PublishMetricsAsync(req.Request!, req.RequestMetadata!, cancellationToken);
+ return new ExtendedResponse { Response = extended.Response, ResponseMetadata = extended.ResponseMetadata };
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await this.publishMetricsCommandExecutor.DisposeAsync().ConfigureAwait(false);
+ }
+
+ public async ValueTask DisposeAsync(bool disposing)
+ {
+ await this.publishMetricsCommandExecutor.DisposeAsync(disposing).ConfigureAwait(false);
+ }
+ }
+
+ public abstract partial class Client : IAsyncDisposable
+ {
+ private ApplicationContext applicationContext;
+ private IMqttPubSubClient mqttClient;
+ private readonly PublishMetricsCommandInvoker publishMetricsCommandInvoker;
+
+ ///
+ /// Construct a new instance of this client.
+ ///
+ /// The shared context for your application.
+ /// The MQTT client to use.
+ ///
+ /// The topic token replacement map to use for all operations by default. Generally, this will include the token values
+ /// for topic tokens such as "modelId" which should be the same for the duration of this client's lifetime. Note that
+ /// additional topic tokens can be specified when starting the client with .
+ ///
+ public Client(ApplicationContext applicationContext, IMqttPubSubClient mqttClient, Dictionary? topicTokenMap = null)
+ {
+ this.applicationContext = applicationContext;
+ this.mqttClient = mqttClient;
+
+ this.publishMetricsCommandInvoker = new PublishMetricsCommandInvoker(applicationContext, mqttClient);
+ if (topicTokenMap != null)
+ {
+ foreach (string topicTokenKey in topicTokenMap.Keys)
+ {
+ this.publishMetricsCommandInvoker.TopicTokenMap.TryAdd("ex:" + topicTokenKey, topicTokenMap[topicTokenKey]);
+ }
+ }
+ }
+
+ public PublishMetricsCommandInvoker PublishMetricsCommandInvoker { get => this.publishMetricsCommandInvoker; }
+
+ ///
+ /// Invoke a command.
+ ///
+ /// The metadata for this command request.
+ ///
+ /// The topic token replacement map to use in addition to the topic tokens specified in the constructor. If this map
+ /// contains any keys that the topic tokens specified in the constructor also has, then values specified in this map will take precedence.
+ ///
+ /// How long the command will be available on the broker for an executor to receive.
+ /// Cancellation token.
+ /// The command response.
+ public RpcCallAsync PublishMetricsAsync(PublishMetricsRequestPayload request, CommandRequestMetadata? requestMetadata = null, Dictionary? additionalTopicTokenMap = null, TimeSpan? commandTimeout = default, CancellationToken cancellationToken = default)
+ {
+ string? clientId = this.mqttClient.ClientId;
+ if (string.IsNullOrEmpty(clientId))
+ {
+ throw new InvalidOperationException("No MQTT client Id configured. Must connect to MQTT broker before invoking command.");
+ }
+
+ CommandRequestMetadata metadata = requestMetadata ?? new CommandRequestMetadata();
+ additionalTopicTokenMap ??= new();
+
+ Dictionary prefixedAdditionalTopicTokenMap = new();
+ foreach (string key in additionalTopicTokenMap.Keys)
+ {
+ prefixedAdditionalTopicTokenMap["ex:" + key] = additionalTopicTokenMap[key];
+ }
+
+ prefixedAdditionalTopicTokenMap["invokerClientId"] = clientId;
+
+ return new RpcCallAsync(this.publishMetricsCommandInvoker.InvokeCommandAsync(request, metadata, prefixedAdditionalTopicTokenMap, commandTimeout, cancellationToken), metadata.CorrelationId);
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await this.publishMetricsCommandInvoker.DisposeAsync().ConfigureAwait(false);
+ }
+
+ public async ValueTask DisposeAsync(bool disposing)
+ {
+ await this.publishMetricsCommandInvoker.DisposeAsync(disposing).ConfigureAwait(false);
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/CounterMetric.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/CounterMetric.g.cs
new file mode 100644
index 0000000000..3744c928b4
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/CounterMetric.g.cs
@@ -0,0 +1,58 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Services.Observability;
+
+ ///
+ /// Defines a Counter metric and its associated Increment operations.
+ ///
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public partial class CounterMetric : IJsonOnDeserialized, IJsonOnSerializing
+ {
+ ///
+ /// Common identifying properties of the Counter metric.
+ ///
+ [JsonPropertyName("definition")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public MetricDefinition Definition { get; set; } = default!;
+
+ ///
+ /// List of Increment operations for this Counter metric.
+ ///
+ [JsonPropertyName("operations")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public List Operations { get; set; } = default!;
+
+ void IJsonOnDeserialized.OnDeserialized()
+ {
+ if (Definition is null)
+ {
+ throw new ArgumentNullException("definition field cannot be null");
+ }
+ if (Operations is null)
+ {
+ throw new ArgumentNullException("operations field cannot be null");
+ }
+ }
+
+ void IJsonOnSerializing.OnSerializing()
+ {
+ if (Definition is null)
+ {
+ throw new ArgumentNullException("definition field cannot be null");
+ }
+ if (Operations is null)
+ {
+ throw new ArgumentNullException("operations field cannot be null");
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/GaugeMetric.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/GaugeMetric.g.cs
new file mode 100644
index 0000000000..bfb446b1e6
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/GaugeMetric.g.cs
@@ -0,0 +1,58 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Services.Observability;
+
+ ///
+ /// Defines a Gauge metric and its associated Record operations.
+ ///
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public partial class GaugeMetric : IJsonOnDeserialized, IJsonOnSerializing
+ {
+ ///
+ /// Common identifying properties of the Gauge metric.
+ ///
+ [JsonPropertyName("definition")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public MetricDefinition Definition { get; set; } = default!;
+
+ ///
+ /// List of Record operations for this Gauge metric.
+ ///
+ [JsonPropertyName("operations")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public List Operations { get; set; } = default!;
+
+ void IJsonOnDeserialized.OnDeserialized()
+ {
+ if (Definition is null)
+ {
+ throw new ArgumentNullException("definition field cannot be null");
+ }
+ if (Operations is null)
+ {
+ throw new ArgumentNullException("operations field cannot be null");
+ }
+ }
+
+ void IJsonOnSerializing.OnSerializing()
+ {
+ if (Definition is null)
+ {
+ throw new ArgumentNullException("definition field cannot be null");
+ }
+ if (Operations is null)
+ {
+ throw new ArgumentNullException("operations field cannot be null");
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/HistogramMetric.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/HistogramMetric.g.cs
new file mode 100644
index 0000000000..981da3c396
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/HistogramMetric.g.cs
@@ -0,0 +1,58 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Services.Observability;
+
+ ///
+ /// Defines a Histogram metric and its associated Record operations.
+ ///
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public partial class HistogramMetric : IJsonOnDeserialized, IJsonOnSerializing
+ {
+ ///
+ /// Common identifying properties of the Histogram metric.
+ ///
+ [JsonPropertyName("definition")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public MetricDefinition Definition { get; set; } = default!;
+
+ ///
+ /// List of Record operations for this Histogram metric.
+ ///
+ [JsonPropertyName("operations")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public List Operations { get; set; } = default!;
+
+ void IJsonOnDeserialized.OnDeserialized()
+ {
+ if (Definition is null)
+ {
+ throw new ArgumentNullException("definition field cannot be null");
+ }
+ if (Operations is null)
+ {
+ throw new ArgumentNullException("operations field cannot be null");
+ }
+ }
+
+ void IJsonOnSerializing.OnSerializing()
+ {
+ if (Definition is null)
+ {
+ throw new ArgumentNullException("definition field cannot be null");
+ }
+ if (Operations is null)
+ {
+ throw new ArgumentNullException("operations field cannot be null");
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/IncrementOperation.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/IncrementOperation.g.cs
new file mode 100644
index 0000000000..137cda1f94
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/IncrementOperation.g.cs
@@ -0,0 +1,58 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Services.Observability;
+
+ ///
+ /// Represents a Increment operation used by Counter metric.
+ ///
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public partial class IncrementOperation : IJsonOnDeserialized, IJsonOnSerializing
+ {
+ ///
+ /// Unique identifier for the operation, used to correlate responses.
+ ///
+ [JsonPropertyName("operationId")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public string OperationId { get; set; } = default!;
+
+ ///
+ /// Timestamp indicating when the metric value was incremented.
+ ///
+ [JsonPropertyName("timestamp")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public DateTime Timestamp { get; set; } = default!;
+
+ ///
+ /// The numeric value by which the counter will be incremented.
+ ///
+ [JsonPropertyName("value")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public double Value { get; set; } = default!;
+
+ void IJsonOnDeserialized.OnDeserialized()
+ {
+ if (OperationId is null)
+ {
+ throw new ArgumentNullException("operationId field cannot be null");
+ }
+ }
+
+ void IJsonOnSerializing.OnSerializing()
+ {
+ if (OperationId is null)
+ {
+ throw new ArgumentNullException("operationId field cannot be null");
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/MetricDefinition.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/MetricDefinition.g.cs
new file mode 100644
index 0000000000..605f909e07
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/MetricDefinition.g.cs
@@ -0,0 +1,65 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Services.Observability;
+
+ ///
+ /// Shared identifying and descriptive properties of a metric.
+ ///
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public partial class MetricDefinition : IJsonOnDeserialized, IJsonOnSerializing
+ {
+ ///
+ /// Key-value pairs that provide dimensional context for the metric.
+ ///
+ [JsonPropertyName("labels")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public Dictionary Labels { get; set; } = default!;
+
+ ///
+ /// A unique, human-readable name that identifies the metric.
+ ///
+ [JsonPropertyName("name")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public string Name { get; set; } = default!;
+
+ ///
+ /// The unit of measurement associated with the metric (e.g., seconds, bytes).
+ ///
+ [JsonPropertyName("unit")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public string? Unit { get; set; } = default;
+
+ void IJsonOnDeserialized.OnDeserialized()
+ {
+ if (Labels is null)
+ {
+ throw new ArgumentNullException("labels field cannot be null");
+ }
+ if (Name is null)
+ {
+ throw new ArgumentNullException("name field cannot be null");
+ }
+ }
+
+ void IJsonOnSerializing.OnSerializing()
+ {
+ if (Labels is null)
+ {
+ throw new ArgumentNullException("labels field cannot be null");
+ }
+ if (Name is null)
+ {
+ throw new ArgumentNullException("name field cannot be null");
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsCommandExecutor.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsCommandExecutor.g.cs
new file mode 100644
index 0000000000..2cd8205af0
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsCommandExecutor.g.cs
@@ -0,0 +1,38 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Xml;
+ using Azure.Iot.Operations.Protocol;
+ using Azure.Iot.Operations.Protocol.RPC;
+ using Azure.Iot.Operations.Protocol.Models;
+ using Azure.Iot.Operations.Services.Observability;
+
+ public static partial class AkriObservabilityService
+ {
+ ///
+ /// Specializes a CommandExecutor class for Command 'publishMetrics'.
+ ///
+ public class PublishMetricsCommandExecutor : CommandExecutor
+ {
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public PublishMetricsCommandExecutor(ApplicationContext applicationContext, IMqttPubSubClient mqttClient)
+ : base(applicationContext, mqttClient, "publishMetrics", new Utf8JsonSerializer())
+ {
+ TopicTokenMap["modelId"] = "dtmi:com:microsoft:akri:AkriObservabilityService;1";
+ if (mqttClient.ClientId != null)
+ {
+ TopicTokenMap["executorId"] = mqttClient.ClientId;
+ }
+ TopicTokenMap["commandName"] = "publishMetrics";
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsCommandInvoker.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsCommandInvoker.g.cs
new file mode 100644
index 0000000000..5f531e941d
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsCommandInvoker.g.cs
@@ -0,0 +1,38 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using Azure.Iot.Operations.Protocol;
+ using Azure.Iot.Operations.Protocol.RPC;
+ using Azure.Iot.Operations.Protocol.Models;
+ using Azure.Iot.Operations.Services.Observability;
+
+ public static partial class AkriObservabilityService
+ {
+ ///
+ /// Specializes the CommandInvoker class for Command 'publishMetrics'.
+ ///
+ public class PublishMetricsCommandInvoker : CommandInvoker
+ {
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public PublishMetricsCommandInvoker(ApplicationContext applicationContext, IMqttPubSubClient mqttClient)
+ : base(applicationContext, mqttClient, "publishMetrics", new Utf8JsonSerializer())
+ {
+ this.ResponseTopicPrefix = "clients/{invokerClientId}"; // default value, can be overwritten by user code
+
+ TopicTokenMap["modelId"] = "dtmi:com:microsoft:akri:AkriObservabilityService;1";
+ if (mqttClient.ClientId != null)
+ {
+ TopicTokenMap["invokerClientId"] = mqttClient.ClientId;
+ }
+ TopicTokenMap["commandName"] = "publishMetrics";
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsRequestPayload.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsRequestPayload.g.cs
new file mode 100644
index 0000000000..c2a078207e
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsRequestPayload.g.cs
@@ -0,0 +1,39 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Services.Observability;
+
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public partial class PublishMetricsRequestPayload : IJsonOnDeserialized, IJsonOnSerializing
+ {
+ ///
+ /// The Command request argument.
+ ///
+ [JsonPropertyName("metrics")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public PublishMetricsRequestSchema Metrics { get; set; } = default!;
+
+ void IJsonOnDeserialized.OnDeserialized()
+ {
+ if (Metrics is null)
+ {
+ throw new ArgumentNullException("metrics field cannot be null");
+ }
+ }
+
+ void IJsonOnSerializing.OnSerializing()
+ {
+ if (Metrics is null)
+ {
+ throw new ArgumentNullException("metrics field cannot be null");
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsRequestSchema.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsRequestSchema.g.cs
new file mode 100644
index 0000000000..bceaca0fcc
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsRequestSchema.g.cs
@@ -0,0 +1,37 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Services.Observability;
+
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public partial class PublishMetricsRequestSchema
+ {
+ ///
+ /// List of counter metrics to be published.
+ ///
+ [JsonPropertyName("counterMetrics")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public List? CounterMetrics { get; set; } = default;
+
+ ///
+ /// List of gauge metrics to be published
+ ///
+ [JsonPropertyName("gaugeMetrics")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public List? GaugeMetrics { get; set; } = default;
+
+ ///
+ /// List of histogram metrics to be published
+ ///
+ [JsonPropertyName("histogramMetrics")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public List? HistogramMetrics { get; set; } = default;
+
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsResponsePayload.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsResponsePayload.g.cs
new file mode 100644
index 0000000000..cc1c2b9dc9
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/PublishMetricsResponsePayload.g.cs
@@ -0,0 +1,39 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Services.Observability;
+
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public partial class PublishMetricsResponsePayload : IJsonOnDeserialized, IJsonOnSerializing
+ {
+ ///
+ /// The Command response argument.
+ ///
+ [JsonPropertyName("publishMetricsResponse")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public List PublishMetricsResponse { get; set; } = default!;
+
+ void IJsonOnDeserialized.OnDeserialized()
+ {
+ if (PublishMetricsResponse is null)
+ {
+ throw new ArgumentNullException("publishMetricsResponse field cannot be null");
+ }
+ }
+
+ void IJsonOnSerializing.OnSerializing()
+ {
+ if (PublishMetricsResponse is null)
+ {
+ throw new ArgumentNullException("publishMetricsResponse field cannot be null");
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/RecordOperation.g.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/RecordOperation.g.cs
new file mode 100644
index 0000000000..fdd071f60d
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityService/RecordOperation.g.cs
@@ -0,0 +1,58 @@
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+#nullable enable
+
+namespace Azure.Iot.Operations.Services.Observability.AkriObservabilityService
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Services.Observability;
+
+ ///
+ /// Represents a Record operation used by Gauge and Histogram metrics.
+ ///
+ [System.CodeDom.Compiler.GeneratedCode("Azure.Iot.Operations.ProtocolCompiler", "0.10.0.0")]
+ public partial class RecordOperation : IJsonOnDeserialized, IJsonOnSerializing
+ {
+ ///
+ /// Unique identifier for the operation, used to correlate responses.
+ ///
+ [JsonPropertyName("operationId")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public string OperationId { get; set; } = default!;
+
+ ///
+ /// Timestamp indicating when the metric value was recorded.
+ ///
+ [JsonPropertyName("timestamp")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public DateTime Timestamp { get; set; } = default!;
+
+ ///
+ /// The numeric value to be recorded by the Gauge or Histogram metric.
+ ///
+ [JsonPropertyName("value")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.Never)]
+ [JsonRequired]
+ public double Value { get; set; } = default!;
+
+ void IJsonOnDeserialized.OnDeserialized()
+ {
+ if (OperationId is null)
+ {
+ throw new ArgumentNullException("operationId field cannot be null");
+ }
+ }
+
+ void IJsonOnSerializing.OnSerializing()
+ {
+ if (OperationId is null)
+ {
+ throw new ArgumentNullException("operationId field cannot be null");
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityServiceStub.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityServiceStub.cs
new file mode 100644
index 0000000000..22a5fb34ae
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/AkriObservabilityServiceStub.cs
@@ -0,0 +1,26 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Azure.Iot.Operations.Protocol;
+using Azure.Iot.Operations.Protocol.RPC;
+using Azure.Iot.Operations.Services.Observability.AkriObservabilityService;
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+public class AkriObservabilityServiceStub : AkriObservabilityService.AkriObservabilityService.Client, IAkriObservabilityService
+{
+ public AkriObservabilityServiceStub(
+ ApplicationContext applicationContext,
+ IMqttPubSubClient mqttClient,
+ Dictionary? topicTokenMap = null) : base(
+ applicationContext,
+ mqttClient,
+ topicTokenMap)
+ {
+ }
+
+ public RpcCallAsync PublishMetricsAsync(PublishMetricsRequestPayload request)
+ {
+ return base.PublishMetricsAsync(request);
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/CachedCounter.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/CachedCounter.cs
new file mode 100644
index 0000000000..b7f1b3d38d
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/CachedCounter.cs
@@ -0,0 +1,47 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Collections.Concurrent;
+using Azure.Iot.Operations.Services.Observability.AkriObservabilityService;
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+internal class CachedCounter : ICounter
+{
+ private readonly ConcurrentQueue _operations = new();
+
+ public string Name { get; }
+ public Dictionary Labels { get; }
+ public string? Unit { get; }
+
+ public CachedCounter(string name, Dictionary labels, string? unit)
+ {
+ Name = name;
+ Labels = labels;
+ Unit = unit;
+ }
+
+ public void Add(double value)
+ {
+ _operations.Enqueue(new IncrementOperation
+ {
+ OperationId = Guid.NewGuid().ToString(),
+ Timestamp = DateTime.UtcNow,
+ Value = value
+ });
+ }
+
+ public List GetOperationsAndClear(int maxCount)
+ {
+ var result = new List();
+
+ int count = 0;
+ while (count < maxCount && _operations.TryDequeue(out var operation))
+ {
+ result.Add(operation);
+ count++;
+ }
+
+ return result;
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/CachedGauge.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/CachedGauge.cs
new file mode 100644
index 0000000000..47133e357e
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/CachedGauge.cs
@@ -0,0 +1,47 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Collections.Concurrent;
+using Azure.Iot.Operations.Services.Observability.AkriObservabilityService;
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+internal class CachedGauge : IGauge
+{
+ private readonly ConcurrentQueue _operations = new();
+
+ public string Name { get; }
+ public Dictionary Labels { get; }
+ public string? Unit { get; }
+
+ public CachedGauge(string name, Dictionary labels, string? unit)
+ {
+ Name = name;
+ Labels = labels;
+ Unit = unit;
+ }
+
+ public void Record(double value)
+ {
+ _operations.Enqueue(new RecordOperation
+ {
+ OperationId = Guid.NewGuid().ToString(),
+ Timestamp = DateTime.UtcNow,
+ Value = value
+ });
+ }
+
+ public List GetOperationsAndClear(int maxCount)
+ {
+ var result = new List();
+
+ int count = 0;
+ while (count < maxCount && _operations.TryDequeue(out var operation))
+ {
+ result.Add(operation);
+ count++;
+ }
+
+ return result;
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/CachedHistogram.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/CachedHistogram.cs
new file mode 100644
index 0000000000..90d8d43521
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/CachedHistogram.cs
@@ -0,0 +1,48 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Collections.Concurrent;
+using Azure.Iot.Operations.Services.Observability.AkriObservabilityService;
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ internal class CachedHistogram : IHistogram
+ {
+ private readonly ConcurrentQueue _operations = new();
+
+ public string Name { get; }
+ public Dictionary Labels { get; }
+ public string? Unit { get; }
+
+ public CachedHistogram(string name, Dictionary labels, string? unit)
+ {
+ Name = name;
+ Labels = labels;
+ Unit = unit;
+ }
+
+ public void Record(double value)
+ {
+ _operations.Enqueue(new RecordOperation
+ {
+ OperationId = Guid.NewGuid().ToString(),
+ Timestamp = DateTime.UtcNow,
+ Value = value
+ });
+ }
+
+ public List GetOperationsAndClear(int maxCount)
+ {
+ var result = new List();
+
+ int count = 0;
+ while (count < maxCount && _operations.TryDequeue(out var operation))
+ {
+ result.Add(operation);
+ count++;
+ }
+
+ return result;
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/BytesJsonConverter.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/BytesJsonConverter.cs
new file mode 100644
index 0000000000..54ae04294f
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/BytesJsonConverter.cs
@@ -0,0 +1,29 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ using System;
+ using System.Text.Json;
+ using System.Text.Json.Serialization;
+
+ ///
+ /// Class for customized JSON conversion of byte[] values to/from Base64 string representations per RFC 4648.
+ ///
+ internal sealed class BytesJsonConverter : JsonConverter
+ {
+ ///
+ public override byte[] Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+ {
+ return Convert.FromBase64String(reader.GetString() !);
+ }
+
+ ///
+ public override void Write(Utf8JsonWriter writer, byte[] value, JsonSerializerOptions options)
+ {
+ writer.WriteStringValue(Convert.ToBase64String(value));
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DateJsonConverter.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DateJsonConverter.cs
new file mode 100644
index 0000000000..d8443ad04e
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DateJsonConverter.cs
@@ -0,0 +1,30 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ using System;
+ using System.Globalization;
+ using System.Text.Json;
+ using System.Text.Json.Serialization;
+
+ ///
+ /// Class for customized JSON conversion of DateOnly values to/from string representations in ISO 8601 Date format.
+ ///
+ internal sealed class DateJsonConverter : JsonConverter
+ {
+ ///
+ public override DateOnly Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+ {
+ return DateOnly.Parse(reader.GetString() !, CultureInfo.InvariantCulture);
+ }
+
+ ///
+ public override void Write(Utf8JsonWriter writer, DateOnly value, JsonSerializerOptions options)
+ {
+ writer.WriteStringValue(value.ToString("o", CultureInfo.InvariantCulture));
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DecimalJsonConverter.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DecimalJsonConverter.cs
new file mode 100644
index 0000000000..4689de2cf4
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DecimalJsonConverter.cs
@@ -0,0 +1,30 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ using System;
+ using System.Text.Json;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Services.Observability;
+
+ ///
+ /// Class for customized JSON conversion of DecimalString values to/from strings.
+ ///
+ internal sealed class DecimalJsonConverter : JsonConverter
+ {
+ ///
+ public override DecimalString Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+ {
+ return new DecimalString(reader.GetString() !);
+ }
+
+ ///
+ public override void Write(Utf8JsonWriter writer, DecimalString value, JsonSerializerOptions options)
+ {
+ writer.WriteStringValue(value.ToString());
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DecimalString.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DecimalString.cs
new file mode 100644
index 0000000000..02d084193d
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DecimalString.cs
@@ -0,0 +1,97 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ using System;
+ using System.Globalization;
+ using System.Text.RegularExpressions;
+
+ public class DecimalString
+ {
+ private static readonly Regex ValidationRegex = new Regex("^(?:\\+|-)?(?:[1-9][0-9]*|0)(?:\\.[0-9]*)?$", RegexOptions.Compiled);
+
+ private readonly string value;
+
+ public DecimalString()
+ : this("0", skipValidation: false)
+ {
+ }
+
+ public DecimalString(string value)
+ : this(value, skipValidation: false)
+ {
+ }
+
+ private DecimalString(string value, bool skipValidation)
+ {
+ if (!skipValidation && !ValidationRegex.IsMatch(value))
+ {
+ throw new ArgumentException($"string {value} is not a valid decimal value");
+ }
+
+ this.value = value;
+ }
+
+ public static implicit operator string(DecimalString decimalString) => decimalString.value;
+
+ public static explicit operator DecimalString(string stringVal) => new DecimalString(stringVal);
+
+ public static implicit operator double(DecimalString decimalString) => double.TryParse(decimalString.value, out double doubleVal) ? doubleVal : double.NaN;
+
+ public static explicit operator DecimalString(double doubleVal) => new DecimalString(doubleVal.ToString("F", CultureInfo.InvariantCulture));
+
+ public static bool operator !=(DecimalString? x, DecimalString? y)
+ {
+ if (ReferenceEquals(null, x))
+ {
+ return !ReferenceEquals(null, y);
+ }
+
+ return !x.Equals(y);
+ }
+
+ public static bool operator ==(DecimalString? x, DecimalString? y)
+ {
+ if (ReferenceEquals(null, x))
+ {
+ return ReferenceEquals(null, y);
+ }
+
+ return x.Equals(y);
+ }
+
+ public static bool TryParse(string value, out DecimalString? decimalString)
+ {
+ if (ValidationRegex.IsMatch(value))
+ {
+ decimalString = new DecimalString(value, skipValidation: true);
+ return true;
+ }
+ else
+ {
+ decimalString = null;
+ return false;
+ }
+ }
+
+ public virtual bool Equals(DecimalString? other)
+ {
+ return other?.value == this?.value;
+ }
+
+ public override bool Equals(object? obj)
+ {
+ return obj is DecimalString other && Equals(other);
+ }
+
+ public override int GetHashCode()
+ {
+ return value.GetHashCode();
+ }
+
+ public override string ToString() => value;
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DurationJsonConverter.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DurationJsonConverter.cs
new file mode 100644
index 0000000000..6905a5df7d
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/DurationJsonConverter.cs
@@ -0,0 +1,30 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ using System;
+ using System.Text.Json;
+ using System.Text.Json.Serialization;
+ using System.Xml;
+
+ ///
+ /// Class for customized JSON conversion of TimeSpan values to/from string representations in ISO 8601 Duration format.
+ ///
+ internal sealed class DurationJsonConverter : JsonConverter
+ {
+ ///
+ public override TimeSpan Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+ {
+ return XmlConvert.ToTimeSpan(reader.GetString() !);
+ }
+
+ ///
+ public override void Write(Utf8JsonWriter writer, TimeSpan value, JsonSerializerOptions options)
+ {
+ writer.WriteStringValue(XmlConvert.ToString(value));
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/EmptyJson.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/EmptyJson.cs
new file mode 100644
index 0000000000..a2875f2865
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/EmptyJson.cs
@@ -0,0 +1,11 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ public class EmptyJson
+ {
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/TimeJsonConverter.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/TimeJsonConverter.cs
new file mode 100644
index 0000000000..0ca6c4e0ec
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/TimeJsonConverter.cs
@@ -0,0 +1,30 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ using System;
+ using System.Globalization;
+ using System.Text.Json;
+ using System.Text.Json.Serialization;
+
+ ///
+ /// Class for customized JSON conversion of TimeOnly values to/from string representations in ISO 8601 Time format.
+ ///
+ internal sealed class TimeJsonConverter : JsonConverter
+ {
+ ///
+ public override TimeOnly Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+ {
+ return TimeOnly.FromDateTime(DateTime.Parse(reader.GetString() !, CultureInfo.InvariantCulture, DateTimeStyles.NoCurrentDateDefault | DateTimeStyles.AdjustToUniversal));
+ }
+
+ ///
+ public override void Write(Utf8JsonWriter writer, TimeOnly value, JsonSerializerOptions options)
+ {
+ writer.WriteStringValue(value.ToString("o", CultureInfo.InvariantCulture) + "Z");
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/Utf8JsonSerializer.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/Utf8JsonSerializer.cs
new file mode 100644
index 0000000000..6b37d06077
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/Utf8JsonSerializer.cs
@@ -0,0 +1,89 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ using System;
+ using System.Buffers;
+ using System.Text.Json;
+ using System.Text.Json.Serialization;
+ using Azure.Iot.Operations.Protocol;
+ using Azure.Iot.Operations.Protocol.Models;
+
+ public class Utf8JsonSerializer : IPayloadSerializer
+ {
+ public const string ContentType = "application/json";
+
+ public const MqttPayloadFormatIndicator PayloadFormatIndicator = MqttPayloadFormatIndicator.CharacterData;
+
+ protected static readonly JsonSerializerOptions JsonSerializerOptions = new ()
+ {
+ DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
+ Converters =
+ {
+ new DurationJsonConverter(),
+ new DateJsonConverter(),
+ new TimeJsonConverter(),
+ new UuidJsonConverter(),
+ new BytesJsonConverter(),
+ new DecimalJsonConverter(),
+ },
+ };
+
+ public T FromBytes(ReadOnlySequence payload, string? contentType, MqttPayloadFormatIndicator payloadFormatIndicator)
+ where T : class
+ {
+ if (contentType != null && contentType != ContentType)
+ {
+ throw new AkriMqttException($"Content type {contentType} is not supported by this implementation; only {ContentType} is accepted.")
+ {
+ Kind = AkriMqttErrorKind.HeaderInvalid,
+ HeaderName = "Content Type",
+ HeaderValue = contentType,
+ IsShallow = false,
+ IsRemote = false,
+ };
+ }
+
+ try
+ {
+ if (payload.IsEmpty)
+ {
+ if (typeof(T) != typeof(EmptyJson))
+ {
+ throw AkriMqttException.GetPayloadInvalidException();
+ }
+
+ return (new EmptyJson() as T) !;
+ }
+
+ Utf8JsonReader reader = new (payload);
+ return JsonSerializer.Deserialize(ref reader, JsonSerializerOptions) !;
+ }
+ catch (Exception)
+ {
+ throw AkriMqttException.GetPayloadInvalidException();
+ }
+ }
+
+ public SerializedPayloadContext ToBytes(T? payload)
+ where T : class
+ {
+ try
+ {
+ if (typeof(T) == typeof(EmptyJson))
+ {
+ return new (ReadOnlySequence.Empty, null, 0);
+ }
+
+ return new (new (JsonSerializer.SerializeToUtf8Bytes(payload, JsonSerializerOptions)), ContentType, PayloadFormatIndicator);
+ }
+ catch (Exception)
+ {
+ throw AkriMqttException.GetPayloadInvalidException();
+ }
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/UuidJsonConverter.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/UuidJsonConverter.cs
new file mode 100644
index 0000000000..78e4e60226
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/UuidJsonConverter.cs
@@ -0,0 +1,29 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ using System;
+ using System.Text.Json;
+ using System.Text.Json.Serialization;
+
+ ///
+ /// Class for customized JSON conversion of Guid values to/from UUID string representations per RFC 9562.
+ ///
+ internal sealed class UuidJsonConverter : JsonConverter
+ {
+ ///
+ public override Guid Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
+ {
+ return Guid.ParseExact(reader.GetString() !, "D");
+ }
+
+ ///
+ public override void Write(Utf8JsonWriter writer, Guid value, JsonSerializerOptions options)
+ {
+ writer.WriteStringValue(value.ToString("D"));
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/ValueExtractor.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/ValueExtractor.cs
new file mode 100644
index 0000000000..02faee2662
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Common/ValueExtractor.cs
@@ -0,0 +1,20 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+/* Code generated by Azure.Iot.Operations.ProtocolCompiler v0.10.0.0; DO NOT EDIT. */
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ using System;
+
+ internal static class ValueExtractor
+ {
+#pragma warning disable IDE0030 // Null check can be simplified
+ public static T Value(this T obj)
+ where T : class => obj;
+
+ public static T Value(this T? val)
+ where T : struct => val.HasValue ? val.Value : default(T);
+#pragma warning restore IDE0030 // Null check can be simplified
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/CounterMetricCache.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/CounterMetricCache.cs
new file mode 100644
index 0000000000..62a9f65670
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/CounterMetricCache.cs
@@ -0,0 +1,50 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Collections.Concurrent;
+using Azure.Iot.Operations.Services.Observability.AkriObservabilityService;
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ internal class CounterMetricCache
+ {
+ private readonly ConcurrentDictionary _counters = new();
+
+ public ICounter CreateCounter(string name, Dictionary labels, string? unit)
+ {
+ var key = FormatMetricKey(name, labels);
+ return _counters.GetOrAdd(key, _ => new CachedCounter(name, new Dictionary(labels), unit));
+ }
+
+ public List GetMetricsAndClear(int maxBatchSize)
+ {
+ var result = new List();
+
+ foreach (var cachedCounter in _counters.Values)
+ {
+ var operations = cachedCounter.GetOperationsAndClear(maxBatchSize);
+ if (operations.Count > 0)
+ {
+ result.Add(new CounterMetric
+ {
+ Definition = new MetricDefinition
+ {
+ Name = cachedCounter.Name,
+ Labels = new Dictionary(cachedCounter.Labels),
+ Unit = cachedCounter.Unit
+ },
+ Operations = operations
+ });
+ }
+ }
+
+ return result;
+ }
+
+ private static string FormatMetricKey(string name, Dictionary labels)
+ {
+ var labelString = string.Join(",", labels.OrderBy(kvp => kvp.Key).Select(kvp => $"{kvp.Key}={kvp.Value}"));
+ return $"{name}:{labelString}";
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/GaugeMetricCache.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/GaugeMetricCache.cs
new file mode 100644
index 0000000000..8bcbca7328
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/GaugeMetricCache.cs
@@ -0,0 +1,49 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Collections.Concurrent;
+using Azure.Iot.Operations.Services.Observability.AkriObservabilityService;
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+internal class GaugeMetricCache
+{
+ private readonly ConcurrentDictionary _gauges = new();
+
+ public IGauge CreateGauge(string name, Dictionary labels, string? unit)
+ {
+ var key = FormatMetricKey(name, labels);
+ return _gauges.GetOrAdd(key, _ => new CachedGauge(name, new Dictionary(labels), unit));
+ }
+
+ public List GetMetricsAndClear(int maxBatchSize)
+ {
+ var result = new List();
+
+ foreach (var cachedGauge in _gauges.Values)
+ {
+ List operations = cachedGauge.GetOperationsAndClear(maxBatchSize);
+ if (operations.Count > 0)
+ {
+ result.Add(new GaugeMetric
+ {
+ Definition = new MetricDefinition
+ {
+ Name = cachedGauge.Name,
+ Labels = new Dictionary(cachedGauge.Labels),
+ Unit = cachedGauge.Unit
+ },
+ Operations = operations
+ });
+ }
+ }
+
+ return result;
+ }
+
+ private static string FormatMetricKey(string name, Dictionary labels)
+ {
+ var labelString = string.Join(",", labels.OrderBy(kvp => kvp.Key).Select(kvp => $"{kvp.Key}={kvp.Value}"));
+ return $"{name}:{labelString}";
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/HistogramMetricCache.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/HistogramMetricCache.cs
new file mode 100644
index 0000000000..a6d8fd8a69
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/HistogramMetricCache.cs
@@ -0,0 +1,49 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Collections.Concurrent;
+using Azure.Iot.Operations.Services.Observability.AkriObservabilityService;
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+internal class HistogramMetricCache
+{
+ private readonly ConcurrentDictionary _histograms = new();
+
+ public IHistogram CreateHistogram(string name, Dictionary labels, string? unit)
+ {
+ var key = FormatMetricKey(name, labels);
+ return _histograms.GetOrAdd(key, _ => new CachedHistogram(name, new Dictionary(labels), unit));
+ }
+
+ public List GetMetricsAndClear(int maxBatchSize)
+ {
+ var result = new List();
+
+ foreach (var cachedHistogram in _histograms.Values)
+ {
+ var operations = cachedHistogram.GetOperationsAndClear(maxBatchSize);
+ if (operations.Count > 0)
+ {
+ result.Add(new HistogramMetric
+ {
+ Definition = new MetricDefinition
+ {
+ Name = cachedHistogram.Name,
+ Labels = new Dictionary(cachedHistogram.Labels),
+ Unit = cachedHistogram.Unit
+ },
+ Operations = operations
+ });
+ }
+ }
+
+ return result;
+ }
+
+ private static string FormatMetricKey(string name, Dictionary labels)
+ {
+ var labelString = string.Join(",", labels.OrderBy(kvp => kvp.Key).Select(kvp => $"{kvp.Key}={kvp.Value}"));
+ return $"{name}:{labelString}";
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/IAkriObservabilityService.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/IAkriObservabilityService.cs
new file mode 100644
index 0000000000..a9e061981d
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/IAkriObservabilityService.cs
@@ -0,0 +1,12 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+using Protocol.RPC;
+using AkriObservabilityService;
+
+public interface IAkriObservabilityService
+{
+ RpcCallAsync PublishMetricsAsync(PublishMetricsRequestPayload request);
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/ICounter.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/ICounter.cs
new file mode 100644
index 0000000000..73068599f8
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/ICounter.cs
@@ -0,0 +1,9 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+public interface ICounter : IMetric
+{
+ void Add(double value);
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/IGauge.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/IGauge.cs
new file mode 100644
index 0000000000..bfadf78036
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/IGauge.cs
@@ -0,0 +1,9 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+public interface IGauge : IMetric
+{
+ void Record(double value);
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/IHistogram.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/IHistogram.cs
new file mode 100644
index 0000000000..95eb4e0a03
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/IHistogram.cs
@@ -0,0 +1,9 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+public interface IHistogram : IMetric
+{
+ void Record(double value);
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/IMetric.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/IMetric.cs
new file mode 100644
index 0000000000..faa8e4ebc7
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/IMetric.cs
@@ -0,0 +1,12 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Azure.Iot.Operations.Services.Observability
+{
+ public interface IMetric
+ {
+ string Name { get; }
+ Dictionary Labels { get; }
+ string? Unit { get; }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/IMetricsReporterService.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/IMetricsReporterService.cs
new file mode 100644
index 0000000000..e389afe6ef
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/IMetricsReporterService.cs
@@ -0,0 +1,13 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+public interface IMetricsReporterService
+{
+ void Start(CancellationToken cancellationToken = default);
+ Task StopAsync();
+ ICounter CreateCounter(string name, Dictionary labels, string? unit = null);
+ IGauge CreateGauge(string name, Dictionary labels, string? unit = null);
+ IHistogram CreateHistogram(string name, Dictionary labels, string? unit = null);
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/MetricsReporterOptions.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/MetricsReporterOptions.cs
new file mode 100644
index 0000000000..9c7eb3d890
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/MetricsReporterOptions.cs
@@ -0,0 +1,17 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+public class MetricsReporterOptions
+{
+ ///
+ /// Interval at which metrics are reported to the underlying service
+ ///
+ public TimeSpan ReportingInterval { get; set; } = TimeSpan.FromSeconds(15);
+
+ ///
+ /// Maximum number of operations to include in a single report
+ ///
+ public int MaxBatchSize { get; set; } = 100;
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/MetricsReporterService.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/MetricsReporterService.cs
new file mode 100644
index 0000000000..604b15291e
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/MetricsReporterService.cs
@@ -0,0 +1,164 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Azure.Iot.Operations.Protocol.RPC;
+using Azure.Iot.Operations.Services.Observability.AkriObservabilityService;
+using Azure.Iot.Operations.Services.Observability.Utils;
+using ITimer = Azure.Iot.Operations.Services.Observability.Utils.ITimer;
+
+namespace Azure.Iot.Operations.Services.Observability;
+
+public class MetricsReporterService : IAsyncDisposable, IMetricsReporterService
+{
+ private readonly IAkriObservabilityService _observabilityService;
+ private readonly MetricsReporterOptions _options;
+ private readonly CounterMetricCache _counterCache;
+ private readonly GaugeMetricCache _gaugeCache;
+ private readonly HistogramMetricCache _histogramCache;
+ private readonly ITimerFactory _timerFactory;
+ private ITimer? _reportingTimer;
+ private bool _isDisposed;
+ private readonly SemaphoreSlim _reportingSemaphore = new(1, 1);
+ private readonly SemaphoreSlim _timerSemaphore = new(1, 1);
+
+ public MetricsReporterService(
+ IAkriObservabilityService observabilityService,
+ MetricsReporterOptions? options = null,
+ ITimerFactory? timerFactory = null)
+ {
+ _observabilityService = observabilityService ?? throw new ArgumentNullException(nameof(observabilityService));
+ _options = options ?? new MetricsReporterOptions();
+ _timerFactory = timerFactory ?? new DefaultTimerFactory();
+
+ _counterCache = new CounterMetricCache();
+ _gaugeCache = new GaugeMetricCache();
+ _histogramCache = new HistogramMetricCache();
+ }
+
+ public void Start(CancellationToken cancellationToken = default)
+ {
+ _timerSemaphore.Wait(cancellationToken);
+
+ try
+ {
+ if (_reportingTimer == null)
+ {
+ _reportingTimer = _timerFactory.CreateTimer();
+
+ _reportingTimer.Start(async _ => await ReportMetricsAsync(cancellationToken),
+ cancellationToken,
+ _options.ReportingInterval,
+ _options.ReportingInterval);
+ }
+ }
+ finally
+ {
+ _timerSemaphore.Release();
+ }
+ }
+
+ public async Task StopAsync()
+ {
+ await _timerSemaphore.WaitAsync();
+
+ try
+ {
+ if (_reportingTimer != null)
+ {
+ await _reportingTimer.DisposeAsync();
+ _reportingTimer = null;
+ }
+ }
+ finally
+ {
+ _timerSemaphore.Release();
+ }
+ }
+
+ public ICounter CreateCounter(string name, Dictionary labels, string? unit = null)
+ {
+ ValidateMetricParameters(name, labels);
+ return _counterCache.CreateCounter(name, labels, unit);
+ }
+
+ public IGauge CreateGauge(string name, Dictionary labels, string? unit = null)
+ {
+ ValidateMetricParameters(name, labels);
+ return _gaugeCache.CreateGauge(name, labels, unit);
+ }
+
+ public IHistogram CreateHistogram(string name, Dictionary labels, string? unit = null)
+ {
+ ValidateMetricParameters(name, labels);
+ return _histogramCache.CreateHistogram(name, labels, unit);
+ }
+
+ private void ValidateMetricParameters(string name, Dictionary labels)
+ {
+ if (string.IsNullOrEmpty(name))
+ {
+ throw new ArgumentException("Metric name cannot be null or empty.", nameof(name));
+ }
+
+ if (labels == null)
+ {
+ throw new ArgumentNullException(nameof(labels));
+ }
+ }
+
+ private async Task ReportMetricsAsync(CancellationToken cancellationToken)
+ {
+ if (!await _reportingSemaphore.WaitAsync(0, cancellationToken))
+ {
+ // Another reporting operation is in progress
+ return;
+ }
+
+ try
+ {
+ var request = new PublishMetricsRequestPayload
+ {
+ Metrics = new PublishMetricsRequestSchema
+ {
+ CounterMetrics = _counterCache.GetMetricsAndClear(_options.MaxBatchSize),
+ GaugeMetrics = _gaugeCache.GetMetricsAndClear(_options.MaxBatchSize),
+ HistogramMetrics = _histogramCache.GetMetricsAndClear(_options.MaxBatchSize)
+ }
+ };
+
+ // Only send if there are metrics to report
+ if (request.Metrics.CounterMetrics.Count > 0 ||
+ request.Metrics.GaugeMetrics.Count > 0 ||
+ request.Metrics.HistogramMetrics.Count > 0)
+ {
+ try
+ {
+ RpcCallAsync response = _observabilityService.PublishMetricsAsync(request);
+ }
+ catch (Exception ex)
+ {
+ // Log but don't throw
+ System.Diagnostics.Debug.WriteLine($"Error reporting metrics: {ex}");
+ }
+ }
+ }
+ finally
+ {
+ _reportingSemaphore.Release();
+ }
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (_isDisposed)
+ {
+ return;
+ }
+
+ await StopAsync();
+
+ _timerSemaphore.Dispose();
+ _reportingSemaphore.Dispose();
+ _isDisposed = true;
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/DefaultTimerFactory.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/DefaultTimerFactory.cs
new file mode 100644
index 0000000000..03b99d907c
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/DefaultTimerFactory.cs
@@ -0,0 +1,12 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Azure.Iot.Operations.Services.Observability.Utils;
+
+public class DefaultTimerFactory : ITimerFactory
+{
+ public ITimer CreateTimer()
+ {
+ return new TimerWrapper();
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/ITimer.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/ITimer.cs
new file mode 100644
index 0000000000..495f0a511c
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/ITimer.cs
@@ -0,0 +1,10 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Azure.Iot.Operations.Services.Observability.Utils;
+
+public interface ITimer : IAsyncDisposable
+{
+ void Start(Func callback, CancellationToken cancellationToken, TimeSpan dueTime, TimeSpan period);
+ Task StopAsync();
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/ITimerFactory.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/ITimerFactory.cs
new file mode 100644
index 0000000000..2d040f9c0a
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/ITimerFactory.cs
@@ -0,0 +1,9 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Azure.Iot.Operations.Services.Observability.Utils;
+
+public interface ITimerFactory
+{
+ ITimer CreateTimer();
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/TimerWrapper.cs b/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/TimerWrapper.cs
new file mode 100644
index 0000000000..4e74283d2a
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/Utils/TimerWrapper.cs
@@ -0,0 +1,40 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Azure.Iot.Operations.Services.Observability.Utils
+{
+ public class TimerWrapper : ITimer
+ {
+ private Timer? _timer;
+ private bool _isDisposed;
+
+ public void Start(Func callback, CancellationToken cancellationToken, TimeSpan dueTime, TimeSpan period)
+ {
+ _timer = new Timer(
+ async _ => await callback(CancellationToken.None),
+ null,
+ dueTime,
+ period);
+ }
+
+ public async Task StopAsync()
+ {
+ if (_timer != null)
+ {
+ await _timer.DisposeAsync();
+ _timer = null;
+ }
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (_isDisposed)
+ {
+ return;
+ }
+
+ await StopAsync();
+ _isDisposed = true;
+ }
+ }
+}
diff --git a/dotnet/src/Azure.Iot.Operations.Services/Observability/gen.sh b/dotnet/src/Azure.Iot.Operations.Services/Observability/gen.sh
new file mode 100755
index 0000000000..e3130b2586
--- /dev/null
+++ b/dotnet/src/Azure.Iot.Operations.Services/Observability/gen.sh
@@ -0,0 +1,6 @@
+rm -rf ./AkriObservabilityService
+mkdir ./AkriObservabilityService
+../../../../codegen/src/Azure.Iot.Operations.ProtocolCompiler/bin/Debug/net9.0/Azure.Iot.Operations.ProtocolCompiler --modelFile ../../../../eng/dtdl/akri-observability-metrics-operations.json --lang csharp --outDir /tmp/Azure.Iot.Operations.Services.Observability
+cp -f /tmp/Azure.Iot.Operations.Services.Observability/AkriObservabilityService/*.cs AkriObservabilityService -v
+cp -f /tmp/Azure.Iot.Operations.Services.Observability/*.cs Common -v
+rm -rf /tmp/Azure.Iot.Operations.Services.Observability -v
diff --git a/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedCounterTests.cs b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedCounterTests.cs
new file mode 100644
index 0000000000..4711b29d04
--- /dev/null
+++ b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedCounterTests.cs
@@ -0,0 +1,155 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Azure.Iot.Operations.Services.Observability;
+using Xunit;
+
+namespace Azure.Iot.Operations.Services.UnitTests.Observability;
+
+public class CachedCounterTests
+{
+ private readonly string _name = "test_counter";
+
+ private readonly Dictionary _labels = new()
+ {
+ { "service", "test" },
+ { "instance", "instance1" }
+ };
+
+ private readonly string _unit = "bytes";
+
+ [Fact]
+ public void Constructor_InitializesProperties()
+ {
+ // Arrange & Act
+ var counter = new CachedCounter(_name, _labels, _unit);
+
+ // Assert
+ Assert.Equal(_name, counter.Name);
+ Assert.Equal(_labels, counter.Labels);
+ Assert.Equal(_unit, counter.Unit);
+ }
+
+ [Fact]
+ public void Add_AddsOperationToQueue()
+ {
+ // Arrange
+ var counter = new CachedCounter(_name, _labels, _unit);
+ var value = 5.0;
+
+ // Act
+ counter.Add(value);
+
+ // Assert
+ var operations = counter.GetOperationsAndClear(10);
+ Assert.Single(operations);
+ Assert.Equal(value, operations[0].Value);
+ Assert.NotNull(operations[0].OperationId);
+ Assert.True(operations[0].Timestamp <= DateTime.UtcNow);
+ Assert.True(operations[0].Timestamp >= DateTime.UtcNow.AddMinutes(-1)); // Should be recent
+ }
+
+ [Fact]
+ public void MultipleOperations_AddedCorrectly()
+ {
+ // Arrange
+ var counter = new CachedCounter(_name, _labels, _unit);
+
+ // Act
+ counter.Add(3.0);
+ counter.Add(1.0);
+ counter.Add(7.5);
+
+ // Assert
+ var operations = counter.GetOperationsAndClear(10);
+ Assert.Equal(3, operations.Count);
+ Assert.Equal(3.0, operations[0].Value);
+ Assert.Equal(1.0, operations[1].Value);
+ Assert.Equal(7.5, operations[2].Value);
+ }
+
+ [Fact]
+ public void GetOperationsAndClear_RespectsMaxCount()
+ {
+ // Arrange
+ var counter = new CachedCounter(_name, _labels, _unit);
+
+ // Add 5 operations
+ for (var i = 1; i <= 5; i++) counter.Add(i);
+
+ // Act - Get only 3 operations
+ var firstBatch = counter.GetOperationsAndClear(3);
+
+ // Assert
+ Assert.Equal(3, firstBatch.Count);
+ Assert.Equal(1.0, firstBatch[0].Value);
+ Assert.Equal(2.0, firstBatch[1].Value);
+ Assert.Equal(3.0, firstBatch[2].Value);
+
+ // Get remaining operations
+ var secondBatch = counter.GetOperationsAndClear(10);
+ Assert.Equal(2, secondBatch.Count);
+ Assert.Equal(4.0, secondBatch[0].Value);
+ Assert.Equal(5.0, secondBatch[1].Value);
+ }
+
+ [Fact]
+ public void GetOperationsAndClear_EmptiesQueue()
+ {
+ // Arrange
+ var counter = new CachedCounter(_name, _labels, _unit);
+ counter.Add(1.0);
+ counter.Add(2.0);
+
+ // Act
+ var operations = counter.GetOperationsAndClear(10);
+
+ // Assert
+ Assert.Equal(2, operations.Count);
+
+ // Verify queue is now empty
+ var emptyOperations = counter.GetOperationsAndClear(10);
+ Assert.Empty(emptyOperations);
+ }
+
+ [Fact]
+ public void GetOperationsAndClear_WithEmptyQueue_ReturnsEmptyList()
+ {
+ // Arrange
+ var counter = new CachedCounter(_name, _labels, _unit);
+
+ // Act
+ var operations = counter.GetOperationsAndClear(10);
+
+ // Assert
+ Assert.Empty(operations);
+ }
+
+ [Fact]
+ public async Task Concurrent_Operations_ThreadSafe()
+ {
+ // Arrange
+ var counter = new CachedCounter(_name, _labels, _unit);
+ var operationCount = 1000;
+ var threads = 5;
+
+ // Act
+ var tasks = new List();
+ for (int t = 0; t < threads; t++)
+ {
+ tasks.Add(Task.Run(() =>
+ {
+ for (int i = 0; i < operationCount / threads; i++)
+ {
+ counter.Add(i);
+ }
+ }));
+ }
+
+ await Task.WhenAll(tasks.ToArray());
+
+ // Assert
+ var operations = counter.GetOperationsAndClear(operationCount);
+ Assert.Equal(operationCount, operations.Count);
+ }
+}
diff --git a/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedGaugeTests.cs b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedGaugeTests.cs
new file mode 100644
index 0000000000..bdc7859a67
--- /dev/null
+++ b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedGaugeTests.cs
@@ -0,0 +1,155 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Azure.Iot.Operations.Services.Observability;
+using Xunit;
+
+namespace Azure.Iot.Operations.Services.UnitTests.Observability;
+
+public class CachedGaugeTests
+{
+ private readonly string _name = "test_gauge";
+
+ private readonly Dictionary _labels = new()
+ {
+ { "service", "test" },
+ { "instance", "instance1" }
+ };
+
+ private readonly string _unit = "celsius";
+
+ [Fact]
+ public void Constructor_InitializesProperties()
+ {
+ // Arrange & Act
+ var gauge = new CachedGauge(_name, _labels, _unit);
+
+ // Assert
+ Assert.Equal(_name, gauge.Name);
+ Assert.Equal(_labels, gauge.Labels);
+ Assert.Equal(_unit, gauge.Unit);
+ }
+
+ [Fact]
+ public void Record_AddsOperationToQueue()
+ {
+ // Arrange
+ var gauge = new CachedGauge(_name, _labels, _unit);
+ var value = 23.5;
+
+ // Act
+ gauge.Record(value);
+
+ // Assert
+ var operations = gauge.GetOperationsAndClear(10);
+ Assert.Single(operations);
+ Assert.Equal(value, operations[0].Value);
+ Assert.NotNull(operations[0].OperationId);
+ Assert.True(operations[0].Timestamp <= DateTime.UtcNow);
+ Assert.True(operations[0].Timestamp >= DateTime.UtcNow.AddMinutes(-1)); // Should be recent
+ }
+
+ [Fact]
+ public void MultipleOperations_AddedCorrectly()
+ {
+ // Arrange
+ var gauge = new CachedGauge(_name, _labels, _unit);
+
+ // Act
+ gauge.Record(25.0);
+ gauge.Record(26.5);
+ gauge.Record(24.8);
+
+ // Assert
+ var operations = gauge.GetOperationsAndClear(10);
+ Assert.Equal(3, operations.Count);
+ Assert.Equal(25.0, operations[0].Value);
+ Assert.Equal(26.5, operations[1].Value);
+ Assert.Equal(24.8, operations[2].Value);
+ }
+
+ [Fact]
+ public void GetOperationsAndClear_RespectsMaxCount()
+ {
+ // Arrange
+ var gauge = new CachedGauge(_name, _labels, _unit);
+
+ // Add 5 operations
+ for (var i = 20; i < 25; i++) gauge.Record(i);
+
+ // Act - Get only 3 operations
+ var firstBatch = gauge.GetOperationsAndClear(3);
+
+ // Assert
+ Assert.Equal(3, firstBatch.Count);
+ Assert.Equal(20.0, firstBatch[0].Value);
+ Assert.Equal(21.0, firstBatch[1].Value);
+ Assert.Equal(22.0, firstBatch[2].Value);
+
+ // Get remaining operations
+ var secondBatch = gauge.GetOperationsAndClear(10);
+ Assert.Equal(2, secondBatch.Count);
+ Assert.Equal(23.0, secondBatch[0].Value);
+ Assert.Equal(24.0, secondBatch[1].Value);
+ }
+
+ [Fact]
+ public void GetOperationsAndClear_EmptiesQueue()
+ {
+ // Arrange
+ var gauge = new CachedGauge(_name, _labels, _unit);
+ gauge.Record(22.0);
+ gauge.Record(23.0);
+
+ // Act
+ var operations = gauge.GetOperationsAndClear(10);
+
+ // Assert
+ Assert.Equal(2, operations.Count);
+
+ // Verify queue is now empty
+ var emptyOperations = gauge.GetOperationsAndClear(10);
+ Assert.Empty(emptyOperations);
+ }
+
+ [Fact]
+ public void GetOperationsAndClear_WithEmptyQueue_ReturnsEmptyList()
+ {
+ // Arrange
+ var gauge = new CachedGauge(_name, _labels, _unit);
+
+ // Act
+ var operations = gauge.GetOperationsAndClear(10);
+
+ // Assert
+ Assert.Empty(operations);
+ }
+
+ [Fact]
+ public async Task Concurrent_Operations_ThreadSafe()
+ {
+ // Arrange
+ var gauge = new CachedGauge(_name, _labels, _unit);
+ var operationCount = 1000;
+ var threads = 5;
+
+ // Act
+ var tasks = new List();
+ for (int t = 0; t < threads; t++)
+ {
+ tasks.Add(Task.Run(() =>
+ {
+ for (int i = 0; i < operationCount / threads; i++)
+ {
+ gauge.Record(i);
+ }
+ }));
+ }
+
+ await Task.WhenAll(tasks.ToArray());
+
+ // Assert
+ var operations = gauge.GetOperationsAndClear(operationCount);
+ Assert.Equal(operationCount, operations.Count);
+ }
+}
diff --git a/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedHistogramTests.cs b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedHistogramTests.cs
new file mode 100644
index 0000000000..a20ec22216
--- /dev/null
+++ b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CachedHistogramTests.cs
@@ -0,0 +1,155 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Azure.Iot.Operations.Services.Observability;
+using Xunit;
+
+namespace Azure.Iot.Operations.Services.UnitTests.Observability;
+
+public class CachedHistogramTests
+{
+ private readonly string _name = "test_histogram";
+
+ private readonly Dictionary _labels = new()
+ {
+ { "service", "test" },
+ { "instance", "instance1" }
+ };
+
+ private readonly string _unit = "milliseconds";
+
+ [Fact]
+ public void Constructor_InitializesProperties()
+ {
+ // Arrange & Act
+ var histogram = new CachedHistogram(_name, _labels, _unit);
+
+ // Assert
+ Assert.Equal(_name, histogram.Name);
+ Assert.Equal(_labels, histogram.Labels);
+ Assert.Equal(_unit, histogram.Unit);
+ }
+
+ [Fact]
+ public void Record_AddsOperationToQueue()
+ {
+ // Arrange
+ var histogram = new CachedHistogram(_name, _labels, _unit);
+ var value = 15.7;
+
+ // Act
+ histogram.Record(value);
+
+ // Assert
+ var operations = histogram.GetOperationsAndClear(10);
+ Assert.Single(operations);
+ Assert.Equal(value, operations[0].Value);
+ Assert.NotNull(operations[0].OperationId);
+ Assert.True(operations[0].Timestamp <= DateTime.UtcNow);
+ Assert.True(operations[0].Timestamp >= DateTime.UtcNow.AddMinutes(-1)); // Should be recent
+ }
+
+ [Fact]
+ public void MultipleOperations_AddedCorrectly()
+ {
+ // Arrange
+ var histogram = new CachedHistogram(_name, _labels, _unit);
+
+ // Act
+ histogram.Record(12.5);
+ histogram.Record(18.7);
+ histogram.Record(21.3);
+
+ // Assert
+ var operations = histogram.GetOperationsAndClear(10);
+ Assert.Equal(3, operations.Count);
+ Assert.Equal(12.5, operations[0].Value);
+ Assert.Equal(18.7, operations[1].Value);
+ Assert.Equal(21.3, operations[2].Value);
+ }
+
+ [Fact]
+ public void GetOperationsAndClear_RespectsMaxCount()
+ {
+ // Arrange
+ var histogram = new CachedHistogram(_name, _labels, _unit);
+
+ // Add 5 operations
+ for (var i = 10; i < 15; i++) histogram.Record(i);
+
+ // Act - Get only 3 operations
+ var firstBatch = histogram.GetOperationsAndClear(3);
+
+ // Assert
+ Assert.Equal(3, firstBatch.Count);
+ Assert.Equal(10.0, firstBatch[0].Value);
+ Assert.Equal(11.0, firstBatch[1].Value);
+ Assert.Equal(12.0, firstBatch[2].Value);
+
+ // Get remaining operations
+ var secondBatch = histogram.GetOperationsAndClear(10);
+ Assert.Equal(2, secondBatch.Count);
+ Assert.Equal(13.0, secondBatch[0].Value);
+ Assert.Equal(14.0, secondBatch[1].Value);
+ }
+
+ [Fact]
+ public void GetOperationsAndClear_EmptiesQueue()
+ {
+ // Arrange
+ var histogram = new CachedHistogram(_name, _labels, _unit);
+ histogram.Record(12.5);
+ histogram.Record(18.7);
+
+ // Act
+ var operations = histogram.GetOperationsAndClear(10);
+
+ // Assert
+ Assert.Equal(2, operations.Count);
+
+ // Verify queue is now empty
+ var emptyOperations = histogram.GetOperationsAndClear(10);
+ Assert.Empty(emptyOperations);
+ }
+
+ [Fact]
+ public void GetOperationsAndClear_WithEmptyQueue_ReturnsEmptyList()
+ {
+ // Arrange
+ var histogram = new CachedHistogram(_name, _labels, _unit);
+
+ // Act
+ var operations = histogram.GetOperationsAndClear(10);
+
+ // Assert
+ Assert.Empty(operations);
+ }
+
+ [Fact]
+ public async Task Concurrent_Operations_ThreadSafe()
+ {
+ // Arrange
+ var histogram = new CachedHistogram(_name, _labels, _unit);
+ var operationCount = 1000;
+ var threads = 5;
+
+ // Act
+ var tasks = new List();
+ for (int t = 0; t < threads; t++)
+ {
+ tasks.Add(Task.Run(() =>
+ {
+ for (int i = 0; i < operationCount / threads; i++)
+ {
+ histogram.Record(i);
+ }
+ }));
+ }
+
+ await Task.WhenAll(tasks.ToArray());
+
+ // Assert
+ var operations = histogram.GetOperationsAndClear(operationCount);
+ Assert.Equal(operationCount, operations.Count);
+ }
+}
diff --git a/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CounterMetricCacheTests.cs b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CounterMetricCacheTests.cs
new file mode 100644
index 0000000000..53fcc6cd0f
--- /dev/null
+++ b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/CounterMetricCacheTests.cs
@@ -0,0 +1,183 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Azure.Iot.Operations.Services.Observability;
+using Xunit;
+
+namespace Azure.Iot.Operations.Services.UnitTests.Observability;
+
+public class CounterMetricCacheTests
+{
+ [Fact]
+ public void CreateCounter_InitializesCounter()
+ {
+ // Arrange
+ var cache = new CounterMetricCache();
+ var name = "test_counter";
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "bytes";
+
+ // Act
+ var counter = cache.CreateCounter(name, labels, unit);
+
+ // Assert
+ Assert.NotNull(counter);
+ Assert.Equal(name, counter.Name);
+ Assert.Equal(labels, counter.Labels);
+ Assert.Equal(unit, counter.Unit);
+ }
+
+ [Fact]
+ public void CreateCounter_SameNameAndLabels_ReturnsSameInstance()
+ {
+ // Arrange
+ var cache = new CounterMetricCache();
+ var name = "test_counter";
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "bytes";
+
+ // Act
+ var counter1 = cache.CreateCounter(name, labels, unit);
+ var counter2 = cache.CreateCounter(name, labels, unit);
+
+ // Assert
+ Assert.Same(counter1, counter2);
+ }
+
+ [Fact]
+ public void CreateCounter_DifferentNames_ReturnsDifferentInstances()
+ {
+ // Arrange
+ var cache = new CounterMetricCache();
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "bytes";
+
+ // Act
+ var counter1 = cache.CreateCounter("counter1", labels, unit);
+ var counter2 = cache.CreateCounter("counter2", labels, unit);
+
+ // Assert
+ Assert.NotSame(counter1, counter2);
+ }
+
+ [Fact]
+ public void CreateCounter_SameNameDifferentLabels_ReturnsDifferentInstances()
+ {
+ // Arrange
+ var cache = new CounterMetricCache();
+ var name = "test_counter";
+ var unit = "bytes";
+ var labels1 = new Dictionary { { "service", "test1" } };
+ var labels2 = new Dictionary { { "service", "test2" } };
+
+ // Act
+ var counter1 = cache.CreateCounter(name, labels1, unit);
+ var counter2 = cache.CreateCounter(name, labels2, unit);
+
+ // Assert
+ Assert.NotSame(counter1, counter2);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_NoOperations_ReturnsEmptyList()
+ {
+ // Arrange
+ var cache = new CounterMetricCache();
+ cache.CreateCounter("counter", new Dictionary(), "bytes");
+
+ // Act
+ var metrics = cache.GetMetricsAndClear(10);
+
+ // Assert
+ Assert.Empty(metrics);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_WithOperations_ReturnsMetricsAndClears()
+ {
+ // Arrange
+ var cache = new CounterMetricCache();
+ var name = "test_counter";
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "bytes";
+
+ var counter = (ICounter)cache.CreateCounter(name, labels, unit);
+ counter.Add(5);
+ counter.Add(10);
+
+ // Act
+ var metrics = cache.GetMetricsAndClear(10);
+
+ // Assert
+ Assert.Single(metrics);
+ Assert.Equal(name, metrics[0].Definition.Name);
+ Assert.Equal(labels, metrics[0].Definition.Labels);
+ Assert.Equal(unit, metrics[0].Definition.Unit);
+ Assert.Equal(2, metrics[0].Operations.Count);
+ Assert.Equal(5, metrics[0].Operations[0].Value);
+ Assert.Equal(10, metrics[0].Operations[1].Value);
+
+ // Verify operations are cleared
+ var emptyMetrics = cache.GetMetricsAndClear(10);
+ Assert.Empty(emptyMetrics);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_MultipleCounters_ReturnsAllMetrics()
+ {
+ // Arrange
+ var cache = new CounterMetricCache();
+
+ var counter1 = (ICounter)cache.CreateCounter("counter1", new Dictionary { { "service", "test1" } }, "bytes");
+ counter1.Add(5);
+
+ var counter2 = (ICounter)cache.CreateCounter("counter2", new Dictionary { { "service", "test2" } }, "requests");
+ counter2.Add(10);
+
+ // Act
+ var metrics = cache.GetMetricsAndClear(10);
+
+ // Assert
+ Assert.Equal(2, metrics.Count);
+
+ // Check first counter metrics
+ var metric1 = metrics.Find(m => m.Definition.Name == "counter1");
+ Assert.NotNull(metric1);
+ Assert.Single(metric1!.Operations);
+ Assert.Equal(5, metric1.Operations[0].Value);
+
+ // Check second counter metrics
+ var metric2 = metrics.Find(m => m.Definition.Name == "counter2");
+ Assert.NotNull(metric2);
+ Assert.Single(metric2!.Operations);
+ Assert.Equal(10, metric2.Operations[0].Value);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_RespectsMaxBatchSize()
+ {
+ // Arrange
+ var cache = new CounterMetricCache();
+ var name = "test_counter";
+ var labels = new Dictionary { { "service", "test" } };
+ var counter = (ICounter)cache.CreateCounter(name, labels, "bytes");
+
+ // Add 5 operations
+ for (int i = 1; i <= 5; i++)
+ {
+ counter.Add(i);
+ }
+
+ // Act - Get with max batch size of 3
+ var metrics = cache.GetMetricsAndClear(3);
+
+ // Assert
+ Assert.Single(metrics);
+ Assert.Equal(3, metrics[0].Operations.Count);
+
+ // Get remaining operations
+ var remainingMetrics = cache.GetMetricsAndClear(10);
+ Assert.Single(remainingMetrics);
+ Assert.Equal(2, remainingMetrics[0].Operations.Count);
+ }
+}
diff --git a/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/GaugeMetricCacheTests.cs b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/GaugeMetricCacheTests.cs
new file mode 100644
index 0000000000..eb15cf9f5b
--- /dev/null
+++ b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/GaugeMetricCacheTests.cs
@@ -0,0 +1,183 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Azure.Iot.Operations.Services.Observability;
+using Xunit;
+
+namespace Azure.Iot.Operations.Services.UnitTests.Observability;
+
+public class GaugeMetricCacheTests
+{
+ [Fact]
+ public void CreateGauge_InitializesGauge()
+ {
+ // Arrange
+ var cache = new GaugeMetricCache();
+ var name = "test_gauge";
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "celsius";
+
+ // Act
+ var gauge = cache.CreateGauge(name, labels, unit);
+
+ // Assert
+ Assert.NotNull(gauge);
+ Assert.Equal(name, gauge.Name);
+ Assert.Equal(labels, gauge.Labels);
+ Assert.Equal(unit, gauge.Unit);
+ }
+
+ [Fact]
+ public void CreateGauge_SameNameAndLabels_ReturnsSameInstance()
+ {
+ // Arrange
+ var cache = new GaugeMetricCache();
+ var name = "test_gauge";
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "celsius";
+
+ // Act
+ var gauge1 = cache.CreateGauge(name, labels, unit);
+ var gauge2 = cache.CreateGauge(name, labels, unit);
+
+ // Assert
+ Assert.Same(gauge1, gauge2);
+ }
+
+ [Fact]
+ public void CreateGauge_DifferentNames_ReturnsDifferentInstances()
+ {
+ // Arrange
+ var cache = new GaugeMetricCache();
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "celsius";
+
+ // Act
+ var gauge1 = cache.CreateGauge("gauge1", labels, unit);
+ var gauge2 = cache.CreateGauge("gauge2", labels, unit);
+
+ // Assert
+ Assert.NotSame(gauge1, gauge2);
+ }
+
+ [Fact]
+ public void CreateGauge_SameNameDifferentLabels_ReturnsDifferentInstances()
+ {
+ // Arrange
+ var cache = new GaugeMetricCache();
+ var name = "test_gauge";
+ var unit = "celsius";
+ var labels1 = new Dictionary { { "service", "test1" } };
+ var labels2 = new Dictionary { { "service", "test2" } };
+
+ // Act
+ var gauge1 = cache.CreateGauge(name, labels1, unit);
+ var gauge2 = cache.CreateGauge(name, labels2, unit);
+
+ // Assert
+ Assert.NotSame(gauge1, gauge2);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_NoOperations_ReturnsEmptyList()
+ {
+ // Arrange
+ var cache = new GaugeMetricCache();
+ cache.CreateGauge("gauge", new Dictionary(), "celsius");
+
+ // Act
+ var metrics = cache.GetMetricsAndClear(10);
+
+ // Assert
+ Assert.Empty(metrics);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_WithOperations_ReturnsMetricsAndClears()
+ {
+ // Arrange
+ var cache = new GaugeMetricCache();
+ var name = "test_gauge";
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "celsius";
+
+ var gauge = (IGauge)cache.CreateGauge(name, labels, unit);
+ gauge.Record(22.5);
+ gauge.Record(23.8);
+
+ // Act
+ var metrics = cache.GetMetricsAndClear(10);
+
+ // Assert
+ Assert.Single(metrics);
+ Assert.Equal(name, metrics[0].Definition.Name);
+ Assert.Equal(labels, metrics[0].Definition.Labels);
+ Assert.Equal(unit, metrics[0].Definition.Unit);
+ Assert.Equal(2, metrics[0].Operations.Count);
+ Assert.Equal(22.5, metrics[0].Operations[0].Value);
+ Assert.Equal(23.8, metrics[0].Operations[1].Value);
+
+ // Verify operations are cleared
+ var emptyMetrics = cache.GetMetricsAndClear(10);
+ Assert.Empty(emptyMetrics);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_MultipleGauges_ReturnsAllMetrics()
+ {
+ // Arrange
+ var cache = new GaugeMetricCache();
+
+ var gauge1 = (IGauge)cache.CreateGauge("gauge1", new Dictionary { { "service", "test1" } }, "celsius");
+ gauge1.Record(22.5);
+
+ var gauge2 = (IGauge)cache.CreateGauge("gauge2", new Dictionary { { "service", "test2" } }, "fahrenheit");
+ gauge2.Record(72.0);
+
+ // Act
+ var metrics = cache.GetMetricsAndClear(10);
+
+ // Assert
+ Assert.Equal(2, metrics.Count);
+
+ // Check first gauge metrics
+ var metric1 = metrics.Find(m => m.Definition.Name == "gauge1");
+ Assert.NotNull(metric1);
+ Assert.Single(metric1!.Operations);
+ Assert.Equal(22.5, metric1.Operations[0].Value);
+
+ // Check second gauge metrics
+ var metric2 = metrics.Find(m => m.Definition.Name == "gauge2");
+ Assert.NotNull(metric2);
+ Assert.Single(metric2!.Operations);
+ Assert.Equal(72.0, metric2.Operations[0].Value);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_RespectsMaxBatchSize()
+ {
+ // Arrange
+ var cache = new GaugeMetricCache();
+ var name = "test_gauge";
+ var labels = new Dictionary { { "service", "test" } };
+ var gauge = (IGauge)cache.CreateGauge(name, labels, "celsius");
+
+ // Add 5 operations
+ for (int i = 20; i <= 24; i++)
+ {
+ gauge.Record(i);
+ }
+
+ // Act - Get with max batch size of 3
+ var metrics = cache.GetMetricsAndClear(3);
+
+ // Assert
+ Assert.Single(metrics);
+ Assert.Equal(3, metrics[0].Operations.Count);
+
+ // Get remaining operations
+ var remainingMetrics = cache.GetMetricsAndClear(10);
+ Assert.Single(remainingMetrics);
+ Assert.Equal(2, remainingMetrics[0].Operations.Count);
+ }
+}
diff --git a/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/HistogramMetricCacheTests.cs b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/HistogramMetricCacheTests.cs
new file mode 100644
index 0000000000..de680ccdf3
--- /dev/null
+++ b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/HistogramMetricCacheTests.cs
@@ -0,0 +1,183 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Azure.Iot.Operations.Services.Observability;
+using Xunit;
+
+namespace Azure.Iot.Operations.Services.UnitTests.Observability;
+
+public class HistogramMetricCacheTests
+{
+ [Fact]
+ public void CreateHistogram_InitializesHistogram()
+ {
+ // Arrange
+ var cache = new HistogramMetricCache();
+ var name = "test_histogram";
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "milliseconds";
+
+ // Act
+ var histogram = cache.CreateHistogram(name, labels, unit);
+
+ // Assert
+ Assert.NotNull(histogram);
+ Assert.Equal(name, histogram.Name);
+ Assert.Equal(labels, histogram.Labels);
+ Assert.Equal(unit, histogram.Unit);
+ }
+
+ [Fact]
+ public void CreateHistogram_SameNameAndLabels_ReturnsSameInstance()
+ {
+ // Arrange
+ var cache = new HistogramMetricCache();
+ var name = "test_histogram";
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "milliseconds";
+
+ // Act
+ var histogram1 = cache.CreateHistogram(name, labels, unit);
+ var histogram2 = cache.CreateHistogram(name, labels, unit);
+
+ // Assert
+ Assert.Same(histogram1, histogram2);
+ }
+
+ [Fact]
+ public void CreateHistogram_DifferentNames_ReturnsDifferentInstances()
+ {
+ // Arrange
+ var cache = new HistogramMetricCache();
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "milliseconds";
+
+ // Act
+ var histogram1 = cache.CreateHistogram("histogram1", labels, unit);
+ var histogram2 = cache.CreateHistogram("histogram2", labels, unit);
+
+ // Assert
+ Assert.NotSame(histogram1, histogram2);
+ }
+
+ [Fact]
+ public void CreateHistogram_SameNameDifferentLabels_ReturnsDifferentInstances()
+ {
+ // Arrange
+ var cache = new HistogramMetricCache();
+ var name = "test_histogram";
+ var unit = "milliseconds";
+ var labels1 = new Dictionary { { "service", "test1" } };
+ var labels2 = new Dictionary { { "service", "test2" } };
+
+ // Act
+ var histogram1 = cache.CreateHistogram(name, labels1, unit);
+ var histogram2 = cache.CreateHistogram(name, labels2, unit);
+
+ // Assert
+ Assert.NotSame(histogram1, histogram2);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_NoOperations_ReturnsEmptyList()
+ {
+ // Arrange
+ var cache = new HistogramMetricCache();
+ cache.CreateHistogram("histogram", new Dictionary(), "milliseconds");
+
+ // Act
+ var metrics = cache.GetMetricsAndClear(10);
+
+ // Assert
+ Assert.Empty(metrics);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_WithOperations_ReturnsMetricsAndClears()
+ {
+ // Arrange
+ var cache = new HistogramMetricCache();
+ var name = "test_histogram";
+ var labels = new Dictionary { { "service", "test" } };
+ var unit = "milliseconds";
+
+ var histogram = (IHistogram)cache.CreateHistogram(name, labels, unit);
+ histogram.Record(15.2);
+ histogram.Record(25.7);
+
+ // Act
+ var metrics = cache.GetMetricsAndClear(10);
+
+ // Assert
+ Assert.Single(metrics);
+ Assert.Equal(name, metrics[0].Definition.Name);
+ Assert.Equal(labels, metrics[0].Definition.Labels);
+ Assert.Equal(unit, metrics[0].Definition.Unit);
+ Assert.Equal(2, metrics[0].Operations.Count);
+ Assert.Equal(15.2, metrics[0].Operations[0].Value);
+ Assert.Equal(25.7, metrics[0].Operations[1].Value);
+
+ // Verify operations are cleared
+ var emptyMetrics = cache.GetMetricsAndClear(10);
+ Assert.Empty(emptyMetrics);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_MultipleHistograms_ReturnsAllMetrics()
+ {
+ // Arrange
+ var cache = new HistogramMetricCache();
+
+ var histogram1 = (IHistogram)cache.CreateHistogram("histogram1", new Dictionary { { "service", "test1" } }, "milliseconds");
+ histogram1.Record(15.2);
+
+ var histogram2 = (IHistogram)cache.CreateHistogram("histogram2", new Dictionary { { "service", "test2" } }, "seconds");
+ histogram2.Record(1.5);
+
+ // Act
+ var metrics = cache.GetMetricsAndClear(10);
+
+ // Assert
+ Assert.Equal(2, metrics.Count);
+
+ // Check first histogram metrics
+ var metric1 = metrics.Find(m => m.Definition.Name == "histogram1");
+ Assert.NotNull(metric1);
+ Assert.Single(metric1!.Operations);
+ Assert.Equal(15.2, metric1.Operations[0].Value);
+
+ // Check second histogram metrics
+ var metric2 = metrics.Find(m => m.Definition.Name == "histogram2");
+ Assert.NotNull(metric2);
+ Assert.Single(metric2!.Operations);
+ Assert.Equal(1.5, metric2.Operations[0].Value);
+ }
+
+ [Fact]
+ public void GetMetricsAndClear_RespectsMaxBatchSize()
+ {
+ // Arrange
+ var cache = new HistogramMetricCache();
+ var name = "test_histogram";
+ var labels = new Dictionary { { "service", "test" } };
+ var histogram = (IHistogram)cache.CreateHistogram(name, labels, "milliseconds");
+
+ // Add 5 operations
+ for (int i = 10; i <= 14; i++)
+ {
+ histogram.Record(i);
+ }
+
+ // Act - Get with max batch size of 3
+ var metrics = cache.GetMetricsAndClear(3);
+
+ // Assert
+ Assert.Single(metrics);
+ Assert.Equal(3, metrics[0].Operations.Count);
+
+ // Get remaining operations
+ var remainingMetrics = cache.GetMetricsAndClear(10);
+ Assert.Single(remainingMetrics);
+ Assert.Equal(2, remainingMetrics[0].Operations.Count);
+ }
+}
diff --git a/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/MetricsReporterServiceTests.cs b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/MetricsReporterServiceTests.cs
new file mode 100644
index 0000000000..76a4bc35ba
--- /dev/null
+++ b/dotnet/test/Azure.Iot.Operations.Services.UnitTests/Observability/MetricsReporterServiceTests.cs
@@ -0,0 +1,265 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Azure.Iot.Operations.Protocol.RPC;
+using Azure.Iot.Operations.Services.Observability;
+using Azure.Iot.Operations.Services.Observability.AkriObservabilityService;
+using Azure.Iot.Operations.Services.Observability.Utils;
+using Moq;
+using Xunit;
+using ITimer = Azure.Iot.Operations.Services.Observability.Utils.ITimer;
+
+namespace Azure.Iot.Operations.Services.UnitTests.Observability;
+
+public class MetricsReporterServiceTests
+{
+ private readonly Mock _mockObservabilityService;
+ private readonly Mock _mockTimerFactory;
+ private readonly Mock _mockTimer;
+ private readonly MetricsReporterOptions _options;
+ private readonly Dictionary _defaultLabels;
+
+ public MetricsReporterServiceTests()
+ {
+ _mockObservabilityService = new Mock();
+ _mockTimerFactory = new Mock();
+ _mockTimer = new Mock();
+ _options = new MetricsReporterOptions
+ {
+ ReportingInterval = TimeSpan.FromSeconds(1),
+ MaxBatchSize = 100
+ };
+ _defaultLabels = new Dictionary { { "service", "test" } };
+
+ // Setup mock timer
+ _mockTimerFactory.Setup(f => f.CreateTimer()).Returns(_mockTimer.Object);
+ _mockTimer.Setup(t => t.Start(It.IsAny>(), It.IsAny(),
+ It.IsAny(), It.IsAny()))
+ .Callback, CancellationToken, TimeSpan, TimeSpan>((callback, token, dueTime, period) =>
+ {
+ // Store the callback for later invocation
+ _timerCallback = callback;
+ });
+ }
+
+ private Func? _timerCallback;
+
+ [Fact]
+ public void CreateCounter_ValidParameters_ReturnsCounter()
+ {
+ // Arrange
+ var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
+
+ // Act
+ var counter = service.CreateCounter("test_counter", _defaultLabels, "bytes");
+
+ // Assert
+ Assert.NotNull(counter);
+ }
+
+ [Fact]
+ public void CreateGauge_ValidParameters_ReturnsGauge()
+ {
+ // Arrange
+ var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
+
+ // Act
+ var gauge = service.CreateGauge("test_gauge", _defaultLabels, "seconds");
+
+ // Assert
+ Assert.NotNull(gauge);
+ }
+
+ [Fact]
+ public void CreateHistogram_ValidParameters_ReturnsHistogram()
+ {
+ // Arrange
+ var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
+
+ // Act
+ var histogram = service.CreateHistogram("test_histogram", _defaultLabels, "bytes");
+
+ // Assert
+ Assert.NotNull(histogram);
+ }
+
+ [Fact]
+ public void Start_CreatesAndStartsTimer()
+ {
+ // Arrange
+ var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
+
+ // Act
+ service.Start();
+
+ // Assert
+ _mockTimerFactory.Verify(f => f.CreateTimer(), Times.Once);
+ _mockTimer.Verify(t => t.Start(It.IsAny>(), It.IsAny(),
+ _options.ReportingInterval, _options.ReportingInterval), Times.Once);
+ }
+
+ [Fact]
+ public void Start_CalledMultipleTimes_OnlyOneTimerCreated()
+ {
+ // Arrange
+ var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
+
+ // Act
+ service.Start();
+ service.Start();
+
+ // Assert
+ _mockTimerFactory.Verify(f => f.CreateTimer(), Times.Once);
+ _mockTimer.Verify(t => t.Start(
+ It.IsAny>(),
+ It.IsAny(),
+ _options.ReportingInterval,
+ _options.ReportingInterval),
+ Times.Once);
+ }
+
+ [Fact]
+ public async Task StopAsync_CalledBeforeStart_DoesNotThrow()
+ {
+ // Arrange
+ var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
+
+ // Act & Assert
+ await service.StopAsync();
+ }
+
+ [Fact]
+ public async Task Stop_DisposesTimer()
+ {
+ // Arrange
+ var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
+ service.Start();
+
+ // Act
+ await service.StopAsync();
+
+ // Assert
+ _mockTimer.Verify(t => t.DisposeAsync(), Times.Once);
+ }
+
+ [Fact]
+ public async Task ReportMetrics_SendsMetricsToObservabilityService()
+ {
+ // Arrange
+ _mockObservabilityService
+ .Setup(s => s.PublishMetricsAsync(It.IsAny()))
+ .Returns(new RpcCallAsync());
+
+ var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
+
+ // Create and update some metrics
+ var counter = service.CreateCounter("test_counter", _defaultLabels);
+ counter.Add(5);
+
+ var gauge = service.CreateGauge("test_gauge", _defaultLabels);
+ gauge.Record(10);
+
+ var histogram = service.CreateHistogram("test_histogram", _defaultLabels);
+ histogram.Record(15);
+
+ service.Start();
+
+ // Act - Trigger the timer callback manually
+ if (_timerCallback != null)
+ {
+ await _timerCallback(CancellationToken.None);
+ }
+
+ // Assert
+ _mockObservabilityService.Verify(s => s.PublishMetricsAsync(
+ It.Is(p =>
+ p.Metrics.CounterMetrics!.Count > 0 &&
+ p.Metrics.GaugeMetrics!.Count > 0 &&
+ p.Metrics.HistogramMetrics!.Count > 0)),
+ Times.Once);
+
+ // verify all fields of the PublishMetricsRequestPayload and nested classes
+ _mockObservabilityService.Verify(s => s.PublishMetricsAsync(
+ It.Is(p =>
+ p.Metrics.CounterMetrics![0].Definition.Name == "test_counter" &&
+ p.Metrics.CounterMetrics![0].Definition.Labels["service"] == "test" &&
+ Math.Abs(p.Metrics.CounterMetrics![0].Operations[0].Value - 5) < 0.001 &&
+ p.Metrics.GaugeMetrics![0].Definition.Name == "test_gauge" &&
+ p.Metrics.GaugeMetrics![0].Definition.Labels["service"] == "test" &&
+ Math.Abs(p.Metrics.GaugeMetrics![0].Operations[0].Value - 10) < 0.001 &&
+ p.Metrics.HistogramMetrics![0].Definition.Name == "test_histogram" &&
+ p.Metrics.HistogramMetrics![0].Definition.Labels["service"] == "test" &&
+ Math.Abs(p.Metrics.HistogramMetrics![0].Operations[0].Value - 15) < 0.001)),
+ Times.Once);
+ }
+
+ [Fact]
+ public async Task ReportMetrics_ObservabilityServiceThrows_DoesNotPropagateException()
+ {
+ // Arrange
+ _mockObservabilityService
+ .Setup(s => s.PublishMetricsAsync(It.IsAny()))
+ .Throws(new InvalidOperationException("Test exception"));
+
+ var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
+
+ // Create and update a metric
+ var counter = service.CreateCounter("test_counter", _defaultLabels);
+ counter.Add(5);
+
+ service.Start();
+
+ // Act & Assert - Should not throw
+ if (_timerCallback != null)
+ {
+ await _timerCallback(CancellationToken.None);
+ }
+
+ // Assert - Verify that throwing PublishMetricsAsync was called
+ _mockObservabilityService.Verify(s => s.PublishMetricsAsync(It.IsAny()), Times.Once);
+ }
+
+ [Fact]
+ public void ValidateMetricParameters_WithInvalidData_ThrowsExpectedException()
+ {
+ // Arrange
+ var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
+
+ // Act & Assert - Test with null name
+ Assert.Throws(() => service.CreateCounter(string.Empty, _defaultLabels));
+
+ // Act & Assert - Test with null labels
+ Assert.Throws(() => service.CreateCounter("test_counter", null!));
+ }
+
+ [Fact]
+ public void CreateMetric_WithIdenticalKeyButDifferentUnit_UsesFirstUnit()
+ {
+ // Arrange
+ var mockCache = new CounterMetricCache();
+ var name = "test_counter";
+ var labels = new Dictionary { { "service", "test" } };
+
+ // Act
+ var counter1 = mockCache.CreateCounter(name, labels, "bytes");
+ var counter2 = mockCache.CreateCounter(name, labels, "kilobytes"); // Same name and labels, different unit
+
+ // Assert
+ Assert.Same(counter1, counter2); // Should return the same instance
+ Assert.Equal("bytes", counter1.Unit); // Should use the first unit
+ }
+
+ [Fact]
+ public async Task Dispose_CleanupResources()
+ {
+ // Arrange
+ var service = new MetricsReporterService(_mockObservabilityService.Object, _options, _mockTimerFactory.Object);
+ service.Start();
+
+ // Act
+ await service.DisposeAsync();
+
+ // Assert
+ _mockTimer.Verify(t => t.DisposeAsync(), Times.Once);
+ }
+}
diff --git a/eng/dtdl/akri-observability-metrics-operations.json b/eng/dtdl/akri-observability-metrics-operations.json
new file mode 100644
index 0000000000..38bbad2609
--- /dev/null
+++ b/eng/dtdl/akri-observability-metrics-operations.json
@@ -0,0 +1,299 @@
+[
+ {
+ "@context": [
+ "dtmi:dtdl:context;4",
+ "dtmi:dtdl:extension:mqtt;2",
+ "dtmi:dtdl:extension:requirement;1"
+ ],
+ "@id": "dtmi:com:microsoft:akri:AkriObservabilityService;1",
+ "description": "Defines the MQTT-based contract for publishing telemetry metrics from Akri component in AIO.",
+ "@type": ["Interface", "Mqtt"],
+ "payloadFormat": "Json/ecma/404",
+ "commandTopic": "akri/observability/{ex:connectorClientId}/metrics",
+ "schemas": [
+ {
+ "@type": "Object",
+ "@id": "dtmi:com:microsoft:akri:MetricDefinition;1",
+ "description": "Shared identifying and descriptive properties of a metric.",
+ "fields": [
+ {
+ "@type": ["Field", "Required"],
+ "name": "name",
+ "description": "A unique, human-readable name that identifies the metric.",
+ "schema": "string"
+ },
+ {
+ "@type": ["Field", "Required"],
+ "name": "labels",
+ "description": "Key-value pairs that provide dimensional context for the metric.",
+ "schema": {
+ "@type": "Map",
+ "mapKey": {
+ "name": "labelKey",
+ "schema": "string"
+ },
+ "mapValue": {
+ "name": "labelValue",
+ "schema": "string"
+ }
+ }
+ },
+ {
+ "@type": "Field",
+ "name": "unit",
+ "description": "The unit of measurement associated with the metric (e.g., seconds, bytes).",
+ "schema": "string"
+ }
+ ]
+ },
+ {
+ "@type": "Object",
+ "@id": "dtmi:com:microsoft:akri:CounterMetric;1",
+ "description": "Defines a Counter metric and its associated Increment operations.",
+ "fields": [
+ {
+ "@type": ["Field", "Required"],
+ "name": "definition",
+ "description": "Common identifying properties of the Counter metric.",
+ "schema": "dtmi:com:microsoft:akri:MetricDefinition;1"
+ },
+ {
+ "@type": ["Field", "Required"],
+ "name": "operations",
+ "description": "List of Increment operations for this Counter metric.",
+ "schema": {
+ "@type": "Array",
+ "elementSchema": "dtmi:com:microsoft:akri:IncrementOperation;1"
+ }
+ }
+ ]
+ },
+ {
+ "@type": "Object",
+ "@id": "dtmi:com:microsoft:akri:GaugeMetric;1",
+ "description": "Defines a Gauge metric and its associated Record operations.",
+ "fields": [
+ {
+ "@type": ["Field", "Required"],
+ "name": "definition",
+ "description": "Common identifying properties of the Gauge metric.",
+ "schema": "dtmi:com:microsoft:akri:MetricDefinition;1"
+ },
+ {
+ "@type": ["Field", "Required"],
+ "name": "operations",
+ "description": "List of Record operations for this Gauge metric.",
+ "schema": {
+ "@type": "Array",
+ "elementSchema": "dtmi:com:microsoft:akri:RecordOperation;1"
+ }
+ }
+ ]
+ },
+ {
+ "@type": "Object",
+ "@id": "dtmi:com:microsoft:akri:HistogramMetric;1",
+ "description": "Defines a Histogram metric and its associated Record operations.",
+ "fields": [
+ {
+ "@type": ["Field", "Required"],
+ "name": "definition",
+ "description": "Common identifying properties of the Histogram metric.",
+ "schema": "dtmi:com:microsoft:akri:MetricDefinition;1"
+ },
+ {
+ "@type": ["Field", "Required"],
+ "name": "operations",
+ "description": "List of Record operations for this Histogram metric.",
+ "schema": {
+ "@type": "Array",
+ "elementSchema": "dtmi:com:microsoft:akri:RecordOperation;1"
+ }
+ }
+ ]
+ },
+ {
+ "@type": "Object",
+ "@id": "dtmi:com:microsoft:akri:IncrementOperation;1",
+ "description": "Represents a Increment operation used by Counter metric.",
+ "fields": [
+ {
+ "@type": ["Field", "Required"],
+ "name": "operationId",
+ "description": "Unique identifier for the operation, used to correlate responses.",
+ "schema": "string"
+ },
+ {
+ "@type": ["Field", "Required"],
+ "name": "value",
+ "description": "The numeric value by which the counter will be incremented.",
+ "schema": "double"
+ },
+ {
+ "@type": ["Field", "Required"],
+ "name": "timestamp",
+ "description": "Timestamp indicating when the metric value was incremented.",
+ "schema": "dateTime"
+ }
+ ]
+ },
+ {
+ "@type": "Object",
+ "@id": "dtmi:com:microsoft:akri:RecordOperation;1",
+ "description": "Represents a Record operation used by Gauge and Histogram metrics.",
+ "fields": [
+ {
+ "@type": ["Field", "Required"],
+ "name": "operationId",
+ "description": "Unique identifier for the operation, used to correlate responses.",
+ "schema": "string"
+ },
+ {
+ "@type": ["Field", "Required"],
+ "name": "value",
+ "description": "The numeric value to be recorded by the Gauge or Histogram metric.",
+ "schema": "double"
+ },
+ {
+ "@type": ["Field", "Required"],
+ "name": "timestamp",
+ "description": "Timestamp indicating when the metric value was recorded.",
+ "schema": "dateTime"
+ }
+ ]
+ },
+ {
+ "@type": "Object",
+ "@id": "dtmi:com:microsoft:akri:AkriMetricOperationResponse;2",
+ "description": "Represents the result of a single metric operation, including structured error information if applicable.",
+ "fields": [
+ {
+ "@type": ["Field", "Required"],
+ "name": "operationId",
+ "description": "ID of the operation this response refers to.",
+ "schema": "string"
+ },
+ {
+ "@type": ["Field", "Required"],
+ "name": "status",
+ "description": "Status of the operation: either Success or Error.",
+ "schema": "dtmi:com:microsoft:akri:AkriMetricOperationResponseStatus;1"
+ },
+ {
+ "@type": "Field",
+ "name": "errorKind",
+ "description": "Structured classification of the error type, included only if status is Error.",
+ "schema": "dtmi:com:microsoft:akri:AkriMetricErrorKind;1"
+ },
+ {
+ "@type": "Field",
+ "name": "propertyName",
+ "description": "Optional name of the field or conceptual field associated with the error.",
+ "schema": "string"
+ },
+ {
+ "@type": "Field",
+ "name": "errorMessage",
+ "description": "Optional human-readable description of the error.",
+ "schema": "string"
+ }
+ ]
+ },
+ {
+ "@type": "Enum",
+ "@id": "dtmi:com:microsoft:akri:AkriMetricOperationResponseStatus;1",
+ "valueSchema": "string",
+ "description": "Defines possible statuses of a metric operation.",
+ "enumValues": [
+ {
+ "name": "Success",
+ "enumValue": "Success"
+ },
+ {
+ "name": "Error",
+ "enumValue": "Error"
+ }
+ ]
+ },
+ {
+ "@type": "Enum",
+ "@id": "dtmi:com:microsoft:akri:AkriMetricErrorKind;1",
+ "valueSchema": "string",
+ "description": "Enumerates structured error types for failed metric operations.",
+ "enumValues": [
+ {
+ "name": "DuplicateOperationId",
+ "enumValue": "DuplicateOperationId",
+ "description": "Duplicate operation ID detected in the same batch of metrics."
+ },
+ {
+ "name": "ConflictingMetricDefinition",
+ "enumValue": "ConflictingMetricDefinition",
+ "description": "This error occurs when multiple metric definitions use the same combination of name and labels but differ in their structural characteristics. Common causes include publishing a Counter metric and a Gauge metric with the same name and label set, or attempting to redefine a metric with a different unit or incompatible schema."
+ },
+ {
+ "name": "TooManyUniqueLabelSets",
+ "enumValue": "TooManyUniqueLabelSets",
+ "description": "This error occurs when the number of unique combinations of label keys and values for a given metric exceeds the system’s configured or supported limit."
+ },
+ {
+ "name": "InternalError",
+ "enumValue": "InternalError",
+ "description": "This error occurs when an unexpected condition was encountered during the processing of a metric operation."
+ }
+ ]
+ }
+ ],
+ "contents": [
+ {
+ "@type": "Command",
+ "name": "publishMetrics",
+ "description": "Publishes a list of metrics, each of which is a Counter, Gauge, or Histogram.",
+ "request": {
+ "name": "metrics",
+ "description": "List of typed metrics to publish.",
+ "schema": {
+ "@type": "Object",
+ "fields": [
+ {
+ "@type": ["Field"],
+ "name": "counterMetrics",
+ "description": "List of counter metrics to be published.",
+ "schema": {
+ "@type": "Array",
+ "elementSchema": "dtmi:com:microsoft:akri:CounterMetric;1"
+ }
+ },
+ {
+ "@type": ["Field"],
+ "name": "gaugeMetrics",
+ "description": "List of gauge metrics to be published",
+ "schema": {
+ "@type": "Array",
+ "elementSchema": "dtmi:com:microsoft:akri:GaugeMetric;1"
+ }
+ },
+ {
+ "@type": ["Field"],
+ "name": "histogramMetrics",
+ "description": "List of histogram metrics to be published",
+ "schema": {
+ "@type": "Array",
+ "elementSchema": "dtmi:com:microsoft:akri:HistogramMetric;1"
+ }
+ }
+ ]
+ }
+ },
+ "response": {
+ "name": "publishMetricsResponse",
+ "description": "Response indicating the success or failure of each operation.",
+ "schema": {
+ "@type": "Array",
+ "elementSchema": "dtmi:com:microsoft:akri:AkriMetricOperationResponse;2"
+ }
+ }
+ }
+ ]
+ }
+]