diff --git a/config.py b/config.py index 6b1c0fbd74..c13aca3005 100644 --- a/config.py +++ b/config.py @@ -1038,16 +1038,11 @@ def load_check_directory(agentConfig, hostname): if check_name in initialized_checks or check_name in init_failed_checks: continue - # if TRACE_CONFIG is set, service_disco_check_config looks like: - # (config_src, (sd_init_config, sd_instances)) instead of - # (sd_init_config, sd_instances) + sd_init_config, sd_instances = service_disco_check_config[1] if agentConfig.get(TRACE_CONFIG): - sd_init_config, sd_instances = service_disco_check_config[1] configs_and_sources[check_name] = ( service_disco_check_config[0], {'init_config': sd_init_config, 'instances': sd_instances}) - else: - sd_init_config, sd_instances = service_disco_check_config check_config = {'init_config': sd_init_config, 'instances': sd_instances} @@ -1065,8 +1060,7 @@ def load_check_directory(agentConfig, hostname): return configs_and_sources return {'initialized_checks': initialized_checks.values(), - 'init_failed_checks': init_failed_checks, - } + 'init_failed_checks': init_failed_checks} def load_check(agentConfig, hostname, checkname): @@ -1089,7 +1083,7 @@ def load_check(agentConfig, hostname, checkname): # the check was not found, try with service discovery for check_name, service_disco_check_config in _service_disco_configs(agentConfig).iteritems(): if check_name == checkname: - sd_init_config, sd_instances = service_disco_check_config + sd_init_config, sd_instances = service_disco_check_config[1] check_config = {'init_config': sd_init_config, 'instances': sd_instances} # try to load the check and return the result diff --git a/tests/core/test_service_discovery.py b/tests/core/test_service_discovery.py index 38d58f4e3e..a31be22430 100644 --- a/tests/core/test_service_discovery.py +++ b/tests/core/test_service_discovery.py @@ -10,9 +10,9 @@ from utils.service_discovery.config_stores import get_config_store from utils.service_discovery.consul_config_store import ConsulStore from utils.service_discovery.etcd_config_store import EtcdStore -from utils.service_discovery.abstract_config_store import AbstractConfigStore +from utils.service_discovery.abstract_config_store import AbstractConfigStore, CONFIG_FROM_KUBE from utils.service_discovery.sd_backend import get_sd_backend -from utils.service_discovery.sd_docker_backend import SDDockerBackend +from utils.service_discovery.sd_docker_backend import SDDockerBackend, _SDDockerBackendConfigFetchState def clear_singletons(agentConfig): @@ -34,20 +34,22 @@ def raise_for_status(self): def _get_container_inspect(c_id): """Return a mocked container inspect dict from self.container_inspects.""" - for co, _, _, _ in TestServiceDiscovery.container_inspects: + for co, _, _, _, _ in TestServiceDiscovery.container_inspects: if co.get('Id') == c_id: return co return None -def _get_conf_tpls(image_name, trace_config=False, kube_annotations=None): +def _get_conf_tpls(image_name, kube_annotations=None, kube_pod_name=None, kube_container_name=None): """Return a mocked configuration template from self.mock_templates.""" - return copy.deepcopy(TestServiceDiscovery.mock_templates.get(image_name)[0]) + return [(x, y) for x, y in + copy.deepcopy(TestServiceDiscovery.mock_templates.get(image_name)[0])] def _get_check_tpls(image_name, **kwargs): if image_name in TestServiceDiscovery.mock_templates: - return [copy.deepcopy(TestServiceDiscovery.mock_templates.get(image_name)[0][0][0:3])] + result = copy.deepcopy(TestServiceDiscovery.mock_templates.get(image_name)[0][0]) + return [(result[0], result[1][0:3])] elif image_name in TestServiceDiscovery.bad_mock_templates: try: return [copy.deepcopy(TestServiceDiscovery.bad_mock_templates.get(image_name))] @@ -107,16 +109,16 @@ class TestServiceDiscovery(unittest.TestCase): # templates with variables already extracted mock_templates = { - # image_name: ([(check_name, init_tpl, instance_tpl, variables)], (expected_config_template)) + # image_name: ([(source, (check_name, init_tpl, instance_tpl, variables))], (expected_config_template)) 'image_0': ( - [('check_0', {}, {'host': '%%host%%'}, ['host'])], - ('check_0', {}, {'host': '127.0.0.1'})), + [('template', ('check_0', {}, {'host': '%%host%%'}, ['host']))], + ('template', ('check_0', {}, {'host': '127.0.0.1'}))), 'image_1': ( - [('check_1', {}, {'port': '%%port%%'}, ['port'])], - ('check_1', {}, {'port': '1337'})), + [('template', ('check_1', {}, {'port': '%%port%%'}, ['port']))], + ('template', ('check_1', {}, {'port': '1337'}))), 'image_2': ( - [('check_2', {}, {'host': '%%host%%', 'port': '%%port%%'}, ['host', 'port'])], - ('check_2', {}, {'host': '127.0.0.1', 'port': '1337'})), + [('template', ('check_2', {}, {'host': '%%host%%', 'port': '%%port%%'}, ['host', 'port']))], + ('template', ('check_2', {}, {'host': '127.0.0.1', 'port': '1337'}))), } # raw templates coming straight from the config store @@ -124,13 +126,13 @@ class TestServiceDiscovery(unittest.TestCase): # image_name: ('[check_name]', '[init_tpl]', '[instance_tpl]', expected_python_tpl_list) 'image_0': ( ('["check_0"]', '[{}]', '[{"host": "%%host%%"}]'), - [('check_0', {}, {"host": "%%host%%"})]), + [('template', ('check_0', {}, {"host": "%%host%%"}))]), 'image_1': ( ('["check_1"]', '[{}]', '[{"port": "%%port%%"}]'), - [('check_1', {}, {"port": "%%port%%"})]), + [('template', ('check_1', {}, {"port": "%%port%%"}))]), 'image_2': ( ('["check_2"]', '[{}]', '[{"host": "%%host%%", "port": "%%port%%"}]'), - [('check_2', {}, {"host": "%%host%%", "port": "%%port%%"})]), + [('template', ('check_2', {}, {"host": "%%host%%", "port": "%%port%%"}))]), 'bad_image_0': ((['invalid template']), []), 'bad_image_1': (('invalid template'), []), 'bad_image_2': (None, []), @@ -176,7 +178,10 @@ def setUp(self): @mock.patch('utils.http.requests.get') @mock.patch('utils.kubernetes.kubeutil.check_yaml') - def test_get_host_address(self, mock_check_yaml, mock_get): + @mock.patch.object(AbstractConfigStore, '__init__', return_value=None) + @mock.patch('utils.dockerutil.DockerUtil.client', return_value=None) + @mock.patch('utils.kubernetes.kubeutil.get_conf_path', return_value=None) + def test_get_host_address(self, mock_check_yaml, mock_get, *args): kubernetes_config = {'instances': [{'kubelet_port': 1337}]} pod_list = { 'items': [{ @@ -232,55 +237,54 @@ def test_get_host_address(self, mock_check_yaml, mock_get): mock_get.return_value = Response(pod_list) for c_ins, tpl_var, expected_ip in ip_address_inspects: - with mock.patch.object(AbstractConfigStore, '__init__', return_value=None): - with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None): - with mock.patch('utils.kubernetes.kubeutil.get_conf_path', return_value=None): - sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig) - self.assertEquals(sd_backend._get_host_address(c_ins, tpl_var), expected_ip) - clear_singletons(self.auto_conf_agentConfig) - - def test_get_port(self): - with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None): - for c_ins, _, var_tpl, expected_ports, _ in self.container_inspects: - sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig) - if isinstance(expected_ports, str): - self.assertEquals(sd_backend._get_port(c_ins, var_tpl), expected_ports) - else: - self.assertRaises(expected_ports, sd_backend._get_port, c_ins, var_tpl) - clear_singletons(self.auto_conf_agentConfig) - - @mock.patch('docker.Client.inspect_container', side_effect=_get_container_inspect) + state = _SDDockerBackendConfigFetchState(lambda _: c_ins) + sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig) + self.assertEquals(sd_backend._get_host_address(state, 'container id', tpl_var), expected_ip) + clear_singletons(self.auto_conf_agentConfig) + + @mock.patch('utils.dockerutil.DockerUtil.client', return_value=None) + def test_get_port(self, _): + for c_ins, _, var_tpl, expected_ports, _ in self.container_inspects: + state = _SDDockerBackendConfigFetchState(lambda _: c_ins) + sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig) + if isinstance(expected_ports, str): + self.assertEquals(sd_backend._get_port(state, 'container id', var_tpl), expected_ports) + else: + self.assertRaises(expected_ports, sd_backend._get_port, state, 'c_id', var_tpl) + clear_singletons(self.auto_conf_agentConfig) + + @mock.patch('utils.dockerutil.DockerUtil.client', return_value=None) + @mock.patch.object(SDDockerBackend, '_get_host_address', return_value='127.0.0.1') + @mock.patch.object(SDDockerBackend, '_get_port', return_value='1337') @mock.patch.object(SDDockerBackend, '_get_config_templates', side_effect=_get_conf_tpls) - def test_get_check_configs(self, mock_inspect_container, mock_get_conf_tpls): + def test_get_check_configs(self, *args): """Test get_check_config with mocked container inspect and config template""" - with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None): - with mock.patch.object(SDDockerBackend, '_get_host_address', return_value='127.0.0.1'): - with mock.patch.object(SDDockerBackend, '_get_port', return_value='1337'): - c_id = self.docker_container_inspect.get('Id') - for image in self.mock_templates.keys(): - sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig) - self.assertEquals( - sd_backend._get_check_configs(c_id, image)[0], - self.mock_templates[image][1]) - clear_singletons(self.auto_conf_agentConfig) + c_id = self.docker_container_inspect.get('Id') + for image in self.mock_templates.keys(): + sd_backend = get_sd_backend(agentConfig=self.auto_conf_agentConfig) + state = _SDDockerBackendConfigFetchState(_get_container_inspect) + self.assertEquals( + sd_backend._get_check_configs(state, c_id, image)[0], + self.mock_templates[image][1]) + clear_singletons(self.auto_conf_agentConfig) + @mock.patch('utils.dockerutil.DockerUtil.client', return_value=None) + @mock.patch.object(ConsulStore, 'get_client', return_value=None) + @mock.patch.object(EtcdStore, 'get_client', return_value=None) @mock.patch.object(AbstractConfigStore, 'get_check_tpls', side_effect=_get_check_tpls) - def test_get_config_templates(self, mock_get_check_tpls): + def test_get_config_templates(self, *args): """Test _get_config_templates with mocked get_check_tpls""" - with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None): - with mock.patch.object(EtcdStore, 'get_client', return_value=None): - with mock.patch.object(ConsulStore, 'get_client', return_value=None): - for agentConfig in self.agentConfigs: - sd_backend = get_sd_backend(agentConfig=agentConfig) - # normal cases - for image in self.mock_templates.keys(): - template = sd_backend._get_config_templates(image) - expected_template = self.mock_templates.get(image)[0] - self.assertEquals(template, expected_template) - # error cases - for image in self.bad_mock_templates.keys(): - self.assertEquals(sd_backend._get_config_templates(image), None) - clear_singletons(agentConfig) + for agentConfig in self.agentConfigs: + sd_backend = get_sd_backend(agentConfig=agentConfig) + # normal cases + for image in self.mock_templates.keys(): + template = sd_backend._get_config_templates(image) + expected_template = self.mock_templates.get(image)[0] + self.assertEquals(template, expected_template) + # error cases + for image in self.bad_mock_templates.keys(): + self.assertEquals(sd_backend._get_config_templates(image), None) + clear_singletons(agentConfig) def test_render_template(self): """Test _render_template""" @@ -315,7 +319,10 @@ def test_render_template(self): self.assertEquals(config, None) clear_singletons(agentConfig) - def test_fill_tpl(self): + @mock.patch('utils.dockerutil.DockerUtil.client', return_value=None) + @mock.patch.object(EtcdStore, 'get_client', return_value=None) + @mock.patch.object(ConsulStore, 'get_client', return_value=None) + def test_fill_tpl(self, *args): """Test _fill_tpl with mocked docker client""" valid_configs = [ @@ -466,32 +473,28 @@ def test_fill_tpl(self): ) ] - with mock.patch('utils.dockerutil.DockerUtil.client', return_value=None): - with mock.patch.object(EtcdStore, 'get_client', return_value=None): - with mock.patch.object(ConsulStore, 'get_client', return_value=None): - for ac in self.agentConfigs: - sd_backend = get_sd_backend(agentConfig=ac) - try: - for co in valid_configs + edge_cases: - inspect, tpl, variables, tags = co[0] - instance_tpl, var_values = sd_backend._fill_tpl(inspect, tpl, variables, tags) - for key in instance_tpl.keys(): - if isinstance(instance_tpl[key], list): - self.assertEquals(len(instance_tpl[key]), len(co[1][0].get(key))) - for elem in instance_tpl[key]: - self.assertTrue(elem in co[1][0].get(key)) - else: - self.assertEquals(instance_tpl[key], co[1][0].get(key)) - self.assertEquals(var_values, co[1][1]) - - for co in invalid_config: - inspect, tpl, variables, tags = co[0] - self.assertRaises(co[1], sd_backend._fill_tpl(inspect, tpl, variables, tags)) - - clear_singletons(ac) - except Exception: - clear_singletons(ac) - raise + for ac in self.agentConfigs: + sd_backend = get_sd_backend(agentConfig=ac) + try: + for co in valid_configs + edge_cases: + inspect, tpl, variables, tags = co[0] + state = _SDDockerBackendConfigFetchState(lambda _: inspect) + instance_tpl, var_values = sd_backend._fill_tpl(state, 'c_id', tpl, variables, tags) + for key in instance_tpl.keys(): + if isinstance(instance_tpl[key], list): + self.assertEquals(len(instance_tpl[key]), len(co[1][0].get(key))) + for elem in instance_tpl[key]: + self.assertTrue(elem in co[1][0].get(key)) + else: + self.assertEquals(instance_tpl[key], co[1][0].get(key)) + self.assertEquals(var_values, co[1][1]) + + for co in invalid_config: + inspect, tpl, variables, tags = co[0] + state = _SDDockerBackendConfigFetchState(lambda _: inspect) + self.assertRaises(co[1], sd_backend._fill_tpl(state, 'c_id', tpl, variables, tags)) + finally: + clear_singletons(ac) # config_stores tests @@ -526,12 +529,13 @@ def test_get_check_tpls(self, mock_client_read): @mock.patch.object(AbstractConfigStore, 'client_read', side_effect=client_read) def test_get_check_tpls_kube(self, mock_client_read): - """Test get_check_tpls""" + """Test get_check_tpls for kubernetes annotations""" valid_config = ['image_0', 'image_1', 'image_2'] invalid_config = ['bad_image_0'] config_store = get_config_store(self.auto_conf_agentConfig) for image in valid_config + invalid_config: tpl = self.mock_tpls.get(image)[1] + tpl = [(CONFIG_FROM_KUBE, t[1]) for t in tpl] if tpl: self.assertNotEquals( tpl, @@ -540,10 +544,12 @@ def test_get_check_tpls_kube(self, mock_client_read): tpl, config_store.get_check_tpls( 'k8s-' + image, auto_conf=True, + kube_pod_name=image, + kube_container_name='foo', kube_annotations=dict(zip( - ['com.datadoghq.sd/check_names', - 'com.datadoghq.sd/init_configs', - 'com.datadoghq.sd/instances'], + ['service-discovery.datadoghq.com/foo.check_names', + 'service-discovery.datadoghq.com/foo.init_configs', + 'service-discovery.datadoghq.com/foo.instances'], self.mock_tpls[image][0])))) def test_get_config_id(self): diff --git a/utils/service_discovery/abstract_config_store.py b/utils/service_discovery/abstract_config_store.py index 6e4850783b..2201045701 100644 --- a/utils/service_discovery/abstract_config_store.py +++ b/utils/service_discovery/abstract_config_store.py @@ -26,7 +26,8 @@ INIT_CONFIGS = 'init_configs' INSTANCES = 'instances' KUBE_ANNOTATIONS = 'kube_annotations' -KUBE_ANNOTATION_PREFIX = 'com.datadoghq.sd/' +KUBE_CONTAINER_NAME = 'kube_container_name' +KUBE_ANNOTATION_PREFIX = 'service-discovery.datadoghq.com' class KeyNotFound(Exception): @@ -98,11 +99,12 @@ def _populate_identifier_to_checks(self): return identifier_to_checks - def _get_kube_config(self, identifier, kube_annotations): + def _get_kube_config(self, identifier, kube_annotations, kube_container_name): try: - check_names = json.loads(kube_annotations[KUBE_ANNOTATION_PREFIX + CHECK_NAMES]) - init_config_tpls = json.loads(kube_annotations[KUBE_ANNOTATION_PREFIX + INIT_CONFIGS]) - instance_tpls = json.loads(kube_annotations[KUBE_ANNOTATION_PREFIX + INSTANCES]) + prefix = '{}/{}.'.format(KUBE_ANNOTATION_PREFIX, kube_container_name) + check_names = json.loads(kube_annotations[prefix + CHECK_NAMES]) + init_config_tpls = json.loads(kube_annotations[prefix + INIT_CONFIGS]) + instance_tpls = json.loads(kube_annotations[prefix + INSTANCES]) return [check_names, init_config_tpls, instance_tpls] except KeyError: return None @@ -131,9 +133,10 @@ def _get_auto_config(self, image_name): def get_checks_to_refresh(self, identifier, **kwargs): to_check = set(self.identifier_to_checks[identifier]) - kube_annotations = kwargs.get('kube_annotations') + kube_annotations = kwargs.get(KUBE_ANNOTATIONS) + kube_container_name = kwargs.get(KUBE_CONTAINER_NAME) if kube_annotations: - kube_config = self._get_kube_config(identifier, kube_annotations) + kube_config = self._get_kube_config(identifier, kube_annotations, kube_container_name) if kube_config is not None: to_check.update(kube_config[0]) @@ -141,29 +144,26 @@ def get_checks_to_refresh(self, identifier, **kwargs): def get_check_tpls(self, identifier, **kwargs): """Retrieve template configs for an identifier from the config_store or auto configuration.""" - trace_config = kwargs.get(TRACE_CONFIG, False) - # this flag is used when no valid configuration store was provided # it makes the method skip directly to the auto_conf if kwargs.get('auto_conf') is True: # When not using a configuration store on kubernetes, check the pod # annotations for configs before falling back to autoconf. kube_annotations = kwargs.get(KUBE_ANNOTATIONS) + kube_container_name = kwargs.get(KUBE_CONTAINER_NAME) if kube_annotations: - kube_config = self._get_kube_config(identifier, kube_annotations) + kube_config = self._get_kube_config(identifier, kube_annotations, kube_container_name) if kube_config is not None: check_names, init_config_tpls, instance_tpls = kube_config source = CONFIG_FROM_KUBE - return [(source, vs) if trace_config else vs + return [(source, vs) for vs in zip(check_names, init_config_tpls, instance_tpls)] # in auto config mode, identifier is the image name auto_config = self._get_auto_config(identifier) if auto_config is not None: source = CONFIG_FROM_AUTOCONF - if trace_config: - return [(source, auto_config)] - return [auto_config] + return [(source, auto_config)] else: log.debug('No auto config was found for image %s, leaving it alone.' % identifier) return [] @@ -183,7 +183,7 @@ def get_check_tpls(self, identifier, **kwargs): # Try to update the identifier_to_checks cache self._update_identifier_to_checks(identifier, check_names) - return [(source, values) if trace_config else values + return [(source, values) for values in zip(check_names, init_config_tpls, instance_tpls)] def read_config_from_store(self, identifier): diff --git a/utils/service_discovery/sd_docker_backend.py b/utils/service_discovery/sd_docker_backend.py index 028809cce3..55b53b4785 100644 --- a/utils/service_discovery/sd_docker_backend.py +++ b/utils/service_discovery/sd_docker_backend.py @@ -14,15 +14,59 @@ from utils.kubernetes import KubeUtil from utils.platform import Platform from utils.service_discovery.abstract_sd_backend import AbstractSDBackend -from utils.service_discovery.config_stores import get_config_store, TRACE_CONFIG +from utils.service_discovery.config_stores import get_config_store DATADOG_ID = 'com.datadoghq.sd.check.id' -K8S_ANNOTATION_CHECK_NAMES = 'com.datadoghq.sd/check_names' -K8S_ANNOTATION_INIT_CONFIGS = 'com.datadoghq.sd/init_configs' -K8S_ANNOTATION_INSTANCES = 'com.datadoghq.sd/instances' log = logging.getLogger(__name__) +class _SDDockerBackendConfigFetchState(object): + def __init__(self, inspect_fn, kube_pods=None): + self.inspect_cache = {} + + self.inspect_fn = inspect_fn + self.kube_pods = kube_pods + + def inspect_container(self, c_id): + if c_id in self.inspect_cache: + return self.inspect_cache[c_id] + + try: + self.inspect_cache[c_id] = inspect = self.inspect_fn(c_id) + except (NullResource, NotFound): + self.inspect_cache[c_id] = inspect = {} + + return inspect + + def get_kube_container_status(self, c_id): + for pod in self.kube_pods: + c_statuses = pod.get('status', {}).get('containerStatuses', []) + for status in c_statuses: + if c_id == status.get('containerID', '').split('//')[-1]: + return status + return {} + + def get_kube_container_name(self, c_id): + return self.get_kube_container_status(c_id).get('name') + + def get_kube_container_spec(self, c_id): + c_name = self.get_kube_container_name(c_id) + containers = self.get_kube_config(c_id, 'spec').get('containers', []) + for co in containers: + if co.get('name') == c_name: + return co + return None + + def get_kube_config(self, c_id, key): + """Get a part of a pod config from the kubernetes API""" + for pod in self.kube_pods: + c_statuses = pod.get('status', {}).get('containerStatuses', []) + for status in c_statuses: + if c_id == status.get('containerID', '').split('//')[-1]: + return pod.get(key, {}) + return {} + + class SDDockerBackend(AbstractSDBackend): """Docker-based service discovery""" @@ -47,31 +91,42 @@ def __init__(self, agentConfig): AbstractSDBackend.__init__(self, agentConfig) + def _make_fetch_state(self): + return _SDDockerBackendConfigFetchState( + self.docker_client.inspect_container, + self.kubeutil.retrieve_pods_list().get('items', []) if Platform.is_k8s() else None) + def update_checks(self, changed_containers): - conf_reload_set = set() - for id_ in changed_containers: - try: - inspect = self.docker_client.inspect_container(id_) - except (NullResource, NotFound): - inspect = {} + state = self._make_fetch_state() - checks = self._get_checks_from_inspect(inspect) + conf_reload_set = set() + for c_id in changed_containers: + checks = self._get_checks_to_refresh(state, c_id) conf_reload_set.update(set(checks)) if conf_reload_set: self.reload_check_configs = conf_reload_set - def _get_checks_from_inspect(self, inspect): + def _get_checks_to_refresh(self, state, c_id): """Get the list of checks applied to a container from the identifier_to_checks cache in the config store. Use the DATADOG_ID label or the image.""" + inspect = state.inspect_container(c_id) identifier = inspect.get('Config', {}).get('Labels', {}).get(DATADOG_ID) or \ inspect.get('Config', {}).get('Image') - annotations = (self._get_kube_config(inspect.get('Id'), 'metadata') or {}).get('annotations') if Platform.is_k8s() else None - return self.config_store.get_checks_to_refresh(identifier, kube_annotations=annotations) + platform_kwargs = {} + if Platform.is_k8s(): + kube_metadata = state.get_kube_config(c_id, 'metadata') or {} + platform_kwargs = { + 'kube_annotations': kube_metadata.get('annotations'), + 'kube_container_name': state.get_kube_container_name(c_id), + } - def _get_host_address(self, c_inspect, tpl_var): + return self.config_store.get_checks_to_refresh(identifier, **platform_kwargs) + + def _get_host_address(self, state, c_id, tpl_var): """Extract the container IP from a docker inspect object, or the kubelet API.""" + c_inspect = state.inspect_container(c_id) c_id, c_img = c_inspect.get('Id', ''), c_inspect.get('Config', {}).get('Image', '') networks = c_inspect.get('NetworkSettings', {}).get('Networks') or {} @@ -95,17 +150,9 @@ def _get_host_address(self, c_inspect, tpl_var): # kubernetes case log.debug("Couldn't find the IP address for container %s (%s), " "using the kubernetes way." % (c_id[:12], c_img)) - pod_list = self.kubeutil.retrieve_pods_list().get('items', []) - for pod in pod_list: - pod_ip = pod.get('status', {}).get('podIP') - if pod_ip is None: - continue - else: - c_statuses = pod.get('status', {}).get('containerStatuses', []) - for status in c_statuses: - # compare the container id with those of containers in the current pod - if c_id == status.get('containerID', '').split('//')[-1]: - return pod_ip + pod_ip = state.get_kube_config(c_id, 'status').get('podIP') + if pod_ip: + return pod_ip log.error("No IP address was found for container %s (%s)" % (c_id[:12], c_img)) return None @@ -138,9 +185,9 @@ def _get_fallback_ip(self, ip_dict): log.warning("Trying with the last (sorted) network: '%s'." % last_key) return ip_dict[last_key] - def _get_port(self, container_inspect, tpl_var): + def _get_port(self, state, c_id, tpl_var): """Extract a port from a container_inspect or the k8s API given a template variable.""" - c_id = container_inspect.get('Id', '') + container_inspect = state.inspect_container(c_id) try: ports = map(lambda x: x.split('/')[0], container_inspect['NetworkSettings']['Ports'].keys()) @@ -152,17 +199,10 @@ def _get_port(self, container_inspect, tpl_var): if not ports and Platform.is_k8s(): log.debug("Didn't find the port for container %s (%s), trying the kubernetes way." % (c_id[:12], container_inspect.get('Config', {}).get('Image', ''))) - co_statuses = self._get_kube_config(c_id, 'status').get('containerStatuses', []) - c_name = None - for co in co_statuses: - if co.get('containerID', '').split('//')[-1] == c_id: - c_name = co.get('name') - break - containers = self._get_kube_config(c_id, 'spec').get('containers', []) - for co in containers: - if co.get('name') == c_name: - ports = map(lambda x: str(x.get('containerPort')), co.get('ports', [])) - ports = sorted(ports, key=lambda x: int(x)) + spec = state.get_kube_container_spec(c_id) + if spec: + ports = [str(x.get('containerPort')) for x in spec.get('ports', [])] + ports = sorted(ports, key=int) return self._extract_port_from_list(ports, tpl_var) def _extract_port_from_list(self, ports, tpl_var): @@ -185,15 +225,15 @@ def _extract_port_from_list(self, ports, tpl_var): log.error("Port index is out of range. Using the last element instead.") return ports[-1] - def get_tags(self, c_inspect): + def get_tags(self, state, c_id): """Extract useful tags from docker or platform APIs. These are collected by default.""" tags = [] if Platform.is_k8s(): - pod_metadata = self._get_kube_config(c_inspect.get('Id'), 'metadata') + pod_metadata = state.get_kube_config(c_id, 'metadata') if pod_metadata is None: log.warning("Failed to fetch pod metadata for container %s." - " Kubernetes tags may be missing." % c_inspect.get('Id', '')[:12]) + " Kubernetes tags may be missing." % c_id[:12]) return [] # get labels kube_labels = pod_metadata.get('labels', {}) @@ -210,66 +250,45 @@ def get_tags(self, c_inspect): return tags - def _get_additional_tags(self, container_inspect, *args): + def _get_additional_tags(self, state, c_id, *args): tags = [] if Platform.is_k8s(): - pod_metadata = self._get_kube_config(container_inspect.get('Id'), 'metadata') - pod_spec = self._get_kube_config(container_inspect.get('Id'), 'spec') + pod_metadata = state.get_kube_config(c_id, 'metadata') + pod_spec = state.get_kube_config(c_id, 'spec') if pod_metadata is None or pod_spec is None: log.warning("Failed to fetch pod metadata or pod spec for container %s." - " Additional Kubernetes tags may be missing." % container_inspect.get('Id', '')[:12]) + " Additional Kubernetes tags may be missing." % c_id[:12]) return [] tags.append('node_name:%s' % pod_spec.get('nodeName')) tags.append('pod_name:%s' % pod_metadata.get('name')) return tags - def _get_kube_config(self, c_id, key): - """Get a part of a pod config from the kubernetes API""" - pods = self.kubeutil.retrieve_pods_list().get('items', []) - for pod in pods: - c_statuses = pod.get('status', {}).get('containerStatuses', []) - for status in c_statuses: - if c_id == status.get('containerID', '').split('//')[-1]: - return pod.get(key, {}) - def get_configs(self): """Get the config for all docker containers running on the host.""" configs = {} + state = self._make_fetch_state() containers = [( container.get('Image'), container.get('Id'), container.get('Labels') ) for container in self.docker_client.containers()] - # used by the configcheck agent command to trace where check configs come from - trace_config = self.agentConfig.get(TRACE_CONFIG, False) - for image, cid, labels in containers: try: # value of the DATADOG_ID tag or the image name if the label is missing identifier = self.get_config_id(image, labels) - check_configs = self._get_check_configs(cid, identifier, trace_config=trace_config) or [] + check_configs = self._get_check_configs(state, cid, identifier) or [] for conf in check_configs: - if trace_config and conf is not None: - source, conf = conf + source, (check_name, init_config, instance) = conf - check_name, init_config, instance = conf # build instances list if needed if configs.get(check_name) is None: - if trace_config: - configs[check_name] = (source, (init_config, [instance])) - else: - configs[check_name] = (init_config, [instance]) + configs[check_name] = (source, (init_config, [instance])) else: conflict_init_msg = 'Different versions of `init_config` found for check {}. ' \ 'Keeping the first one found.' - if trace_config: - if configs[check_name][1][0] != init_config: - log.warning(conflict_init_msg.format(check_name)) - configs[check_name][1][1].append(instance) - else: - if configs[check_name][0] != init_config: - log.warning(conflict_init_msg.format(check_name)) - configs[check_name][1].append(instance) + if configs[check_name][1][0] != init_config: + log.warning(conflict_init_msg.format(check_name)) + configs[check_name][1][1].append(instance) except Exception: log.exception('Building config for container %s based on image %s using service ' 'discovery failed, leaving it alone.' % (cid[:12], image)) @@ -279,37 +298,38 @@ def get_config_id(self, image, labels): """Look for a DATADOG_ID label, return its value or the image name if missing""" return labels.get(DATADOG_ID) or image - def _get_check_configs(self, c_id, identifier, trace_config=False): + def _get_check_configs(self, state, c_id, identifier): """Retrieve configuration templates and fill them with data pulled from docker and tags.""" - inspect = self.docker_client.inspect_container(c_id) - annotations = (self._get_kube_config(inspect.get('Id'), 'metadata') or {}).get('annotations') if Platform.is_k8s() else None - config_templates = self._get_config_templates(identifier, trace_config=trace_config, kube_annotations=annotations) + platform_kwargs = {} + if Platform.is_k8s(): + kube_metadata = state.get_kube_config(c_id, 'metadata') or {} + platform_kwargs = { + 'kube_container_name': state.get_kube_container_name(c_id), + 'kube_annotations': kube_metadata.get('annotations'), + } + config_templates = self._get_config_templates(identifier, **platform_kwargs) if not config_templates: log.debug('No config template for container %s with identifier %s. ' 'It will be left unconfigured.' % (c_id[:12], identifier)) return None check_configs = [] - tags = self.get_tags(inspect) + tags = self.get_tags(state, c_id) for config_tpl in config_templates: - if trace_config: - source, config_tpl = config_tpl + source, config_tpl = config_tpl check_name, init_config_tpl, instance_tpl, variables = config_tpl # insert tags in instance_tpl and process values for template variables - instance_tpl, var_values = self._fill_tpl(inspect, instance_tpl, variables, tags) + instance_tpl, var_values = self._fill_tpl(state, c_id, instance_tpl, variables, tags) tpl = self._render_template(init_config_tpl or {}, instance_tpl or {}, var_values) if tpl and len(tpl) == 2: init_config, instance = tpl - if trace_config: - check_configs.append((source, (check_name, init_config, instance))) - else: - check_configs.append((check_name, init_config, instance)) + check_configs.append((source, (check_name, init_config, instance))) return check_configs - def _get_config_templates(self, identifier, trace_config=False, kube_annotations=None): + def _get_config_templates(self, identifier, **platform_kwargs): """Extract config templates for an identifier from a K/V store and returns it as a dict object.""" config_backend = self.agentConfig.get('sd_config_backend') templates = [] @@ -319,18 +339,14 @@ def _get_config_templates(self, identifier, trace_config=False, kube_annotations else: auto_conf = False - # format: [('ident', {init_tpl}, {instance_tpl})] without trace_config - # or [(source, ('ident', {init_tpl}, {instance_tpl}))] with trace_config - raw_tpls = self.config_store.get_check_tpls( - identifier, auto_conf=auto_conf, trace_config=trace_config, kube_annotations=kube_annotations) + # format [(source, ('ident', {init_tpl}, {instance_tpl}))] + raw_tpls = self.config_store.get_check_tpls(identifier, auto_conf=auto_conf, **platform_kwargs) for tpl in raw_tpls: - if trace_config and tpl is not None: - # each template can come from either auto configuration or user-supplied templates - source, tpl = tpl - if tpl is not None and len(tpl) == 3: - check_name, init_config_tpl, instance_tpl = tpl - else: - log.debug('No template was found for identifier %s, leaving it alone.' % identifier) + # each template can come from either auto configuration or user-supplied templates + try: + source, (check_name, init_config_tpl, instance_tpl) = tpl + except (TypeError, IndexError, ValueError): + log.debug('No template was found for identifier %s, leaving it alone: %s' % (identifier, tpl)) return None try: # build a list of all variables to replace in the template @@ -346,18 +362,16 @@ def _get_config_templates(self, identifier, trace_config=False, kube_annotations ' by service discovery failed for ident {1}.'.format(check_name, identifier)) return None - if trace_config: - templates.append((source, (check_name, init_config_tpl, instance_tpl, variables))) - else: - templates.append((check_name, init_config_tpl, instance_tpl, variables)) + templates.append((source, + (check_name, init_config_tpl, instance_tpl, variables))) return templates - def _fill_tpl(self, inspect, instance_tpl, variables, tags=None): + def _fill_tpl(self, state, c_id, instance_tpl, variables, tags=None): """Add container tags to instance templates and build a dict from template variable names and their values.""" var_values = {} - c_id, c_image = inspect.get('Id', ''), inspect.get('Config', {}).get('Image', '') + c_image = state.inspect_container(c_id).get('Config', {}).get('Image', '') # add default tags to the instance if tags: @@ -369,7 +383,7 @@ def _fill_tpl(self, inspect, instance_tpl, variables, tags=None): # variables can be suffixed with an index in case several values are found if var.split('_')[0] in self.VAR_MAPPING: try: - res = self.VAR_MAPPING[var.split('_')[0]](inspect, var) + res = self.VAR_MAPPING[var.split('_')[0]](state, c_id, var) if res is None: raise ValueError("Invalid value for variable %s." % var) var_values[var] = res