-
Notifications
You must be signed in to change notification settings - Fork 395
Expand file tree
/
Copy pathlambda_function.py
More file actions
154 lines (125 loc) · 4.84 KB
/
lambda_function.py
File metadata and controls
154 lines (125 loc) · 4.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2021 Datadog, Inc.
import json
import logging
import os
from hashlib import sha1
import boto3
import requests
from datadog import api
from datadog_lambda.wrapper import datadog_lambda_wrapper
from caching.cache_layer import CacheLayer
from enhanced_lambda_metrics import parse_and_submit_enhanced_metrics
from forwarder import Forwarder
from settings import (
DD_ADDITIONAL_TARGET_LAMBDAS,
DD_API_KEY,
DD_API_URL,
DD_FORWARDER_VERSION,
DD_RETRY_KEYWORD,
DD_SITE,
DD_SKIP_SSL_VALIDATION,
)
from steps.enrichment import enrich
from steps.parsing import parse
from steps.splitting import split
from steps.transformation import transform
# Root logger used throughout this module. Its level is taken from the
# DD_LOG_LEVEL environment variable (upper-cased before lookup) and falls
# back to INFO when the variable is not set.
logger = logging.getLogger()
logger.setLevel(
    logging.getLevelName(os.getenv("DD_LOG_LEVEL", "INFO").upper())
)
# Refuse to load the module when no API key was configured: the key is
# either still the template placeholder or an empty string.
if DD_API_KEY in ("<YOUR_DATADOG_API_KEY>", ""):
    raise Exception(
        "Missing Datadog API key. Set DD_API_KEY environment variable. "
        "See: https://docs.datadoghq.com/serverless/forwarder/"
    )

# Reject keys whose length is anything other than 32 characters.
if len(DD_API_KEY) != 32:
    raise Exception(
        f"Invalid Datadog API key format. Expected 32 characters, received {len(DD_API_KEY)}. "
        f"Verify your API key at https://app.{DD_SITE}/organization-settings/api-keys"
    )
# Validate the API key against the Datadog API at module load.
#
# The key is sent in the DD-API-KEY header instead of the query string so it
# never becomes part of the request URL. URLs are written to the logs by
# urllib3 at DEBUG level (the root logger level follows DD_LOG_LEVEL) and are
# embedded in the messages of connection errors, which would expose the key.
logger.debug("Validating the Datadog API key")
with requests.Session() as s:
    # Retry up to 5 times, with backoff, on the listed 5xx responses.
    retries = requests.adapters.Retry(
        total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504]
    )
    s.mount("http://", requests.adapters.HTTPAdapter(max_retries=retries))
    s.mount("https://", requests.adapters.HTTPAdapter(max_retries=retries))

    validation_res = s.get(
        "{}/api/v1/validate".format(DD_API_URL),
        headers={"DD-API-KEY": DD_API_KEY},
        verify=(not DD_SKIP_SSL_VALIDATION),
        timeout=10,
    )
    # Any non-2xx/3xx answer means the key (or the configured site) is wrong.
    if not validation_res.ok:
        raise Exception(
            f"Datadog API key validation failed (HTTP {validation_res.status_code}). "
            f"Verify your API key is correct and DD_SITE matches your Datadog account region (current: {DD_SITE}). "
            "See: https://docs.datadoghq.com/getting_started/site/"
        )
# Module-level singletons. Both start out unset and are filled in by
# init_cache_layer() / init_forwarder() the first time they find them None.
cache_layer = forwarder = None

# Make the datadog `api` client use the same host, API key and certificate
# verification setting as the forwarder itself.
api._api_host = DD_API_URL
api._api_key = DD_API_KEY
api._cacert = not DD_SKIP_SSL_VALIDATION
def datadog_forwarder(event, context):
    """Process one Lambda invocation: parse, enrich, transform, split, forward.

    Args:
        event: The raw Lambda event. It is also passed unchanged to any
            additional target Lambdas when DD_ADDITIONAL_TARGET_LAMBDAS is set.
        context: The Lambda context; its ``invoked_function_arn`` is used to
            derive the prefix given to the cache layer and the forwarder.

    Returns:
        None.
    """
    # Serialising the event is only worth the cost when DEBUG is active.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(f"Received Event:{json.dumps(event)}")
        logger.debug(f"Forwarder version: {DD_FORWARDER_VERSION}")

    if DD_ADDITIONAL_TARGET_LAMBDAS:
        invoke_additional_target_lambdas(event)

    # Make sure the module-level cache layer and forwarder exist.
    arn_digest = get_function_arn_digest(context)
    init_cache_layer(arn_digest)
    init_forwarder(arn_digest)

    # parse -> enrich -> transform -> split, each step feeding the next.
    metrics, logs, trace_payloads = split(
        transform(enrich(parse(event, context, cache_layer), cache_layer))
    )

    forwarder.forward(logs, metrics, trace_payloads)
    parse_and_submit_enhanced_metrics(logs, cache_layer)

    # Best-effort retry, requested through the DD_RETRY_KEYWORD entry of the
    # event. Any failure here is only reported at DEBUG level and never
    # propagates to the caller.
    try:
        if event.get(DD_RETRY_KEYWORD, False):
            forwarder.retry()
    except Exception as e:
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Failed to retry forwarding {e}")
def init_cache_layer(function_prefix):
    """Create the module-level ``cache_layer`` if it has not been created yet.

    Args:
        function_prefix: Prefix handed to ``CacheLayer`` on construction.

    Raises:
        Exception: Whatever ``CacheLayer`` raises; it is logged with its
            traceback and then re-raised unchanged.
    """
    global cache_layer

    # Nothing to do once the cache layer exists.
    if cache_layer is not None:
        return

    try:
        cache_layer = CacheLayer(function_prefix)
    except Exception as e:
        logger.exception(f"Failed to create cache layer due to {e}")
        raise
def init_forwarder(function_prefix):
    """Create the module-level ``forwarder`` if it has not been created yet.

    Args:
        function_prefix: Prefix handed to ``Forwarder`` on construction.
    """
    global forwarder

    # Keep the existing instance when one is already in place.
    if forwarder is not None:
        return

    forwarder = Forwarder(function_prefix)
def get_function_arn_digest(context):
    """Return the SHA-1 hex digest of the lower-cased invoked function ARN.

    Args:
        context: Object exposing an ``invoked_function_arn`` string attribute.

    Returns:
        The 40-character hexadecimal digest of the UTF-8 encoded,
        lower-cased ARN.
    """
    hasher = sha1()
    # Lower-casing first makes the digest independent of the ARN's letter case.
    hasher.update(context.invoked_function_arn.lower().encode("UTF-8"))
    return hasher.hexdigest()
def invoke_additional_target_lambdas(event):
    """Asynchronously invoke each additional target Lambda with ``event``.

    The targets come from DD_ADDITIONAL_TARGET_LAMBDAS, a comma-separated
    string. Whitespace around each entry is stripped and empty entries (for
    example from a trailing comma) are skipped, so a value such as
    ``"fn-a, fn-b,"`` yields exactly ``fn-a`` and ``fn-b``.

    Args:
        event: The Lambda event; it is JSON-serialised once and sent as the
            payload of every invocation.

    Returns:
        None. A failure to invoke one target is logged with its traceback
        and does not prevent the remaining targets from being invoked.
    """
    stripped_entries = (
        entry.strip() for entry in DD_ADDITIONAL_TARGET_LAMBDAS.split(",")
    )
    lambda_arns = [entry for entry in stripped_entries if entry]
    if not lambda_arns:
        # Only separators/whitespace were configured: nothing to invoke.
        return

    lambda_client = boto3.client("lambda")
    lambda_payload = json.dumps(event)

    for lambda_arn in lambda_arns:
        try:
            # InvocationType="Event" requests an asynchronous invocation.
            lambda_client.invoke(
                FunctionName=lambda_arn,
                InvocationType="Event",
                Payload=lambda_payload,
            )
        except Exception as e:
            logger.exception(
                f"Failed to invoke additional target lambda {lambda_arn} due to {e}"
            )
# Exported handler: datadog_forwarder wrapped by datadog_lambda_wrapper.
# NOTE(review): presumably this is the name configured as the Lambda handler
# ("lambda_function.lambda_handler") — confirm against the deployment template.
lambda_handler = datadog_lambda_wrapper(datadog_forwarder)