66from collections import defaultdict
77from datetime import datetime , timedelta
88from itertools import islice
9+ from math import ceil , floor , sqrt
910from urlparse import urljoin
1011
1112# project
1516import requests
1617
1718
19+ # More information in https://www.consul.io/docs/internals/coordinates.html,
20+ # code is based on the snippet there.
21+ def distance (a , b ):
22+ a = a ['Coord' ]
23+ b = b ['Coord' ]
24+ total = 0
25+ b_vec = b ['Vec' ]
26+ for i , a_p in enumerate (a ['Vec' ]):
27+ diff = a_p - b_vec [i ]
28+ total += diff * diff
29+ rtt = sqrt (total ) + a ['Height' ] + b ['Height' ]
30+
31+ adjusted = rtt + a ['Adjustment' ] + b ['Adjustment' ]
32+ if adjusted > 0.0 :
33+ rtt = adjusted
34+
35+ return rtt * 1000.0
36+
37+
38+ def ceili (v ):
39+ return int (ceil (v ))
40+
41+
1842class ConsulCheck (AgentCheck ):
1943 CONSUL_CHECK = 'consul.up'
2044 HEALTH_CHECK = 'consul.check'
@@ -102,7 +126,18 @@ def _is_instance_leader(self, instance):
102126 return False
103127
104128 def _check_for_leader_change (self , instance ):
105- agent_dc = self ._get_agent_datacenter (instance )
129+ perform_new_leader_checks = instance .get ('new_leader_checks' ,
130+ self .init_config .get ('new_leader_checks' , False ))
131+ perform_self_leader_check = instance .get ('self_leader_check' ,
132+ self .init_config .get ('self_leader_check' , False ))
133+
134+ if perform_new_leader_checks and perform_self_leader_check :
135+ self .log .warn ('Both perform_self_leader_check and perform_new_leader_checks are set, '
136+ 'ignoring perform_new_leader_checks' )
137+ elif not perform_new_leader_checks and not perform_self_leader_check :
138+ # Nothing to do here
139+ return
140+
106141 leader = self ._get_cluster_leader (instance )
107142
108143 if not leader :
@@ -118,24 +153,30 @@ def _check_for_leader_change(self, instance):
118153 self ._last_known_leader = leader
119154 return
120155
156+ agent = self ._get_agent_url (instance )
157+ agent_dc = self ._get_agent_datacenter (instance )
158+
121159 if leader != self ._last_known_leader :
122- self .log .info (('Leader change from {0} to {1}. Sending new leader event' ).format (
123- self ._last_known_leader , leader ))
124-
125- self .event ({
126- "timestamp" : int (datetime .now ().strftime ("%s" )),
127- "event_type" : "consul.new_leader" ,
128- "source_type_name" : self .SOURCE_TYPE_NAME ,
129- "msg_title" : "New Consul Leader Elected in consul_datacenter:{0}" .format (agent_dc ),
130- "aggregation_key" : "consul.new_leader" ,
131- "msg_text" : "The Node at {0} is the new leader of the consul datacenter {1}" .format (
132- leader ,
133- agent_dc
134- ),
135- "tags" : ["prev_consul_leader:{0}" .format (self ._last_known_leader ),
136- "curr_consul_leader:{0}" .format (leader ),
137- "consul_datacenter:{0}" .format (agent_dc )]
138- })
160+ # There was a leadership change
161+ if perform_new_leader_checks or (perform_self_leader_check and agent == leader ):
162+ # We either emit all leadership changes or emit when we become the leader and that just happened
163+ self .log .info (('Leader change from {0} to {1}. Sending new leader event' ).format (
164+ self ._last_known_leader , leader ))
165+
166+ self .event ({
167+ "timestamp" : int (datetime .now ().strftime ("%s" )),
168+ "event_type" : "consul.new_leader" ,
169+ "source_type_name" : self .SOURCE_TYPE_NAME ,
170+ "msg_title" : "New Consul Leader Elected in consul_datacenter:{0}" .format (agent_dc ),
171+ "aggregation_key" : "consul.new_leader" ,
172+ "msg_text" : "The Node at {0} is the new leader of the consul datacenter {1}" .format (
173+ leader ,
174+ agent_dc
175+ ),
176+ "tags" : ["prev_consul_leader:{0}" .format (self ._last_known_leader ),
177+ "curr_consul_leader:{0}" .format (leader ),
178+ "consul_datacenter:{0}" .format (agent_dc )]
179+ })
139180
140181 self ._last_known_leader = leader
141182
@@ -167,10 +208,7 @@ def _cull_services_list(self, services, service_whitelist):
167208 return services
168209
169210 def check (self , instance ):
170- perform_new_leader_checks = instance .get ('new_leader_checks' ,
171- self .init_config .get ('new_leader_checks' , False ))
172- if perform_new_leader_checks :
173- self ._check_for_leader_change (instance )
211+ self ._check_for_leader_change (instance )
174212
175213 peers = self .get_peers_in_cluster (instance )
176214 main_tags = []
@@ -190,6 +228,8 @@ def check(self, instance):
190228 service_check_tags = ['consul_url:{0}' .format (instance .get ('url' ))]
191229 perform_catalog_checks = instance .get ('catalog_checks' ,
192230 self .init_config .get ('catalog_checks' ))
231+ perform_network_latency_checks = instance .get ('network_latency_checks' ,
232+ self .init_config .get ('network_latency_checks' ))
193233
194234 try :
195235 # Make service checks from health checks for all services in catalog
@@ -323,3 +363,69 @@ def check(self, instance):
323363 status_value ,
324364 tags = main_tags + node_tags
325365 )
366+
367+ if perform_network_latency_checks :
368+ self .check_network_latency (instance , agent_dc , main_tags )
369+
370+ def _get_coord_datacenters (self , instance ):
371+ return self .consul_request (instance , '/v1/coordinate/datacenters' )
372+
373+ def _get_coord_nodes (self , instance ):
374+ return self .consul_request (instance , 'v1/coordinate/nodes' )
375+
376+ def check_network_latency (self , instance , agent_dc , main_tags ):
377+
378+ datacenters = self ._get_coord_datacenters (instance )
379+ for datacenter in datacenters :
380+ name = datacenter ['Datacenter' ]
381+ if name == agent_dc :
382+ # This is us, time to collect inter-datacenter data
383+ for other in datacenters :
384+ other_name = other ['Datacenter' ]
385+ if name == other_name :
386+ # Ignore ourself
387+ continue
388+ latencies = []
389+ for node_a in datacenter ['Coordinates' ]:
390+ for node_b in other ['Coordinates' ]:
391+ latencies .append (distance (node_a , node_b ))
392+ latencies .sort ()
393+ tags = main_tags + ['source_datacenter:{}' .format (name ),
394+ 'dest_datacenter:{}' .format (other_name )]
395+ n = len (latencies )
396+ half_n = int (floor (n / 2 ))
397+ if n % 2 :
398+ median = latencies [half_n ]
399+ else :
400+ median = (latencies [half_n - 1 ] + latencies [half_n ]) / 2
401+ self .gauge ('consul.net.dc.latency.min' , latencies [0 ], hostname = '' , tags = tags )
402+ self .gauge ('consul.net.dc.latency.median' , median , hostname = '' , tags = tags )
403+ self .gauge ('consul.net.dc.latency.max' , latencies [- 1 ], hostname = '' , tags = tags )
404+ # We've found ourself, we can move on
405+ break
406+
407+ # Intra-datacenter
408+ nodes = self ._get_coord_nodes (instance )
409+ for node in nodes :
410+ node_name = node ['Node' ]
411+ latencies = []
412+ for other in nodes :
413+ other_name = other ['Node' ]
414+ if node_name == other_name :
415+ continue
416+ latencies .append (distance (node , other ))
417+ latencies .sort ()
418+ n = len (latencies )
419+ half_n = int (floor (n / 2 ))
420+ if n % 2 :
421+ median = latencies [half_n ]
422+ else :
423+ median = (latencies [half_n - 1 ] + latencies [half_n ]) / 2
424+ self .gauge ('consul.net.node.latency.min' , latencies [0 ], hostname = node_name , tags = main_tags )
425+ self .gauge ('consul.net.node.latency.p25' , latencies [ceili (n * 0.25 ) - 1 ], hostname = node_name , tags = main_tags )
426+ self .gauge ('consul.net.node.latency.median' , median , hostname = node_name , tags = main_tags )
427+ self .gauge ('consul.net.node.latency.p75' , latencies [ceili (n * 0.75 ) - 1 ], hostname = node_name , tags = main_tags )
428+ self .gauge ('consul.net.node.latency.p90' , latencies [ceili (n * 0.90 ) - 1 ], hostname = node_name , tags = main_tags )
429+ self .gauge ('consul.net.node.latency.p95' , latencies [ceili (n * 0.95 ) - 1 ], hostname = node_name , tags = main_tags )
430+ self .gauge ('consul.net.node.latency.p99' , latencies [ceili (n * 0.99 ) - 1 ], hostname = node_name , tags = main_tags )
431+ self .gauge ('consul.net.node.latency.max' , latencies [- 1 ], hostname = node_name , tags = main_tags )
0 commit comments