4 changes: 2 additions & 2 deletions aws/logs_monitoring/forwarder.py
@@ -14,8 +14,8 @@
from logs.datadog_matcher import DatadogMatcher
from logs.datadog_scrubber import DatadogScrubber
from logs.helpers import add_retry_tag
from retry import create_storage
from retry.enums import RetryPrefix
from retry.storage import Storage
from settings import (
DD_API_KEY,
DD_FORWARD_LOG,
@@ -41,7 +41,7 @@ def __init__(self, function_prefix):
self.trace_connection = TraceConnection(
DD_TRACE_INTAKE_URL, DD_API_KEY, DD_SKIP_SSL_VALIDATION
)
self.storage = Storage(function_prefix)
self.storage = create_storage(function_prefix)

def forward(self, logs, metrics, traces):
"""
20 changes: 20 additions & 0 deletions aws/logs_monitoring/retry/__init__.py
@@ -0,0 +1,20 @@
from retry.base_storage import BaseStorage
from settings import DD_SQS_QUEUE_URL


def create_storage(function_prefix) -> BaseStorage:
"""Select the appropriate storage backend based on configuration.
If DD_SQS_QUEUE_URL is set, use SQS. Otherwise, fall back to S3.
The S3 backend may be initialized with an empty bucket name when the
retry feature is disabled (DD_STORE_FAILED_EVENTS=false) — this is
safe because storage methods are only called when retry is enabled.
"""
if DD_SQS_QUEUE_URL:
from retry.sqs_storage import SQSStorage

return SQSStorage(function_prefix)

from retry.storage import S3Storage

return S3Storage(function_prefix)
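
For reviewers, a minimal sketch (not part of the PR) of the factory's selection behavior. It assumes the forwarder's settings module imports cleanly and that boto3 clients can be constructed in the test environment (e.g. a dummy AWS region and credentials); the patched queue URL and function prefix are hypothetical.

from unittest.mock import patch

from retry import create_storage
from retry.sqs_storage import SQSStorage
from retry.storage import S3Storage

# With a queue URL configured, the SQS backend is selected.
with patch("retry.DD_SQS_QUEUE_URL", "https://sqs.us-east-1.amazonaws.com/123456789012/dd-retry"):
    assert isinstance(create_storage("forwarder"), SQSStorage)

# Without one, the existing S3 backend remains the default.
with patch("retry.DD_SQS_QUEUE_URL", None):
    assert isinstance(create_storage("forwarder"), S3Storage)
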
18 changes: 18 additions & 0 deletions aws/logs_monitoring/retry/base_storage.py
@@ -0,0 +1,18 @@
from abc import ABC, abstractmethod


class BaseStorage(ABC):
@abstractmethod
def get_data(self, prefix) -> dict:
"""Retrieve stored data for a given prefix. Returns {key: data}."""
...

@abstractmethod
def store_data(self, prefix, data) -> None:
"""Store data under the given prefix."""
...

@abstractmethod
def delete_data(self, key) -> None:
"""Delete stored data by key."""
...
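
As a quick illustration of the contract (hypothetical, not part of the PR), any additional backend only needs to provide these three methods; a trivial in-memory variant for local testing could look like this:

from retry.base_storage import BaseStorage


class InMemoryStorage(BaseStorage):
    """Hypothetical backend for local testing; keys are synthetic integers."""

    def __init__(self):
        self._items = {}  # key -> (prefix, data)
        self._next_key = 0

    def get_data(self, prefix) -> dict:
        # Return only items stored under the requested prefix, keyed by storage key.
        return {k: d for k, (p, d) in self._items.items() if p == prefix}

    def store_data(self, prefix, data) -> None:
        self._items[str(self._next_key)] = (prefix, data)
        self._next_key += 1

    def delete_data(self, key) -> None:
        self._items.pop(key, None)
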
170 changes: 170 additions & 0 deletions aws/logs_monitoring/retry/sqs_storage.py
@@ -0,0 +1,170 @@
import json
import logging
import os

import boto3
from botocore.exceptions import ClientError

from retry.base_storage import BaseStorage
from settings import DD_SQS_QUEUE_URL

logger = logging.getLogger(__name__)
logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))

# SQS max message size is 256KB; use 240KB to leave room for attributes/overhead
SQS_MAX_CHUNK_BYTES = 240 * 1024
SQS_MAX_MESSAGES_PER_RECEIVE = 10
SQS_MAX_POLL_ITERATIONS = 10


class SQSStorage(BaseStorage):
def __init__(self, function_prefix):
self.queue_url = DD_SQS_QUEUE_URL
self.sqs_client = boto3.client("sqs")
self.function_prefix = function_prefix

def get_data(self, prefix):
"""Poll SQS for messages matching prefix and function_prefix.

Returns {receipt_handle: data} for matching messages.
Non-matching messages are released immediately by resetting their
visibility timeout to 0.
"""
key_data = {}

for _ in range(SQS_MAX_POLL_ITERATIONS):
try:
response = self.sqs_client.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=SQS_MAX_MESSAGES_PER_RECEIVE,
MessageAttributeNames=["retry_prefix", "function_prefix"],
WaitTimeSeconds=0,
)
except ClientError as e:
logger.error(f"Failed to receive SQS messages: {e}")
break

messages = response.get("Messages", [])
if not messages:
break

for message in messages:
receipt_handle = message["ReceiptHandle"]
msg_retry_prefix = self._get_message_attr(message, "retry_prefix")
msg_function_prefix = self._get_message_attr(message, "function_prefix")

if (
msg_retry_prefix != str(prefix)
or msg_function_prefix != self.function_prefix
):
self._release_message(receipt_handle)
continue

data = self._deserialize(message["Body"])
if data is not None:
key_data[receipt_handle] = data

if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Found {len(key_data)} SQS retry messages for prefix {prefix}"
)

return key_data

def store_data(self, prefix, data):
"""Store data as one or more SQS messages, chunking to stay under the size limit."""
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Storing retry data to SQS for prefix {prefix}")

chunks = self._chunk_data(data)
for chunk in chunks:
serialized = self._serialize(chunk)
try:
self.sqs_client.send_message(
QueueUrl=self.queue_url,
MessageBody=serialized,
MessageAttributes={
"retry_prefix": {
"DataType": "String",
"StringValue": str(prefix),
},
"function_prefix": {
"DataType": "String",
"StringValue": self.function_prefix,
},
},
)
except ClientError as e:
logger.error(f"Failed to send SQS message for prefix {prefix}: {e}")

def delete_data(self, key):
"""Delete a message by receipt handle. Idempotent — logs and swallows errors."""
try:
self.sqs_client.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=key,
)
except ClientError as e:
logger.error(f"Failed to delete SQS message (receipt={key}): {e}")

def _release_message(self, receipt_handle):
"""Make a non-matching message immediately visible to other consumers."""
try:
self.sqs_client.change_message_visibility(
QueueUrl=self.queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=0,
)
except ClientError as e:
logger.error(f"Failed to release SQS message: {e}")

@staticmethod
def _get_message_attr(message, attr_name):
"""Extract a string attribute value from an SQS message."""
attrs = message.get("MessageAttributes", {})
return attrs.get(attr_name, {}).get("StringValue")

def _chunk_data(self, data):
"""Split a list of items into chunks that each fit under SQS_MAX_CHUNK_BYTES."""
if not isinstance(data, list):
return [data]

chunks = []
current_chunk = []
current_size = 2 # account for JSON array brackets "[]"

for item in data:
item_json = json.dumps(item, ensure_ascii=False)
item_size = len(item_json.encode("UTF-8"))
# json.dumps uses ", " between items by default, so budget 2 bytes per separator
separator_size = 2 if current_chunk else 0

if current_size + separator_size + item_size > SQS_MAX_CHUNK_BYTES:
if current_chunk:
chunks.append(current_chunk)
if 2 + item_size > SQS_MAX_CHUNK_BYTES:
logger.warning(
f"Single item exceeds SQS message size limit "
f"({item_size} bytes > {SQS_MAX_CHUNK_BYTES} bytes). "
f"SQS send will fail for this chunk."
)
current_chunk = [item]
current_size = 2 + item_size
else:
current_chunk.append(item)
current_size += separator_size + item_size

if current_chunk:
chunks.append(current_chunk)

return chunks or [data]

def _serialize(self, data):
return json.dumps(data, ensure_ascii=False)

def _deserialize(self, data):
try:
return json.loads(data)
except (json.JSONDecodeError, TypeError) as e:
logger.error(f"Failed to deserialize SQS message body: {e}")
return None
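
A hedged illustration (not part of the PR) of how the chunking splits an oversized payload. _chunk_data does not touch self, so it can be exercised without a real SQS client; the example assumes the forwarder's settings module imports cleanly in your environment.

import json

from retry.sqs_storage import SQS_MAX_CHUNK_BYTES, SQSStorage

big_item = {"message": "x" * (100 * 1024)}  # roughly 100KB once serialized
items = [big_item, big_item, big_item]      # roughly 300KB total, above the 240KB cap

chunks = SQSStorage._chunk_data(None, items)  # self is unused by _chunk_data
assert len(chunks) == 2                       # two items fit in the first message, one in the second
for chunk in chunks:
    assert len(json.dumps(chunk, ensure_ascii=False).encode("UTF-8")) <= SQS_MAX_CHUNK_BYTES
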
5 changes: 3 additions & 2 deletions aws/logs_monitoring/retry/storage.py
@@ -6,13 +6,14 @@
import boto3
from botocore.exceptions import ClientError

from retry.base_storage import BaseStorage
from settings import DD_S3_BUCKET_NAME, DD_S3_RETRY_DIRNAME

logger = logging.getLogger(__name__)
logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))


class Storage(object):
class S3Storage(BaseStorage):
def __init__(self, function_prefix):
self.bucket_name = DD_S3_BUCKET_NAME
self.s3_client = boto3.client("s3")
@@ -81,7 +82,7 @@ def _get_key_prefix(self, retry_prefix):
return f"{DD_S3_RETRY_DIRNAME}/{self.function_prefix}/{str(retry_prefix)}/"

def _serialize(self, data):
return bytes(json.dumps(data).encode("UTF-8"))
return json.dumps(data).encode("UTF-8")

def _deserialize(self, data):
return json.loads(data.decode("UTF-8"))
9 changes: 5 additions & 4 deletions aws/logs_monitoring/settings.py
@@ -242,10 +242,10 @@ def is_api_key_valid():

# Check if the API key is the correct number of characters
if len(DD_API_KEY) != 32:
raise Exception(f"""
Invalid Datadog API key format. Expected 32 characters, received {len(DD_API_KEY)}.
Verify your API key at https://app.{DD_SITE}/organization-settings/api-keys
""")
raise Exception(
f"Invalid Datadog API key format. Expected 32 characters, received {len(DD_API_KEY)}. "
f"Verify your API key at https://app.{DD_SITE}/organization-settings/api-keys"
)

# Validate the API key
logger.debug("Validating the Datadog API key")
@@ -379,3 +379,4 @@ def get_enrich_cloudwatch_tags():
DD_S3_RETRY_DIRNAME = "failed_events"
DD_RETRY_KEYWORD = "retry"
DD_STORE_FAILED_EVENTS = get_env_var("DD_STORE_FAILED_EVENTS", "false", boolean=True)
DD_SQS_QUEUE_URL = get_env_var("DD_SQS_QUEUE_URL", default=None)
10 changes: 2 additions & 8 deletions aws/logs_monitoring/steps/parsing.py
@@ -247,12 +247,6 @@ def normalize_events(events, metadata):


def collect_and_count(events):
collected = []
counter = 0
for event in events:
counter += 1
collected.append(event)

send_event_metric("incoming_events", counter)

collected = list(events)
send_event_metric("incoming_events", len(collected))
return collected
30 changes: 29 additions & 1 deletion aws/logs_monitoring/template.yaml
@@ -276,6 +276,10 @@ Parameters:
Type: Number
Default: 6
Description: Interval in hours for scheduled forwarder invocation (via AWS EventBridge).
DdSqsQueueUrl:
Type: String
Default: ""
Description: URL of an existing SQS queue for failed event storage (alternative to S3). When set, the forwarder uses SQS instead of S3 for retry storage.
DdForwarderExistingBucketName:
Type: String
Default: ""
@@ -404,6 +408,11 @@ Conditions:
- !Condition CreateS3Bucket
- !Not
- !Equals [!Ref DdForwarderExistingBucketName, ""]
SetDdSqsQueueUrl: !Not
- !Equals [!Ref DdSqsQueueUrl, ""]
HasStorageBackend: !Or
- !Condition SetForwarderBucket
- !Condition SetDdSqsQueueUrl
SetVpcSecurityGroupIds: !Not
- !Equals [!Join ["", !Ref VPCSecurityGroupIds], ""]
SetVpcSubnetIds: !Not
@@ -531,9 +540,13 @@ Resources:
- !Ref DdPort
- !Ref AWS::NoValue
DD_STORE_FAILED_EVENTS: !If
- SetForwarderBucket
- HasStorageBackend
- !Ref DdStoreFailedEvents
- !Ref AWS::NoValue
DD_SQS_QUEUE_URL: !If
- SetDdSqsQueueUrl
- !Ref DdSqsQueueUrl
- !Ref AWS::NoValue
REDACT_IP: !If
- SetRedactIp
- !Ref RedactIp
@@ -770,6 +783,20 @@ Resources:
- !Ref SqsQueueArnList
- "*"
Effect: Allow
- !If
- SetDdSqsQueueUrl # Access SQS queue for failed event storage
- Action:
- sqs:SendMessage
- sqs:ReceiveMessage
- sqs:DeleteMessage
- sqs:ChangeMessageVisibility
Resource: !Sub
- "arn:${AWS::Partition}:sqs:${Region}:${Account}:${QueueName}"
- Region: !Select [1, !Split [".", !Select [2, !Split ["/", !Ref DdSqsQueueUrl]]]]
Account: !Select [3, !Split ["/", !Ref DdSqsQueueUrl]]
QueueName: !Select [4, !Split ["/", !Ref DdSqsQueueUrl]]
Effect: Allow
- !Ref AWS::NoValue
Tags:
- Value: !FindInMap [Constants, DdForwarder, Version]
Key: dd_forwarder_version
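
For reviewers, a sketch in Python (illustration only, with a hypothetical queue URL) mirroring the !Split/!Select logic above that derives the region, account, and queue name used to build the queue ARN:

url = "https://sqs.us-east-1.amazonaws.com/123456789012/datadog-forwarder-retry"
parts = url.split("/")            # ['https:', '', 'sqs.us-east-1.amazonaws.com', '123456789012', 'datadog-forwarder-retry']
region = parts[2].split(".")[1]   # 'us-east-1'
account = parts[3]                # '123456789012'
queue_name = parts[4]             # 'datadog-forwarder-retry'
assert f"arn:aws:sqs:{region}:{account}:{queue_name}" == \
    "arn:aws:sqs:us-east-1:123456789012:datadog-forwarder-retry"

(The template uses ${AWS::Partition} in place of the hard-coded "aws" shown here.)
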
@@ -1159,6 +1186,7 @@ Metadata:
- DdForwarderExistingBucketName
- DdForwarderBucketName
- DdStoreFailedEvents
- DdSqsQueueUrl
- DdLogLevel
ParameterLabels:
DdApiKey: