44
55import re
66import requests
7- from collections import defaultdict
87from google .protobuf .internal .decoder import _DecodeVarint32 # pylint: disable=E0611,E0401
98from checks import AgentCheck
109from utils .prometheus import metrics_pb2
1110
12- from prometheus_client .parser import text_fd_to_metric_families
13-
14-
1511# Prometheus check is a mother class providing a structure and some helpers
1612# to collect metrics, events and service checks exposed via Prometheus.
1713#
@@ -33,15 +29,7 @@ class PrometheusFormat:
3329 PROTOBUF = "PROTOBUF"
3430 TEXT = "TEXT"
3531
36-
37- class UnknownFormatError (TypeError ):
38- pass
39-
40-
4132class PrometheusCheck (AgentCheck ):
42- UNWANTED_LABELS = ["le" , "quantile" ] # are specifics keys for prometheus itself
43- REQUESTS_CHUNK_SIZE = 1024 * 10 # use 10kb as chunk size when using the Stream feature in requests.get
44-
4533 def __init__ (self , name , init_config , agentConfig , instances = None ):
4634 AgentCheck .__init__ (self , name , init_config , agentConfig , instances )
4735 # message.type is the index in this array
@@ -57,7 +45,7 @@ def __init__(self, name, init_config, agentConfig, instances=None):
5745 # child check class.
5846 self .NAMESPACE = ''
5947
60- # `metrics_mapper` is a dictionary where the keys are the metrics to capture
48+ # `metrics_mapper` is a dictionnary where the keys are the metrics to capture
6149 # and the values are the corresponding metrics names to have in datadog.
6250 # Note: it is empty in the mother class but will need to be
6351 # overloaded/hardcoded in the final check not to be counted as custom metric.
@@ -95,24 +83,24 @@ def prometheus_metric_name(self, message, **kwargs):
9583 """ Example method"""
9684 pass
9785
98- def parse_metric_family ( self , response ):
99- """
100- Parse the MetricFamily from a valid requests.Response object to provide a MetricFamily object (see [0])
86+ class UnknownFormatError ( Exception ):
87+ def __init__ ( self , arg ):
88+ self . args = arg
10189
102- The text format uses iter_lines() generator.
90+ def parse_metric_family (self , buf , content_type ):
91+ """
92+ Gets the output data from a prometheus endpoint response along with its
93+ Content-type header and parses it into Prometheus classes (see [0])
10394
104- The protobuf format directly parse the response.content property searching for Prometheus messages of type
105- MetricFamily [0] delimited by a varint32 [1] when the content-type is a `application/vnd.google.protobuf`.
95+ Parse the binary buffer in input, searching for Prometheus messages
96+ of type MetricFamily [0] delimited by a varint32 [1] when the
97+ content-type is a `application/vnd.google.protobuf`.
10698
10799 [0] https://github.com/prometheus/client_model/blob/086fe7ca28bde6cec2acd5223423c1475a362858/metrics.proto#L76-%20%20L81
108100 [1] https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/AbstractMessageLite#writeDelimitedTo(java.io.OutputStream)
109-
110- :param response: requests.Response
111- :return: metrics_pb2.MetricFamily()
112101 """
113- if 'application/vnd.google.protobuf' in response . headers [ 'Content-Type' ] :
102+ if 'application/vnd.google.protobuf' in content_type :
114103 n = 0
115- buf = response .content
116104 while n < len (buf ):
117105 msg_len , new_pos = _DecodeVarint32 (buf , n )
118106 n = new_pos
@@ -130,56 +118,26 @@ def parse_metric_family(self, response):
130118 else :
131119 self .log .debug ("type override %s for %s is not a valid type name" % (new_type , message .name ))
132120 yield message
133-
134- elif 'text/plain' in response .headers ['Content-Type' ]:
135- messages = defaultdict (list ) # map with the name of the element (before the labels)
136- # and the list of occurrences with labels and values
137-
138- obj_map = {} # map of the types of each metrics
121+ elif 'text/plain' in content_type :
122+ messages = {} # map with the name of the element (before the labels) and the list of occurrences with labels and values
123+ obj_map = {} # map of the types of each metrics
139124 obj_help = {} # help for the metrics
140- for metric in text_fd_to_metric_families (response .iter_lines (chunk_size = self .REQUESTS_CHUNK_SIZE )):
141- metric_name = "%s_bucket" % metric .name if metric .type == "histogram" else metric .name
142- metric_type = self .type_overrides .get (metric_name , metric .type )
143- if metric_type == "untyped" or metric_type not in self .METRIC_TYPES :
144- continue
145-
146- for sample in metric .samples :
147- if (sample [0 ].endswith ("_sum" ) or sample [0 ].endswith ("_count" )) and \
148- metric_type in ["histogram" , "summary" ]:
149- messages [sample [0 ]].append ({"labels" : sample [1 ], 'value' : sample [2 ]})
150- else :
151- messages [metric_name ].append ({"labels" : sample [1 ], 'value' : sample [2 ]})
125+ for line in buf .splitlines ():
126+ self ._extract_metrics_from_string (line , messages , obj_map , obj_help )
127+
128+ # Add type overrides:
129+ for m_name , m_type in self .type_overrides .iteritems ():
130+ if m_type in self .METRIC_TYPES :
131+ obj_map [m_name ] = m_type
132+ else :
133+ self .log .debug ("type override %s for %s is not a valid type name" % (m_type ,m_name ))
152134
153- obj_map [metric .name ] = metric_type
154- obj_help [metric .name ] = metric .documentation
155135
156136 for _m in obj_map :
157- if _m in messages or (obj_map [_m ] == 'histogram' and ( '{}_bucket' .format (_m ) in messages ) ):
137+ if _m in messages or (obj_map [_m ] == 'histogram' and '{}_bucket' .format (_m ) in messages ):
158138 yield self ._extract_metric_from_map (_m , messages , obj_map , obj_help )
159139 else :
160- raise UnknownFormatError ('Unsupported content-type provided: {}' .format (
161- response .headers ['Content-Type' ]))
162-
163- @staticmethod
164- def get_metric_value_by_labels (messages , _metric , _m , metric_suffix ):
165- """
166- :param messages: dictionary as metric_name: {labels: {}, value: 10}
167- :param _metric: dictionary as {labels: {le: '0.001', 'custom': 'value'}}
168- :param _m: str as metric name
169- :param metric_suffix: str must be in (count or sum)
170- :return: value of the metric_name matched by the labels
171- """
172- metric_name = '{}_{}' .format (_m , metric_suffix )
173- expected_labels = set ([(k , v ) for k , v in _metric ["labels" ].iteritems ()
174- if k not in PrometheusCheck .UNWANTED_LABELS ])
175- for elt in messages [metric_name ]:
176- current_labels = set ([(k , v ) for k , v in elt ["labels" ].iteritems ()
177- if k not in PrometheusCheck .UNWANTED_LABELS ])
178- # As we have two hashable objects we can compare them without any side effects
179- if current_labels == expected_labels :
180- return float (elt ["value" ])
181-
182- raise AttributeError ("cannot find expected labels for metric %s with suffix %s" % (metric_name , metric_suffix ))
140+ raise self .UnknownFormatError ('Unsupported content-type provided: {}' .format (content_type ))
183141
184142 def _extract_metric_from_map (self , _m , messages , obj_map , obj_help ):
185143 """
@@ -220,15 +178,15 @@ def _extract_metric_from_map(self, _m, messages, obj_map, obj_help):
220178 _g .gauge .value = float (_metric ['value' ])
221179 elif obj_map [_m ] == 'summary' :
222180 if '{}_count' .format (_m ) in messages :
223- _g .summary .sample_count = long (self . get_metric_value_by_labels (messages , _metric , _m , 'count' ))
181+ _g .summary .sample_count = long (float (messages [ '{}_count' . format ( _m )][ 0 ][ 'value' ] ))
224182 if '{}_sum' .format (_m ) in messages :
225- _g .summary .sample_sum = self . get_metric_value_by_labels (messages , _metric , _m , 'sum' )
183+ _g .summary .sample_sum = float (messages [ '{}_sum' . format ( _m )][ 0 ][ 'value' ] )
226184 # TODO: see what can be done with the untyped metrics
227185 elif obj_map [_m ] == 'histogram' :
228186 if '{}_count' .format (_m ) in messages :
229- _g .histogram .sample_count = long (self . get_metric_value_by_labels (messages , _metric , _m , 'count' ))
187+ _g .histogram .sample_count = long (float (messages [ '{}_count' . format ( _m )][ 0 ][ 'value' ] ))
230188 if '{}_sum' .format (_m ) in messages :
231- _g .histogram .sample_sum = self . get_metric_value_by_labels (messages , _metric , _m , 'sum' )
189+ _g .histogram .sample_sum = float (messages [ '{}_sum' . format ( _m )][ 0 ][ 'value' ] )
232190 # last_metric = len(_obj.metric) - 1
233191 # if last_metric >= 0:
234192 for lbl in _metric ['labels' ]:
@@ -256,6 +214,42 @@ def _extract_metric_from_map(self, _m, messages, obj_map, obj_help):
256214 _l .value = _metric ['labels' ][lbl ]
257215 return _obj
258216
217+ def _extract_metrics_from_string (self , line , messages , obj_map , obj_help ):
218+ """
219+ Extracts the metrics from a line of metric and update the given
220+ dictionnaries (we take advantage of the reference of the dictionary here)
221+ """
222+ if line .startswith ('# TYPE' ):
223+ metric = line .split (' ' )
224+ if len (metric ) == 4 :
225+ obj_map [metric [2 ]] = metric [3 ] # line = # TYPE metric_name metric_type
226+ elif line .startswith ('# HELP' ):
227+ _h = line .split (' ' , 3 )
228+ if len (_h ) == 4 :
229+ obj_help [_h [2 ]] = _h [3 ] # line = # HELP metric_name Help message...
230+ elif not line .startswith ('#' ):
231+ _match = self .metrics_pattern .match (line )
232+ if _match is not None :
233+ _g = _match .groups ()
234+ _msg = []
235+ _lbls = self ._extract_labels_from_string (_g [1 ])
236+ if _g [0 ] in messages :
237+ _msg = messages [_g [0 ]]
238+ _msg .append ({'labels' : _lbls , 'value' : _g [2 ]})
239+ messages [_g [0 ]] = _msg
240+
241+ def _extract_labels_from_string (self ,labels ):
242+ """
243+ Extracts the labels from a string that looks like:
244+ {label_name_1="value 1", label_name_2="value 2"}
245+ """
246+ lbls = {}
247+ labels = labels .lstrip ('{' ).rstrip ('}' )
248+ _lbls = self .lbl_pattern .findall (labels )
249+ for _lbl in _lbls :
250+ lbls [_lbl [0 ]] = _lbl [1 ]
251+ return lbls
252+
259253 def process (self , endpoint , send_histograms_buckets = True , instance = None ):
260254 """
261255 Polls the data from prometheus and pushes them as gauges
@@ -264,15 +258,12 @@ def process(self, endpoint, send_histograms_buckets=True, instance=None):
264258 Note that if the instance has a 'tags' attribute, it will be pushed
265259 automatically as additionnal custom tags and added to the metrics
266260 """
267- response = self .poll (endpoint )
268- try :
269- tags = []
270- if instance is not None :
271- tags = instance .get ('tags' , [])
272- for metric in self .parse_metric_family (response ):
273- self .process_metric (metric , send_histograms_buckets = send_histograms_buckets , custom_tags = tags , instance = instance )
274- finally :
275- response .close ()
261+ content_type , data = self .poll (endpoint )
262+ tags = []
263+ if instance is not None :
264+ tags = instance .get ('tags' , [])
265+ for metric in self .parse_metric_family (data , content_type ):
266+ self .process_metric (metric , send_histograms_buckets = send_histograms_buckets , custom_tags = tags , instance = instance )
276267
277268 def process_metric (self , message , send_histograms_buckets = True , custom_tags = None , ** kwargs ):
278269 """
@@ -293,39 +284,23 @@ def process_metric(self, message, send_histograms_buckets=True, custom_tags=None
293284 except AttributeError as err :
294285 self .log .debug ("Unable to handle metric: {} - error: {}" .format (message .name , err ))
295286
296- def poll (self , endpoint , pFormat = PrometheusFormat .PROTOBUF , headers = None ):
287+ def poll (self , endpoint , pFormat = PrometheusFormat .PROTOBUF , headers = {} ):
297288 """
298289 Polls the metrics from the prometheus metrics endpoint provided.
299290 Defaults to the protobuf format, but can use the formats specified by
300291 the PrometheusFormat class.
301292 Custom headers can be added to the default headers.
302293
303- Returns a valid requests.Response, raise requests.HTTPError if the status code of the requests.Response
304- isn't valid - see response.raise_for_status()
305-
306- The caller needs to close the requests.Response
307-
308- :param endpoint: string url endpoint
309- :param pFormat: the preferred format defined in PrometheusFormat
310- :param headers: extra headers
311- :return: requests.Response
294+ Returns the content-type of the response and the content of the reponse itself.
312295 """
313- if headers is None :
314- headers = {}
315296 if 'accept-encoding' not in headers :
316297 headers ['accept-encoding' ] = 'gzip'
317298 if pFormat == PrometheusFormat .PROTOBUF :
318- headers ['accept' ] = 'application/vnd.google.protobuf; ' \
319- 'proto=io.prometheus.client.MetricFamily; ' \
320- 'encoding=delimited'
299+ headers ['accept' ] = 'application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited'
321300
322- response = requests .get (endpoint , headers = headers , stream = True )
323- try :
324- response .raise_for_status ()
325- return response
326- except requests .HTTPError :
327- response .close ()
328- raise
301+ req = requests .get (endpoint , headers = headers )
302+ req .raise_for_status ()
303+ return req .headers ['Content-Type' ], req .content
329304
330305 def _submit (self , metric_name , message , send_histograms_buckets = True , custom_tags = None ):
331306 """
0 commit comments