Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions global.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": {
"version": "8.0.100",
"version": "9.0.305",
"rollForward": "latestFeature"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" Condition="'$(TargetFramework)' != 'net462'" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="10.0.0" Condition="'$(TargetFramework)' == 'net462'" />
<PackageReference Include="AWSSDK.Core" Version="3.7.400" />
<PackageReference Include="AWSSDK.CloudWatchLogs" Version="3.7.400" />
<PackageReference Include="AWSSDK.XRay" Version="3.7.300" />
<PackageReference Include="Amazon.Lambda.Serialization.Json" Version="2.2.4" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
Expand All @@ -36,6 +37,8 @@
<PackageReference Include="OpenTelemetry.Sampler.AWS" Version="0.1.0-alpha.3" />
<PackageReference Include="OpenTelemetry.SemanticConventions" Version="1.0.0-rc9.9" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.556">
<PackageReference Include="System.Text.Json" Version="8.0.5" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.118">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Threading;
using System.Threading.Tasks;
using Amazon.CloudWatchLogs;
using OpenTelemetry.Metrics;

namespace AWS.Distro.OpenTelemetry.AutoInstrumentation.Exporters.Aws.Metrics
{
/// <summary>
/// OpenTelemetry metrics exporter for CloudWatch EMF format.
///
/// This exporter converts OTel metrics into CloudWatch EMF logs which are then
/// sent to CloudWatch Logs. CloudWatch Logs automatically extracts the metrics
/// from the EMF logs.
/// </summary>
public class AwsCloudWatchEmfExporter : EmfExporterBase
{
    private readonly CloudWatchLogsClient _logClient;

    /// <summary>
    /// Creates an exporter that hands every EMF log event to a <see cref="CloudWatchLogsClient"/>.
    /// </summary>
    /// <param name="namespaceName">Value forwarded unchanged to the <see cref="EmfExporterBase"/> constructor.</param>
    /// <param name="logGroupName">Log group name passed to the underlying <see cref="CloudWatchLogsClient"/>.</param>
    /// <param name="logStreamName">
    /// Log stream name passed to the underlying <see cref="CloudWatchLogsClient"/>;
    /// when <c>null</c>, the client generates a stream name itself.
    /// </param>
    /// <param name="cloudWatchLogsConfig">
    /// Optional AWS SDK configuration passed to the underlying <see cref="CloudWatchLogsClient"/>;
    /// when <c>null</c>, the client falls back to a default configuration.
    /// </param>
    public AwsCloudWatchEmfExporter(
        string namespaceName = "default",
        string logGroupName = "aws/otel/metrics",
        string? logStreamName = null,
        AmazonCloudWatchLogsConfig? cloudWatchLogsConfig = null)
        : base(namespaceName)
    {
        _logClient = new CloudWatchLogsClient(logGroupName, logStreamName, cloudWatchLogsConfig);
    }

    /// <summary>
    /// Send a log event to CloudWatch Logs using the log client.
    /// </summary>
    /// <param name="logEvent">The EMF log event to queue on the log client.</param>
    /// <returns>A task that completes once the log client has accepted the event.</returns>
    protected override async Task SendLogEventAsync(LogEvent logEvent)
    {
        // Library code: do not resume on the caller's synchronization context.
        await _logClient.SendLogEventAsync(logEvent).ConfigureAwait(false);
    }

    /// <summary>
    /// Force flush any pending metrics.
    /// </summary>
    /// <param name="cancellationToken">
    /// Currently not observed: the underlying log client flush does not accept a token.
    /// </param>
    /// <returns>A task that completes once the log client has flushed its pending events.</returns>
    public override async Task ForceFlushAsync(CancellationToken cancellationToken)
    {
        await _logClient.FlushPendingEventsAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Shutdown the exporter. This only flushes pending events via <see cref="ForceFlushAsync"/>.
    /// </summary>
    /// <param name="cancellationToken">Forwarded to <see cref="ForceFlushAsync"/>.</param>
    /// <returns>A task that completes once the final flush has finished.</returns>
    public override async Task ShutdownAsync(CancellationToken cancellationToken)
    {
        await ForceFlushAsync(cancellationToken).ConfigureAwait(false);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Amazon.CloudWatchLogs;
using Amazon.CloudWatchLogs.Model;

namespace AWS.Distro.OpenTelemetry.AutoInstrumentation.Exporters.Aws.Metrics
{
/// <summary>
/// Holds a group of CloudWatch log events together with the running totals
/// (byte count and timestamp range) tracked while events are added.
/// </summary>
public class LogEventBatch
{
    /// <summary>Events collected so far, in insertion order.</summary>
    public List<InputLogEvent> LogEvents { get; } = new List<InputLogEvent>();

    /// <summary>Sum of the sizes supplied to <see cref="AddEvent"/>.</summary>
    public int ByteTotal { get; set; }

    /// <summary>Smallest event timestamp seen (Unix ms); 0 while no event has been recorded.</summary>
    public long MinTimestampMs { get; set; }

    /// <summary>Largest event timestamp seen (Unix ms); 0 while no event has been recorded.</summary>
    public long MaxTimestampMs { get; set; }

    /// <summary>UTC creation time of this batch instance, in Unix milliseconds.</summary>
    public long CreatedTimestampMs { get; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

    /// <summary>
    /// Appends an event and updates the byte total and timestamp range.
    /// </summary>
    /// <param name="logEvent">Event to append.</param>
    /// <param name="eventSize">Size to add to <see cref="ByteTotal"/> for this event.</param>
    public void AddEvent(InputLogEvent logEvent, int eventSize)
    {
        LogEvents.Add(logEvent);
        ByteTotal += eventSize;

        long eventTimeMs = ((DateTimeOffset)logEvent.Timestamp).ToUnixTimeMilliseconds();

        // A minimum of 0 means "not set yet", so the first event always seeds it.
        MinTimestampMs = MinTimestampMs == 0
            ? eventTimeMs
            : Math.Min(MinTimestampMs, eventTimeMs);
        MaxTimestampMs = Math.Max(MaxTimestampMs, eventTimeMs);
    }

    /// <summary>
    /// Reports whether the batch currently holds no events.
    /// </summary>
    /// <returns><c>true</c> when no events are stored; otherwise <c>false</c>.</returns>
    public bool IsEmpty()
    {
        return LogEvents.Count == 0;
    }

    /// <summary>
    /// Reports how many events the batch currently holds.
    /// </summary>
    /// <returns>The number of stored events.</returns>
    public int Size()
    {
        return LogEvents.Count;
    }

    /// <summary>
    /// Removes all events and resets the byte total and timestamp range to zero.
    /// </summary>
    public void Clear()
    {
        MaxTimestampMs = 0;
        MinTimestampMs = 0;
        ByteTotal = 0;
        LogEvents.Clear();
    }
}

/// <summary>
/// CloudWatch Logs client for batching and sending log events.
///
/// Events are accumulated in a single in-memory batch. The batch is sent when adding
/// the next event would exceed a request limit, when the batch would span more than
/// 24 hours, when it is older than <see cref="BatchFlushInterval"/>, or when
/// <see cref="FlushPendingEventsAsync"/> is called. Instances are not synchronized;
/// callers are expected to serialize access.
/// </summary>
public class CloudWatchLogsClient
{
    // CloudWatch Logs limits
    public const int CwMaxEventPayloadBytes = 256 * 1024; // 256KB
    public const int CwMaxRequestEventCount = 10000;
    public const int CwPerEventHeaderBytes = 26;
    public const int BatchFlushInterval = 60 * 1000; // 60 seconds
    public const int CwMaxRequestPayloadBytes = 1 * 1024 * 1024; // 1MB
    public const string CwTruncatedSuffix = "[Truncated...]";
    public const long CwEventTimestampLimitPast = 14 * 24 * 60 * 60 * 1000; // 14 days
    public const long CwEventTimestampLimitFuture = 2 * 60 * 60 * 1000; // 2 hours

    // Maximum time span (in milliseconds) between the oldest and newest event of one batch.
    private const long BatchMaxSpanMs = 24 * 3600 * 1000; // 24 hours

    private readonly string _logGroupName;
    private readonly string _logStreamName;
    private readonly IAmazonCloudWatchLogs _logsClient;
    private LogEventBatch? _eventBatch;

    /// <summary>
    /// Creates a client that writes to the given log group and stream.
    /// </summary>
    /// <param name="logGroupName">Name of the target log group.</param>
    /// <param name="logStreamName">Name of the target log stream; a unique name is generated when <c>null</c>.</param>
    /// <param name="config">AWS SDK client configuration; a default configuration is used when <c>null</c>.</param>
    public CloudWatchLogsClient(string logGroupName, string? logStreamName = null, AmazonCloudWatchLogsConfig? config = null)
    {
        _logGroupName = logGroupName;
        _logStreamName = logStreamName ?? GenerateLogStreamName();
        _logsClient = new AmazonCloudWatchLogsClient(config ?? new AmazonCloudWatchLogsConfig());
    }

    /// <summary>
    /// Generate a unique log stream name of the form <c>otel-dotnet-xxxxxxxx</c>,
    /// where the suffix is the first 8 hex digits of a new GUID.
    /// </summary>
    private static string GenerateLogStreamName()
    {
        var uniqueId = Guid.NewGuid().ToString("N").Substring(0, 8);
        return $"otel-dotnet-{uniqueId}";
    }

    /// <summary>
    /// Compute the size CloudWatch Logs accounts for an event: the UTF-8 encoded
    /// message length plus the fixed per-event overhead.
    /// </summary>
    /// <param name="message">The event message.</param>
    /// <returns>The accounted event size in bytes.</returns>
    private static int GetEventSizeBytes(string message)
    {
        return System.Text.Encoding.UTF8.GetByteCount(message) + CwPerEventHeaderBytes;
    }

    /// <summary>
    /// Return the longest prefix of <paramref name="message"/> whose UTF-8 encoding
    /// fits in <paramref name="maxBytes"/>, never splitting a surrogate pair.
    /// </summary>
    /// <param name="message">The message to shorten.</param>
    /// <param name="maxBytes">Maximum number of UTF-8 bytes the prefix may occupy.</param>
    /// <returns>The prefix, or the whole message when it already fits.</returns>
    private static string TruncateToUtf8ByteLimit(string message, int maxBytes)
    {
        var usedBytes = 0;
        var index = 0;

        while (index < message.Length)
        {
            var current = message[index];
            var charCount = 1;
            int encodedBytes;

            if (current < 0x80)
            {
                encodedBytes = 1;
            }
            else if (current < 0x800)
            {
                encodedBytes = 2;
            }
            else if (char.IsHighSurrogate(current) &&
                     index + 1 < message.Length &&
                     char.IsLowSurrogate(message[index + 1]))
            {
                // A valid surrogate pair encodes to one 4-byte sequence; keep both halves together.
                encodedBytes = 4;
                charCount = 2;
            }
            else
            {
                // Remaining BMP characters take 3 bytes; a lone surrogate is encoded
                // as the 3-byte replacement character U+FFFD.
                encodedBytes = 3;
            }

            if (usedBytes + encodedBytes > maxBytes)
            {
                break;
            }

            usedBytes += encodedBytes;
            index += charCount;
        }

        return message.Substring(0, index);
    }

    /// <summary>
    /// Ensure the log group exists, create if it doesn't.
    /// </summary>
    private async Task EnsureLogGroupExistsAsync()
    {
        try
        {
            await _logsClient.CreateLogGroupAsync(new CreateLogGroupRequest
            {
                LogGroupName = _logGroupName
            }).ConfigureAwait(false);
        }
        catch (ResourceAlreadyExistsException)
        {
            // Log group already exists, which is fine
        }
    }

    /// <summary>
    /// Ensure the log stream exists, create if it doesn't.
    /// </summary>
    private async Task EnsureLogStreamExistsAsync()
    {
        try
        {
            await _logsClient.CreateLogStreamAsync(new CreateLogStreamRequest
            {
                LogGroupName = _logGroupName,
                LogStreamName = _logStreamName
            }).ConfigureAwait(false);
        }
        catch (ResourceAlreadyExistsException)
        {
            // Log stream already exists, which is fine
        }
    }

    /// <summary>
    /// Validate the log event according to CloudWatch Logs constraints.
    /// An oversized message is truncated in place and suffixed with
    /// <see cref="CwTruncatedSuffix"/> rather than rejected.
    /// </summary>
    /// <param name="logEvent">The event to validate; its message may be modified.</param>
    /// <returns>
    /// <c>false</c> when the message is null/blank or the timestamp is more than
    /// 14 days in the past or more than 2 hours in the future; otherwise <c>true</c>.
    /// </returns>
    private bool ValidateLogEvent(InputLogEvent logEvent)
    {
        if (string.IsNullOrWhiteSpace(logEvent.Message))
        {
            return false;
        }

        // Check message size. The limit is in UTF-8 bytes, not UTF-16 code units.
        if (GetEventSizeBytes(logEvent.Message) > CwMaxEventPayloadBytes)
        {
            var maxMessageBytes = CwMaxEventPayloadBytes
                - CwPerEventHeaderBytes
                - System.Text.Encoding.UTF8.GetByteCount(CwTruncatedSuffix);
            logEvent.Message = TruncateToUtf8ByteLimit(logEvent.Message, maxMessageBytes) + CwTruncatedSuffix;
        }

        // Check timestamp constraints
        var currentTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var logEventTimestampMs = ((DateTimeOffset)logEvent.Timestamp).ToUnixTimeMilliseconds();
        var timeDiff = currentTime - logEventTimestampMs;

        if (timeDiff > CwEventTimestampLimitPast || timeDiff < -CwEventTimestampLimitFuture)
        {
            return false;
        }

        return true;
    }

    /// <summary>
    /// Create a new log event batch.
    /// </summary>
    private static LogEventBatch CreateEventBatch() => new();

    /// <summary>
    /// Check if adding the next event would exceed CloudWatch Logs limits.
    /// </summary>
    /// <param name="batch">The batch the event would be added to.</param>
    /// <param name="nextEventSize">Accounted size in bytes of the event about to be added.</param>
    /// <returns>
    /// <c>true</c> when the batch already holds the maximum number of events or the
    /// added bytes would exceed the maximum request payload.
    /// </returns>
    private static bool EventBatchExceedsLimit(LogEventBatch batch, int nextEventSize)
    {
        return batch.Size() >= CwMaxRequestEventCount ||
               batch.ByteTotal + nextEventSize > CwMaxRequestPayloadBytes;
    }

    /// <summary>
    /// Check if the event batch is active and can accept the event.
    /// </summary>
    /// <param name="batch">The current batch.</param>
    /// <param name="targetTimestampMs">Timestamp (Unix ms) of the event about to be added.</param>
    /// <returns>
    /// <c>true</c> for a batch with no recorded timestamps; <c>false</c> when the event
    /// would stretch the batch beyond 24 hours or the batch is older than
    /// <see cref="BatchFlushInterval"/>; otherwise <c>true</c>.
    /// </returns>
    private static bool IsBatchActive(LogEventBatch batch, long targetTimestampMs)
    {
        // New log event batch
        if (batch.MinTimestampMs == 0 || batch.MaxTimestampMs == 0)
        {
            return true;
        }

        // Check if adding the event would make the batch span more than 24 hours
        if (targetTimestampMs - batch.MinTimestampMs > BatchMaxSpanMs ||
            batch.MaxTimestampMs - targetTimestampMs > BatchMaxSpanMs)
        {
            return false;
        }

        // Flush the event batch when reached 60s interval
        var currentTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        return currentTime - batch.CreatedTimestampMs < BatchFlushInterval;
    }

    /// <summary>
    /// Sort log events in the batch by timestamp.
    /// </summary>
    private static void SortLogEvents(LogEventBatch batch)
    {
        batch.LogEvents.Sort((a, b) => a.Timestamp.CompareTo(b.Timestamp));
    }

    /// <summary>
    /// Send a batch of log events to CloudWatch Logs. When the log group or stream
    /// does not exist, both are created and the request is retried once.
    /// </summary>
    /// <param name="batch">The batch to send; its events are sorted by timestamp first.</param>
    /// <returns>The service response, or <c>null</c> when the batch is empty.</returns>
    private async Task<PutLogEventsResponse?> SendLogBatchAsync(LogEventBatch batch)
    {
        if (batch.IsEmpty())
        {
            return null;
        }

        SortLogEvents(batch);

        var request = new PutLogEventsRequest
        {
            LogGroupName = _logGroupName,
            LogStreamName = _logStreamName,
            LogEvents = batch.LogEvents
        };

        try
        {
            return await _logsClient.PutLogEventsAsync(request).ConfigureAwait(false);
        }
        catch (ResourceNotFoundException)
        {
            // Create log group and stream, then retry
            await EnsureLogGroupExistsAsync().ConfigureAwait(false);
            await EnsureLogStreamExistsAsync().ConfigureAwait(false);
            return await _logsClient.PutLogEventsAsync(request).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Send a log event to CloudWatch Logs. The event is validated and added to the
    /// current batch; the previous batch is sent first when it cannot accept the event.
    /// Events that fail validation are dropped silently.
    /// </summary>
    /// <param name="logEvent">The event to queue; its timestamp is read as Unix milliseconds.</param>
    /// <returns>A task that completes once the event is queued (and any full batch has been sent).</returns>
    public async Task SendLogEventAsync(LogEvent logEvent)
    {
        var inputLogEvent = new InputLogEvent
        {
            Message = logEvent.Message,
            Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(logEvent.Timestamp).UtcDateTime
        };

        if (!ValidateLogEvent(inputLogEvent))
        {
            return;
        }

        // Measured after validation so a truncated message is counted at its final size.
        var eventSize = GetEventSizeBytes(inputLogEvent.Message);

        // Initialize event batch if needed
        _eventBatch ??= CreateEventBatch();

        // Check if we need to send the current batch and create a new one
        var currentBatch = _eventBatch;
        var inputLogEventTimestampMs = ((DateTimeOffset)inputLogEvent.Timestamp).ToUnixTimeMilliseconds();
        if (EventBatchExceedsLimit(currentBatch, eventSize) ||
            !IsBatchActive(currentBatch, inputLogEventTimestampMs))
        {
            await SendLogBatchAsync(currentBatch).ConfigureAwait(false);
            _eventBatch = CreateEventBatch();
            currentBatch = _eventBatch;
        }

        // Add the log event to the batch
        currentBatch.AddEvent(inputLogEvent, eventSize);
    }

    /// <summary>
    /// Force flush any pending log events. The current batch is swapped for a fresh
    /// one before sending, so events queued afterwards go into the new batch.
    /// </summary>
    /// <returns>A task that completes once the pending batch, if any, has been sent.</returns>
    public async Task FlushPendingEventsAsync()
    {
        if (_eventBatch != null && !_eventBatch.IsEmpty())
        {
            var currentBatch = _eventBatch;
            _eventBatch = CreateEventBatch();
            await SendLogBatchAsync(currentBatch).ConfigureAwait(false);
        }
    }
}
}
Loading
Loading