-
Notifications
You must be signed in to change notification settings - Fork 395
Expand file tree
/
Copy path forwarder.py
More file actions
184 lines (156 loc) · 6.44 KB
/
forwarder.py
File metadata and controls
184 lines (156 loc) · 6.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2021 Datadog, Inc.
import json
import logging
import os
from logs.datadog_batcher import DatadogBatcher
from logs.datadog_client import DatadogClient
from logs.datadog_http_client import DatadogHTTPClient
from logs.datadog_matcher import DatadogMatcher
from logs.datadog_scrubber import DatadogScrubber
from logs.helpers import add_retry_tag
from retry.enums import RetryPrefix
from retry.storage import Storage
from settings import (
DD_API_KEY,
DD_FORWARD_LOG,
DD_NO_SSL,
DD_PORT,
DD_SKIP_SSL_VALIDATION,
DD_STORE_FAILED_EVENTS,
DD_TRACE_INTAKE_URL,
DD_URL,
EXCLUDE_AT_MATCH,
INCLUDE_AT_MATCH,
SCRUBBING_RULE_CONFIGS,
)
from telemetry import send_event_metric, send_log_metric
from trace_forwarder.connection import TraceConnection
# Root logger for the forwarder; verbosity is driven by the DD_LOG_LEVEL
# environment variable (case-insensitive, defaults to INFO).
logger = logging.getLogger()
_configured_level = os.environ.get("DD_LOG_LEVEL", "INFO").upper()
logger.setLevel(logging.getLevelName(_configured_level))
class Forwarder(object):
def __init__(self, function_prefix):
self.trace_connection = TraceConnection(
DD_TRACE_INTAKE_URL, DD_API_KEY, DD_SKIP_SSL_VALIDATION
)
self.storage = Storage(function_prefix)
def forward(self, logs, metrics, traces):
"""
Forward logs, metrics, and traces to Datadog in a background thread.
"""
if DD_FORWARD_LOG:
self._forward_logs(logs)
self._forward_metrics(metrics)
self._forward_traces(traces)
def retry(self):
"""
Retry forwarding logs, metrics, and traces to Datadog.
"""
for prefix in RetryPrefix:
self._retry_prefix(prefix)
def _retry_prefix(self, prefix):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Retrying {prefix} data")
key_data = self.storage.get_data(prefix)
for k, d in key_data.items():
if d is None:
continue
match prefix:
case RetryPrefix.LOGS:
self._forward_logs(d, key=k)
case RetryPrefix.METRICS:
self._forward_metrics(d, key=k)
case RetryPrefix.TRACES:
self._forward_traces(d, key=k)
def _forward_logs(self, logs, key=None):
"""Forward logs to Datadog"""
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarding {len(logs)} logs")
scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS)
matcher = DatadogMatcher(
include_pattern=INCLUDE_AT_MATCH, exclude_pattern=EXCLUDE_AT_MATCH
)
logs_to_forward = []
for log in logs:
if key:
log = add_retry_tag(log)
evaluated_log = log
# apply scrubbing rules to inner log message
if isinstance(log, dict) and log.get("message"):
try:
log["message"] = scrubber.scrub(log["message"])
evaluated_log = log["message"]
except Exception as e:
logger.exception(
f"Exception while scrubbing log message {log['message']}: {e}"
)
if matcher.match(evaluated_log):
logs_to_forward.append(json.dumps(log, ensure_ascii=False))
batcher = DatadogBatcher(512 * 1000, 4 * 1000 * 1000, 400)
cli = DatadogHTTPClient(
DD_URL, DD_PORT, DD_NO_SSL, DD_SKIP_SSL_VALIDATION, DD_API_KEY, scrubber
)
failed_logs = []
with DatadogClient(cli) as client:
for batch in batcher.batch(logs_to_forward):
try:
client.send(batch)
except Exception as e:
logger.exception(
f"Exception while forwarding log batch {batch}: {e}"
)
failed_logs.extend(batch)
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarded log batch: {batch}")
if key:
self.storage.delete_data(key)
if DD_STORE_FAILED_EVENTS and len(failed_logs) > 0 and not key:
self.storage.store_data(RetryPrefix.LOGS, failed_logs)
send_event_metric("logs_forwarded", len(logs_to_forward) - len(failed_logs))
def _forward_metrics(self, metrics, key=None):
"""
Forward custom metrics submitted via logs to Datadog in a background thread
using `lambda_stats` that is provided by the Datadog Python Lambda Layer.
"""
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarding {len(metrics)} metrics")
failed_metrics = []
for metric in metrics:
try:
send_log_metric(metric)
except Exception as e:
logger.exception(
f"Exception while forwarding metric {json.dumps(metric)}: {e}"
)
failed_metrics.append(metric)
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarded metric: {json.dumps(metric)}")
if key:
self.storage.delete_data(key)
if DD_STORE_FAILED_EVENTS and len(failed_metrics) > 0 and not key:
self.storage.store_data(RetryPrefix.METRICS, failed_metrics)
send_event_metric("metrics_forwarded", len(metrics) - len(failed_metrics))
def _forward_traces(self, traces, key=None):
if not len(traces) > 0:
return
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarding {len(traces)} traces")
try:
serialized_trace_paylods = json.dumps(traces)
self.trace_connection.send_traces(serialized_trace_paylods)
except Exception as e:
logger.exception(
f"Exception while forwarding traces {serialized_trace_paylods}: {e}"
)
if DD_STORE_FAILED_EVENTS and not key:
self.storage.store_data(RetryPrefix.TRACES, traces)
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarded traces: {serialized_trace_paylods}")
if key:
self.storage.delete_data(key)
send_event_metric("traces_forwarded", len(traces))