# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2021 Datadog, Inc.

import itertools
import json
import logging
import os
from settings import DD_SERVICE, DD_SOURCE
from steps.common import (
    generate_metadata,
    get_service_from_tags_and_remove_duplicates,
    merge_dicts,
)
from steps.enums import AwsEventSource, AwsEventType, AwsEventTypeKeyword
from steps.handlers.awslogs_handler import AwsLogsHandler
from steps.handlers.s3_handler import S3EventHandler
from telemetry import send_event_metric, set_forwarder_telemetry_tags

logger = logging.getLogger()
logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
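

# High-level flow of this module: parse() detects what kind of payload the
# forwarder Lambda received (CloudWatch Logs, S3, SQS, SNS, EventBridge,
# Kinesis, or a generic CloudWatch event), dispatches it to the matching
# handler below, reports an "incoming_events" metric, and returns a list of
# normalized event dicts carrying the forwarder metadata.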


def parse(event, context, cache_layer):
    """Parse Lambda input to normalized events"""
    metadata = generate_metadata(context)
    try:
        event_type = parse_event_type(event)
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Parsed event type: {event_type}")
        set_forwarder_telemetry_tags(context, event_type)
        match event_type:
            case AwsEventType.AWSLOGS:
                aws_handler = AwsLogsHandler(context, cache_layer)
                events = aws_handler.handle(event)
                return collect_and_count(events)
            case AwsEventType.S3:
                s3_handler = S3EventHandler(context, metadata, cache_layer)
                events = s3_handler.handle(event)
            case AwsEventType.SQS:
                events = sqs_handler(event, context, cache_layer)
                return collect_and_count(events)
            case AwsEventType.EVENTBRIDGE_S3:
                events = eventbridge_s3_handler(event, context, metadata, cache_layer)
            case AwsEventType.EVENTS:
                events = cwevent_handler(event, metadata)
            case AwsEventType.SNS:
                events = sns_handler(event, metadata)
            case AwsEventType.KINESIS:
                events = kinesis_awslogs_handler(event, context, cache_layer)
                return collect_and_count(events)
    except Exception as e:
        # Forward the parsing error through the socket as a regular log event
        err_message = "Error parsing the object. Exception: {} for event {}".format(
            str(e), event
        )
        events = [err_message]
    return normalize_events(events, metadata)
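

# Shape detection below is keyed off a few top-level fields. Illustrative
# payloads (assuming the enum string values match the usual AWS keys, e.g.
# str(AwsEventTypeKeyword.RECORDS) == "Records" and
# str(AwsEventType.AWSLOGS) == "awslogs"):
#   S3 notification:   {"Records": [{"s3": {...}}]}
#   SNS (possibly wrapping an S3 notification in its "Message" string):
#                      {"Records": [{"Sns": {"Message": "..."}}]}
#   Kinesis record:    {"Records": [{"kinesis": {"data": "..."}}]}
#   SQS message:       {"Records": [{"eventSource": "aws:sqs", "body": "..."}]}
#   CloudWatch Logs:   {"awslogs": {"data": "..."}}
#   EventBridge S3:    {"source": "aws.s3", "detail-type": "Object Created", "detail": {...}}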
def parse_event_type(event):
    if records := event.get(str(AwsEventTypeKeyword.RECORDS), None):
        record = records[0]
        if record.get(str(AwsEventType.S3), None):
            return AwsEventType.S3
        elif sns_record := record.get(str(AwsEventTypeKeyword.SNS), None):
            sns_msg = sns_record.get(str(AwsEventTypeKeyword.MESSAGE), None)
            try:
                sns_msg_dict = json.loads(sns_msg)
                if inner_records := sns_msg_dict.get(
                    str(AwsEventTypeKeyword.RECORDS), None
                ):
                    if inner_records[0].get(str(AwsEventType.S3)):
                        return AwsEventType.S3
            except Exception:
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"No s3 event detected from SNS message: {sns_msg}")
            return AwsEventType.SNS
        elif str(AwsEventType.KINESIS) in record:
            return AwsEventType.KINESIS
        elif record.get("eventSource") == "aws:sqs":
            return AwsEventType.SQS
    elif str(AwsEventType.AWSLOGS) in event:
        return AwsEventType.AWSLOGS
    elif "detail" in event:
        # Check if this is an EventBridge S3 event
        if event.get("source", "") == "aws.s3" and "Object Created" in event.get(
            "detail-type", ""
        ):
            return AwsEventType.EVENTBRIDGE_S3
        return AwsEventType.EVENTS
    raise Exception("Event type not supported (see #Event supported section)")


# Handle S3 events delivered via SQS (S3 -> SQS or S3 -> SNS -> SQS)
def sqs_handler(event, context, cache_layer):
    for record in event["Records"]:
        inner_event = _extract_inner_event_from_sqs(record)
        if inner_event is None:
            continue
        # Fresh metadata per SQS record: S3EventHandler mutates metadata
        # (DD_SOURCE, tags, service), so each record needs its own copy.
        metadata = generate_metadata(context)
        s3_handler = S3EventHandler(context, metadata, cache_layer)
        for log_event in s3_handler.handle(inner_event):
            if isinstance(log_event, dict):
                yield merge_dicts(log_event, metadata)
            elif isinstance(log_event, str):
                yield merge_dicts({"message": log_event}, metadata)


def _extract_inner_event_from_sqs(sqs_record):
    try:
        body = json.loads(sqs_record["body"])
    except (json.JSONDecodeError, KeyError, TypeError):
        logger.warning("SQS record has missing or malformed body, skipping")
        return None
    if not isinstance(body, dict):
        logger.warning("SQS record body is not a JSON object, skipping")
        return None
    # Direct S3 event: body contains Records[0].s3
    if _contains_s3_records(body):
        return body
    # SNS-wrapped S3 event: body.Type == "Notification" and body.Message contains S3 event
    if body.get("Type") == "Notification":
        try:
            message = json.loads(body.get("Message", ""))
            if _contains_s3_records(message):
                return message
        except (json.JSONDecodeError, TypeError):
            pass
    logger.warning("SQS record body does not contain a recognized S3 event, skipping")
    return None


def _contains_s3_records(event):
    records = event.get("Records")
    return isinstance(records, list) and len(records) > 0 and records[0].get("s3")
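

# For reference, the two SQS body shapes accepted above (sketched, field
# values elided): a direct S3 notification
#     {"Records": [{"s3": {"bucket": {...}, "object": {...}}}]}
# or an SNS envelope whose "Message" field is that same notification
# serialized as a JSON string
#     {"Type": "Notification", "Message": "{\"Records\": [{\"s3\": ...}]}"}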


# Handle S3 events delivered over EventBridge
def eventbridge_s3_handler(event, context, metadata, cache_layer):
    """
    Transform an EventBridge S3 event into the standard S3 event format.

    EventBridge format:
    {
        "version": "0",
        "detail-type": "Object Created",
        "source": "aws.s3",
        "detail": {
            "bucket": {"name": "bucket-name"},
            "object": {"key": "object-key", "size": 1234}
        }
    }

    Standard S3 format:
    {
        "Records": [{
            "s3": {
                "bucket": {"name": "bucket-name"},
                "object": {"key": "object-key"}
            }
        }]
    }
    """

    def reformat_eventbridge_s3_event(event):
        return {
            "Records": [
                {
                    "s3": {
                        "bucket": event["detail"]["bucket"],
                        "object": {
                            "key": event["detail"]["object"]["key"],
                        },
                    }
                }
            ]
        }

    event = reformat_eventbridge_s3_event(event)
    s3_handler = S3EventHandler(context, metadata, cache_layer)
    return s3_handler.handle(event)


# Handle CloudWatch Events
def cwevent_handler(event, metadata):
    # Set the source on the log, derived from the event source when possible
    # (e.g. source "aws.ec2" -> DD_SOURCE "ec2")
    if metadata.get(DD_SOURCE) is None:
        source = event.get("source", str(AwsEventSource.CLOUDWATCH))
        service = source.split(".")
        if len(service) > 1:
            metadata[DD_SOURCE] = service[1]
        else:
            metadata[DD_SOURCE] = str(AwsEventSource.CLOUDWATCH)
    metadata[DD_SERVICE] = get_service_from_tags_and_remove_duplicates(metadata)

    yield event


# Handle SNS events
def sns_handler(event, metadata):
    # Set the source on the log
    metadata[DD_SOURCE] = str(AwsEventSource.SNS)
    for ev in event["Records"]:
        yield ev


# Handle CloudWatch logs from Kinesis
def kinesis_awslogs_handler(event, context, cache_layer):
    def reformat_record(record):
        return {"awslogs": {"data": record["kinesis"]["data"]}}

    awslogs_handler = AwsLogsHandler(context, cache_layer)
    return itertools.chain.from_iterable(
        awslogs_handler.handle(reformat_record(r)) for r in event["Records"]
    )


def normalize_events(events, metadata):
    normalized = []
    events_counter = 0

    for event in events:
        events_counter += 1
        if isinstance(event, dict):
            normalized.append(merge_dicts(event, metadata))
        elif isinstance(event, str):
            normalized.append(merge_dicts({"message": event}, metadata))
        else:
            # drop this log
            continue

    # Submit count of total events
    send_event_metric("incoming_events", events_counter)

    return normalized


def collect_and_count(events):
    collected = list(events)
    send_event_metric("incoming_events", len(collected))
    return collected
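

# Illustrative call site (not part of this module): the forwarder's Lambda
# handler would be expected to run something like
#     normalized_events = parse(event, context, cache_layer)
# and then enrich and forward the returned list; the exact integration lives
# outside this file.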