Commit f52fbe7

Authored by LorisFriedel and claude
feat(forwarder): add SQS support as DLQ backend for failed events (#1063)

* feat(forwarder): add SQS support as DLQ backend for failed events

  Add AWS SQS as an alternative to S3 for storing failed events during retry. Users can set DD_SQS_QUEUE_URL to use a pre-existing SQS queue instead of S3. The implementation uses a pluggable storage backend pattern with a BaseStorage ABC, S3Storage (the existing backend, renamed), and the new SQSStorage. A factory function selects the backend based on config.

  SQS design: a single queue with MessageAttributes for prefix separation, data chunked to fit the 256KB limit, bounded polling (10 iterations), idempotent deletion via ReceiptHandle, and non-matching messages released immediately via ChangeMessageVisibility.

  The CloudFormation template is updated with a DdSqsQueueUrl parameter, scoped IAM permissions (ARN derived from the queue URL), and a HasStorageBackend condition for DD_STORE_FAILED_EVENTS.

* fix(forwarder): prevent factory ValueError crash on existing deployments

  The create_storage() factory raised ValueError when neither DD_SQS_QUEUE_URL nor DD_S3_BUCKET_NAME was configured. This would crash existing deployments where retry is disabled (the default) but no storage backend is set — the old Storage() class tolerated this because S3 API errors only surfaced when storage was actually used. Fix: always fall back to S3Storage when SQS is not configured, matching the original behavior. Also add a logger.warning when a single item exceeds the SQS 240KB chunk limit, to aid debugging.

* refactor(forwarder): simplify retry module and fix black formatting

  Reduce nesting, extract a helper, and fix f-string formatting for CI lint.

* fix(forwarder): fix black formatting for API key validation error message

* refactor(forwarder): simplify code in retry and parsing modules

* refactor(forwarder): remove unnecessary backward-compatible Storage alias

  The alias is no longer needed since all production code now uses the factory method and base class pattern.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent cbfd512 commit f52fbe7

File tree

11 files changed: +591 -17 lines changed


aws/logs_monitoring/forwarder.py

Lines changed: 2 additions & 2 deletions
@@ -14,8 +14,8 @@
 from logs.datadog_matcher import DatadogMatcher
 from logs.datadog_scrubber import DatadogScrubber
 from logs.helpers import add_retry_tag
+from retry import create_storage
 from retry.enums import RetryPrefix
-from retry.storage import Storage
 from settings import (
     DD_API_KEY,
     DD_FORWARD_LOG,
@@ -41,7 +41,7 @@ def __init__(self, function_prefix):
         self.trace_connection = TraceConnection(
             DD_TRACE_INTAKE_URL, DD_API_KEY, DD_SKIP_SSL_VALIDATION
         )
-        self.storage = Storage(function_prefix)
+        self.storage = create_storage(function_prefix)

     def forward(self, logs, metrics, traces):
         """
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+from retry.base_storage import BaseStorage
+from settings import DD_SQS_QUEUE_URL
+
+
+def create_storage(function_prefix) -> BaseStorage:
+    """Select the appropriate storage backend based on configuration.
+
+    If DD_SQS_QUEUE_URL is set, use SQS. Otherwise, fall back to S3.
+    The S3 backend may be initialized with an empty bucket name when the
+    retry feature is disabled (DD_STORE_FAILED_EVENTS=false) — this is
+    safe because storage methods are only called when retry is enabled.
+    """
+    if DD_SQS_QUEUE_URL:
+        from retry.sqs_storage import SQSStorage
+
+        return SQSStorage(function_prefix)
+
+    from retry.storage import S3Storage
+
+    return S3Storage(function_prefix)
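The factory's selection rule (SQS when DD_SQS_QUEUE_URL is set, otherwise always S3, never an exception) can be sketched standalone. The classes below are hypothetical stand-ins for the real backends, and the queue URL is passed as a parameter instead of read from settings:

```python
from abc import ABC, abstractmethod


class BaseStorage(ABC):
    @abstractmethod
    def store_data(self, prefix, data) -> None: ...


class S3Storage(BaseStorage):  # stand-in for retry.storage.S3Storage
    def __init__(self, function_prefix):
        self.function_prefix = function_prefix

    def store_data(self, prefix, data):
        pass


class SQSStorage(BaseStorage):  # stand-in for retry.sqs_storage.SQSStorage
    def __init__(self, function_prefix):
        self.function_prefix = function_prefix

    def store_data(self, prefix, data):
        pass


def create_storage(function_prefix, sqs_queue_url=None) -> BaseStorage:
    # Same rule as the commit's factory: SQS wins when a queue URL is
    # configured; otherwise fall back to S3 (never raise), so deployments
    # with retry disabled keep working.
    if sqs_queue_url:
        return SQSStorage(function_prefix)
    return S3Storage(function_prefix)


print(type(create_storage("fwd", "https://sqs.us-east-1.amazonaws.com/123/q")).__name__)
print(type(create_storage("fwd")).__name__)
```

Falling back to S3 unconditionally (rather than raising) is what keeps existing deployments with retry disabled from crashing at startup.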
aws/logs_monitoring/retry/base_storage.py

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+from abc import ABC, abstractmethod
+
+
+class BaseStorage(ABC):
+    @abstractmethod
+    def get_data(self, prefix) -> dict:
+        """Retrieve stored data for a given prefix. Returns {key: data}."""
+        ...
+
+    @abstractmethod
+    def store_data(self, prefix, data) -> None:
+        """Store data under the given prefix."""
+        ...
+
+    @abstractmethod
+    def delete_data(self, key) -> None:
+        """Delete stored data by key."""
+        ...
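The BaseStorage contract can be exercised with a throwaway in-memory backend, useful for unit-testing retry logic without AWS. The InMemoryStorage class and its key-N naming scheme are illustrative inventions, not part of this commit:

```python
from abc import ABC, abstractmethod
import itertools


class BaseStorage(ABC):
    @abstractmethod
    def get_data(self, prefix) -> dict: ...

    @abstractmethod
    def store_data(self, prefix, data) -> None: ...

    @abstractmethod
    def delete_data(self, key) -> None: ...


class InMemoryStorage(BaseStorage):
    """Hypothetical test double honoring the BaseStorage contract."""

    def __init__(self):
        self._items = {}  # key -> (prefix, data)
        self._ids = itertools.count()

    def get_data(self, prefix):
        # Contract: return {key: data} for the given prefix only.
        return {k: d for k, (p, d) in self._items.items() if p == prefix}

    def store_data(self, prefix, data):
        self._items[f"key-{next(self._ids)}"] = (prefix, data)

    def delete_data(self, key):
        # Idempotent, like the real S3/SQS backends: missing keys are ignored.
        self._items.pop(key, None)


storage = InMemoryStorage()
storage.store_data("logs", ["event-1"])
print(storage.get_data("logs"))  # {'key-0': ['event-1']}
```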
aws/logs_monitoring/retry/sqs_storage.py

Lines changed: 170 additions & 0 deletions
@@ -0,0 +1,170 @@
+import json
+import logging
+import os
+
+import boto3
+from botocore.exceptions import ClientError
+
+from retry.base_storage import BaseStorage
+from settings import DD_SQS_QUEUE_URL
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
+
+# SQS max message size is 256KB; use 240KB to leave room for attributes/overhead
+SQS_MAX_CHUNK_BYTES = 240 * 1024
+SQS_MAX_MESSAGES_PER_RECEIVE = 10
+SQS_MAX_POLL_ITERATIONS = 10
+
+
+class SQSStorage(BaseStorage):
+    def __init__(self, function_prefix):
+        self.queue_url = DD_SQS_QUEUE_URL
+        self.sqs_client = boto3.client("sqs")
+        self.function_prefix = function_prefix
+
+    def get_data(self, prefix):
+        """Poll SQS for messages matching prefix and function_prefix.
+
+        Returns {receipt_handle: data} for matching messages.
+        Non-matching messages are released immediately by resetting their
+        visibility timeout to 0.
+        """
+        key_data = {}
+
+        for _ in range(SQS_MAX_POLL_ITERATIONS):
+            try:
+                response = self.sqs_client.receive_message(
+                    QueueUrl=self.queue_url,
+                    MaxNumberOfMessages=SQS_MAX_MESSAGES_PER_RECEIVE,
+                    MessageAttributeNames=["retry_prefix", "function_prefix"],
+                    WaitTimeSeconds=0,
+                )
+            except ClientError as e:
+                logger.error(f"Failed to receive SQS messages: {e}")
+                break
+
+            messages = response.get("Messages", [])
+            if not messages:
+                break
+
+            for message in messages:
+                receipt_handle = message["ReceiptHandle"]
+                msg_retry_prefix = self._get_message_attr(message, "retry_prefix")
+                msg_function_prefix = self._get_message_attr(message, "function_prefix")
+
+                if (
+                    msg_retry_prefix != str(prefix)
+                    or msg_function_prefix != self.function_prefix
+                ):
+                    self._release_message(receipt_handle)
+                    continue
+
+                data = self._deserialize(message["Body"])
+                if data is not None:
+                    key_data[receipt_handle] = data
+
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(
+                f"Found {len(key_data)} SQS retry messages for prefix {prefix}"
+            )
+
+        return key_data
+
+    def store_data(self, prefix, data):
+        """Store data as one or more SQS messages, chunking to stay under the size limit."""
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"Storing retry data to SQS for prefix {prefix}")
+
+        chunks = self._chunk_data(data)
+        for chunk in chunks:
+            serialized = self._serialize(chunk)
+            try:
+                self.sqs_client.send_message(
+                    QueueUrl=self.queue_url,
+                    MessageBody=serialized,
+                    MessageAttributes={
+                        "retry_prefix": {
+                            "DataType": "String",
+                            "StringValue": str(prefix),
+                        },
+                        "function_prefix": {
+                            "DataType": "String",
+                            "StringValue": self.function_prefix,
+                        },
+                    },
+                )
+            except ClientError as e:
+                logger.error(f"Failed to send SQS message for prefix {prefix}: {e}")
+
+    def delete_data(self, key):
+        """Delete a message by receipt handle. Idempotent — logs and swallows errors."""
+        try:
+            self.sqs_client.delete_message(
+                QueueUrl=self.queue_url,
+                ReceiptHandle=key,
+            )
+        except ClientError as e:
+            logger.error(f"Failed to delete SQS message (receipt={key}): {e}")
+
+    def _release_message(self, receipt_handle):
+        """Make a non-matching message immediately visible to other consumers."""
+        try:
+            self.sqs_client.change_message_visibility(
+                QueueUrl=self.queue_url,
+                ReceiptHandle=receipt_handle,
+                VisibilityTimeout=0,
+            )
+        except ClientError as e:
+            logger.error(f"Failed to release SQS message: {e}")
+
+    @staticmethod
+    def _get_message_attr(message, attr_name):
+        """Extract a string attribute value from an SQS message."""
+        attrs = message.get("MessageAttributes", {})
+        return attrs.get(attr_name, {}).get("StringValue")
+
+    def _chunk_data(self, data):
+        """Split a list of items into chunks that each fit under SQS_MAX_CHUNK_BYTES."""
+        if not isinstance(data, list):
+            return [data]
+
+        chunks = []
+        current_chunk = []
+        current_size = 2  # account for JSON array brackets "[]"
+
+        for item in data:
+            item_json = json.dumps(item, ensure_ascii=False)
+            item_size = len(item_json.encode("UTF-8"))
+            # +1 for the comma separator between items
+            separator_size = 1 if current_chunk else 0
+
+            if current_size + separator_size + item_size > SQS_MAX_CHUNK_BYTES:
+                if current_chunk:
+                    chunks.append(current_chunk)
+                if 2 + item_size > SQS_MAX_CHUNK_BYTES:
+                    logger.warning(
+                        f"Single item exceeds SQS message size limit "
+                        f"({item_size} bytes > {SQS_MAX_CHUNK_BYTES} bytes). "
+                        f"SQS send will fail for this chunk."
+                    )
+                current_chunk = [item]
+                current_size = 2 + item_size
+            else:
+                current_chunk.append(item)
+                current_size += separator_size + item_size
+
+        if current_chunk:
+            chunks.append(current_chunk)
+
+        return chunks or [data]
+
+    def _serialize(self, data):
+        return json.dumps(data, ensure_ascii=False)
+
+    def _deserialize(self, data):
+        try:
+            return json.loads(data)
+        except (json.JSONDecodeError, TypeError) as e:
+            logger.error(f"Failed to deserialize SQS message body: {e}")
+            return None
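The _chunk_data packing logic can be extracted into a standalone function and checked with a tiny artificial limit, no boto3 required. This sketch drops the single-item warning and keeps only the size accounting:

```python
import json

SQS_MAX_CHUNK_BYTES = 240 * 1024  # 256KB SQS limit minus headroom for attributes


def chunk_data(data, limit=SQS_MAX_CHUNK_BYTES):
    """Split a list into chunks whose JSON encoding stays under `limit` bytes."""
    if not isinstance(data, list):
        return [data]

    chunks, current, size = [], [], 2  # 2 bytes for the "[]" brackets
    for item in data:
        item_size = len(json.dumps(item, ensure_ascii=False).encode("UTF-8"))
        sep = 1 if current else 0  # comma separator between items
        if size + sep + item_size > limit:
            if current:
                chunks.append(current)
            # start a fresh chunk with this item (may still exceed the limit
            # on its own, in which case the real code logs a warning)
            current, size = [item], 2 + item_size
        else:
            current.append(item)
            size += sep + item_size
    if current:
        chunks.append(current)
    return chunks or [data]


# Three ~32-byte items against a 40-byte limit: one item per chunk, and
# every chunk re-serializes under the limit.
chunks = chunk_data(["x" * 30, "y" * 30, "z" * 30], limit=40)
print(len(chunks))  # 3
print(all(len(json.dumps(c).encode("UTF-8")) <= 40 for c in chunks))  # True
```

Because size is computed from the serialized items plus brackets and commas, `json.dumps(chunk)` of any produced chunk is guaranteed to fit the limit (except the oversized-single-item case the warning covers).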

aws/logs_monitoring/retry/storage.py

Lines changed: 3 additions & 2 deletions
@@ -6,13 +6,14 @@
 import boto3
 from botocore.exceptions import ClientError

+from retry.base_storage import BaseStorage
 from settings import DD_S3_BUCKET_NAME, DD_S3_RETRY_DIRNAME

 logger = logging.getLogger(__name__)
 logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))


-class Storage(object):
+class S3Storage(BaseStorage):
     def __init__(self, function_prefix):
         self.bucket_name = DD_S3_BUCKET_NAME
         self.s3_client = boto3.client("s3")
@@ -81,7 +82,7 @@ def _get_key_prefix(self, retry_prefix):
         return f"{DD_S3_RETRY_DIRNAME}/{self.function_prefix}/{str(retry_prefix)}/"

     def _serialize(self, data):
-        return bytes(json.dumps(data).encode("UTF-8"))
+        return json.dumps(data).encode("UTF-8")

     def _deserialize(self, data):
         return json.loads(data.decode("UTF-8"))
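The _serialize cleanup works because str.encode() already returns a bytes object, so the old bytes(...) wrapper was just a redundant copy. A quick check:

```python
import json

data = {"retry": True}

# str.encode() already yields bytes; wrapping it in bytes(...) copies it
# once more without changing the value.
encoded = json.dumps(data).encode("UTF-8")

print(type(encoded) is bytes)     # True
print(bytes(encoded) == encoded)  # True
```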

aws/logs_monitoring/settings.py

Lines changed: 5 additions & 4 deletions
@@ -242,10 +242,10 @@ def is_api_key_valid():

     # Check if the API key is the correct number of characters
     if len(DD_API_KEY) != 32:
-        raise Exception(f"""
-        Invalid Datadog API key format. Expected 32 characters, received {len(DD_API_KEY)}.
-        Verify your API key at https://app.{DD_SITE}/organization-settings/api-keys
-        """)
+        raise Exception(
+            f"Invalid Datadog API key format. Expected 32 characters, received {len(DD_API_KEY)}. "
+            f"Verify your API key at https://app.{DD_SITE}/organization-settings/api-keys"
+        )

     # Validate the API key
     logger.debug("Validating the Datadog API key")
@@ -379,3 +379,4 @@ def get_enrich_cloudwatch_tags():
 DD_S3_RETRY_DIRNAME = "failed_events"
 DD_RETRY_KEYWORD = "retry"
 DD_STORE_FAILED_EVENTS = get_env_var("DD_STORE_FAILED_EVENTS", "false", boolean=True)
+DD_SQS_QUEUE_URL = get_env_var("DD_SQS_QUEUE_URL", default=None)

aws/logs_monitoring/steps/parsing.py

Lines changed: 2 additions & 8 deletions
@@ -247,12 +247,6 @@ def normalize_events(events, metadata):


 def collect_and_count(events):
-    collected = []
-    counter = 0
-    for event in events:
-        counter += 1
-        collected.append(event)
-
-    send_event_metric("incoming_events", counter)
-
+    collected = list(events)
+    send_event_metric("incoming_events", len(collected))
     return collected
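The collect_and_count rewrite relies on list() materializing the iterable in a single pass so len() can count it, replacing the manual counter loop. A sketch with send_event_metric replaced by a hypothetical metrics list, since the real helper is not shown in this diff:

```python
def collect_and_count(events, metrics):
    # list() drains the iterable once; len() then gives the count
    # without a manual counter variable.
    collected = list(events)
    metrics.append(("incoming_events", len(collected)))
    return collected


metrics = []
events = (e for e in ["a", "b", "c"])  # a one-shot generator, consumed exactly once
print(collect_and_count(events, metrics))  # ['a', 'b', 'c']
print(metrics)                             # [('incoming_events', 3)]
```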

aws/logs_monitoring/template.yaml

Lines changed: 29 additions & 1 deletion
@@ -276,6 +276,10 @@ Parameters:
     Type: Number
     Default: 6
     Description: Interval in hours for scheduled forwarder invocation (via AWS EventBridge).
+  DdSqsQueueUrl:
+    Type: String
+    Default: ""
+    Description: URL of an existing SQS queue for failed event storage (alternative to S3). When set, the forwarder uses SQS instead of S3 for retry storage.
   DdForwarderExistingBucketName:
     Type: String
     Default: ""
@@ -404,6 +408,11 @@ Conditions:
     - !Condition CreateS3Bucket
     - !Not
       - !Equals [!Ref DdForwarderExistingBucketName, ""]
+  SetDdSqsQueueUrl: !Not
+    - !Equals [!Ref DdSqsQueueUrl, ""]
+  HasStorageBackend: !Or
+    - !Condition SetForwarderBucket
+    - !Condition SetDdSqsQueueUrl
   SetVpcSecurityGroupIds: !Not
     - !Equals [!Join ["", !Ref VPCSecurityGroupIds], ""]
   SetVpcSubnetIds: !Not
@@ -531,9 +540,13 @@ Resources:
             - !Ref DdPort
             - !Ref AWS::NoValue
           DD_STORE_FAILED_EVENTS: !If
-            - SetForwarderBucket
+            - HasStorageBackend
             - !Ref DdStoreFailedEvents
             - !Ref AWS::NoValue
+          DD_SQS_QUEUE_URL: !If
+            - SetDdSqsQueueUrl
+            - !Ref DdSqsQueueUrl
+            - !Ref AWS::NoValue
           REDACT_IP: !If
             - SetRedactIp
             - !Ref RedactIp
@@ -770,6 +783,20 @@ Resources:
                 - !Ref SqsQueueArnList
                 - "*"
               Effect: Allow
+            - !If
+              - SetDdSqsQueueUrl # Access SQS queue for failed event storage
+              - Action:
+                  - sqs:SendMessage
+                  - sqs:ReceiveMessage
+                  - sqs:DeleteMessage
+                  - sqs:ChangeMessageVisibility
+                Resource: !Sub
+                  - "arn:${AWS::Partition}:sqs:${Region}:${Account}:${QueueName}"
+                  - Region: !Select [1, !Split [".", !Select [2, !Split ["/", !Ref DdSqsQueueUrl]]]]
+                    Account: !Select [3, !Split ["/", !Ref DdSqsQueueUrl]]
+                    QueueName: !Select [4, !Split ["/", !Ref DdSqsQueueUrl]]
+                Effect: Allow
+              - !Ref AWS::NoValue
       Tags:
         - Value: !FindInMap [Constants, DdForwarder, Version]
           Key: dd_forwarder_version
@@ -1159,6 +1186,7 @@ Metadata:
           - DdForwarderExistingBucketName
           - DdForwarderBucketName
           - DdStoreFailedEvents
+          - DdSqsQueueUrl
           - DdLogLevel
       ParameterLabels:
         DdApiKey:
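The IAM Resource in the template derives the queue ARN from the queue URL with nested !Split/!Select. The same parsing can be mirrored in Python to see which URL segments map to region, account, and queue name (queue_arn_from_url is an illustrative helper, not part of the commit):

```python
def queue_arn_from_url(queue_url, partition="aws"):
    # Mirrors the template's nested !Split/!Select. Splitting the URL on "/"
    # yields: ['https:', '', 'sqs.us-east-1.amazonaws.com', '<account>', '<queue>']
    parts = queue_url.split("/")
    region = parts[2].split(".")[1]  # !Select [1, !Split [".", !Select [2, ...]]]
    account = parts[3]               # !Select [3, !Split ["/", !Ref DdSqsQueueUrl]]
    queue_name = parts[4]            # !Select [4, !Split ["/", !Ref DdSqsQueueUrl]]
    return f"arn:{partition}:sqs:{region}:{account}:{queue_name}"


print(queue_arn_from_url("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"))
# arn:aws:sqs:us-east-1:123456789012:my-queue
```

Deriving the ARN this way scopes the IAM policy to exactly the configured queue instead of granting `sqs:*` on `*`.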
