-
Notifications
You must be signed in to change notification settings - Fork 395
Expand file tree
/
Copy pathdatadog_http_client.py
More file actions
114 lines (91 loc) · 3.48 KB
/
datadog_http_client.py
File metadata and controls
114 lines (91 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2021 Datadog, Inc.
import logging
import os
from concurrent.futures import as_completed
from requests_futures.sessions import FuturesSession
from logs.exceptions import ScrubbingException
from logs.helpers import compress_logs
from settings import (
DD_COMPRESSION_LEVEL,
DD_FORWARDER_VERSION,
DD_MAX_WORKERS,
DD_USE_COMPRESSION,
get_enrich_cloudwatch_tags,
get_enrich_s3_tags,
)
# Module-level logger (root logger). The level is configurable via the
# DD_LOG_LEVEL environment variable and defaults to INFO.
logger = logging.getLogger()
logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
def get_dd_storage_tag_header():
    """Build the DD-STORAGE-TAG header value from the enrichment settings.

    Returns a comma-separated list of the enabled tag sources ("s3" and/or
    "cloudwatch"), or an empty string when neither enrichment is enabled.
    """
    sources = []
    if get_enrich_s3_tags():
        sources.append("s3")
    if get_enrich_cloudwatch_tags():
        sources.append("cloudwatch")
    return ",".join(sources)
class DatadogHTTPClient(object):
    """
    Client that sends a batch of logs over HTTP.

    Posts are issued asynchronously through a FuturesSession; pending
    futures are resolved (and any exceptions logged) when the client is
    closed. Intended to be used as a context manager:

        with DatadogHTTPClient(...) as client:
            client.send(batch)
    """

    _POST = "POST"
    # Class-level base headers shared by all instances; the gzip
    # Content-Encoding is advertised only when compression is enabled.
    if DD_USE_COMPRESSION:
        _HEADERS = {"Content-type": "application/json", "Content-Encoding": "gzip"}
    else:
        _HEADERS = {"Content-type": "application/json"}
    _HEADERS["DD-EVP-ORIGIN"] = "aws_forwarder"
    _HEADERS["DD-EVP-ORIGIN-VERSION"] = DD_FORWARDER_VERSION
    storage_tag = get_dd_storage_tag_header()
    if storage_tag != "":
        _HEADERS["DD-STORAGE-TAG"] = storage_tag

    def __init__(
        self, host, port, no_ssl, skip_ssl_validation, api_key, scrubber, timeout=10
    ):
        """Configure the intake URL, TLS settings, and request headers.

        :param host: logs intake hostname
        :param port: logs intake port
        :param no_ssl: use plain http instead of https when truthy
        :param skip_ssl_validation: disable certificate verification when truthy
        :param api_key: Datadog API key, sent in the DD-API-KEY header
        :param scrubber: object exposing scrub(str) -> str; may raise
            ScrubbingException
        :param timeout: per-request timeout in seconds
        """
        # Build a per-instance header dict instead of mutating the shared
        # class-level _HEADERS in place: the previous
        # `self._HEADERS.update({"DD-API-KEY": api_key})` wrote the API key
        # into the class dict, leaking it across instances created with
        # different keys.
        self._HEADERS = {**type(self)._HEADERS, "DD-API-KEY": api_key}
        protocol = "http" if no_ssl else "https"
        self._url = "{}://{}:{}/api/v2/logs".format(protocol, host, port)
        self._scrubber = scrubber
        self._timeout = timeout
        self._session = None
        self._ssl_validation = not skip_ssl_validation
        self._futures = []
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                f"Initialized http client for logs intake: "
                f"<host: {host}, port: {port}, url: {self._url}, no_ssl: {no_ssl}, "
                f"skip_ssl_validation: {skip_ssl_validation}, timeout: {timeout}>"
            )

    def _connect(self):
        # Create the async session; DD_MAX_WORKERS bounds concurrent posts.
        self._session = FuturesSession(max_workers=DD_MAX_WORKERS)
        self._session.headers.update(self._HEADERS)

    def _close(self):
        # Resolve all the futures and log exceptions if any, then close the
        # underlying session.
        for future in as_completed(self._futures):
            try:
                future.result()
            except Exception:
                logger.exception("Exception while forwarding logs")
        self._session.close()

    def send(self, logs):
        """
        Sends a batch of log, only retry on server and network errors.

        :param logs: iterable of JSON-encoded log strings; they are joined
            into a JSON array payload before scrubbing/compression.
        :raises Exception: when the scrubber fails on the payload.
        """
        try:
            data = self._scrubber.scrub("[{}]".format(",".join(logs)))
        except ScrubbingException as e:
            # Chain the original exception so the ScrubbingException
            # traceback is preserved for debugging.
            raise Exception(f"could not scrub the payload: {e}") from e
        if DD_USE_COMPRESSION:
            data = compress_logs(data, DD_COMPRESSION_LEVEL)
        # FuturesSession returns immediately with a future object
        future = self._session.post(
            self._url, data, timeout=self._timeout, verify=self._ssl_validation
        )
        self._futures.append(future)

    def __enter__(self):
        self._connect()
        return self

    def __exit__(self, ex_type, ex_value, traceback):
        self._close()