Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import signal
import sys
import time
import supervisor.xmlrpc
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick but that's 3rd party

import xmlrpclib
from copy import copy

# For pickle & PID files, see issue 293
Expand All @@ -32,10 +34,15 @@
get_parsed_args,
get_system_stats,
load_check_directory,
load_check
load_check,
generate_jmx_configs
)
from daemon import AgentSupervisor, Daemon
from emitter import http_emitter
import rpc.service_discovery_pb2

#3p
import grpc

# utils
from util import Watchdog
Expand All @@ -50,12 +57,16 @@
from utils.service_discovery.sd_backend import get_sd_backend

# Constants
from jmxfetch import JMX_CHECKS
PID_NAME = "dd-agent"
PID_DIR = None
WATCHDOG_MULTIPLIER = 10
RESTART_INTERVAL = 4 * 24 * 60 * 60 # Defaults to 4 days
START_COMMANDS = ['start', 'restart', 'foreground']
DD_AGENT_COMMANDS = ['check', 'flare', 'jmx']
JMX_SUPERVISOR_ENTRY = 'datadog-agent:jmxfetch'
JMX_GRACE_SECS = 2
SERVICE_DISCOVERY_PREFIX = 'SD-'

DEFAULT_COLLECTOR_PROFILE_INTERVAL = 20

Expand All @@ -81,6 +92,13 @@ def __init__(self, pidfile, autorestart, start_event=True, in_developer_mode=Fal
# this flag can be set to True, False, or a list of checks (for partial reload)
self.reload_configs_flag = False
self.sd_backend = None
self.supervisor_proxy = xmlrpclib.ServerProxy(
'http://127.0.0.1',
transport=supervisor.xmlrpc.SupervisorTransport(
None, None, serverurl='unix:///opt/datadog-agent/run/datadog-supervisor.sock')
)
channel = grpc.insecure_channel('localhost:50051')
self.rpcstub = rpc.service_discovery_pb2.ServiceDiscoveryStub(channel)

def _handle_sigterm(self, signum, frame):
"""Handles SIGTERM and SIGINT, which gracefully stops the agent."""
Expand Down Expand Up @@ -115,13 +133,43 @@ def reload_configs(self, checks_to_reload=set()):
check.stop()

self._checksd = load_check_directory(self._agentConfig, hostname)
jmx_sd_configs = generate_jmx_configs(self._agentConfig, hostname)
else:
new_checksd = copy(self._checksd)

self.refresh_specific_checks(hostname, new_checksd, checks_to_reload)
jmx_checks = [check for check in checks_to_reload if check in JMX_CHECKS]
py_checks = set(checks_to_reload) - set(jmx_checks)
self.refresh_specific_checks(hostname, new_checksd, py_checks)
jmx_sd_configs = generate_jmx_configs(self._agentConfig, hostname, jmx_checks)

# once the reload is done, replace existing checks with the new ones
self._checksd = new_checksd

# restart jmx
if jmx_sd_configs:
# TODO jaime: set guards here this is unix specific.
jmx_state = self.supervisor_proxy.supervisor.getProcessInfo(JMX_SUPERVISOR_ENTRY)
log.debug("Current JMX check state: %s", jmx_state['statename'])
if jmx_state['statename'] in ['STOPPED', 'EXITED', 'FATAL'] and self._agentConfig.get('sd_jmx_enable'):
log.debug("Starting JMX...")
self.supervisor_proxy.supervisor.startProcess(JMX_SUPERVISOR_ENTRY)
time.sleep(JMX_GRACE_SECS)
# TODO jaime: we probably have to wait for the the process to come up...

for name, yaml in jmx_sd_configs.iteritems():
try:
res = self.rpcstub.SetConfig(rpc.service_discovery_pb2.SDConfig(name="{}{}".format(
SERVICE_DISCOVERY_PREFIX, name), config=yaml))
except Exception as e:
log.exception("unable to submit YAML via RPC: %s", e)
else:
if res.success:
log.info("JMX SD Config submitted via RPC for %s successfully.", name)
else:
log.info("JMX SD Config submitted via RPC for %s failed. \
Perhaps overriden by file config or bad YAML.", name)


# Logging
num_checks = len(self._checksd['initialized_checks'])
if num_checks > 0:
Expand Down
44 changes: 39 additions & 5 deletions config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# (C) Datadog, Inc. 2010-2016
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

Expand All @@ -23,7 +22,7 @@
from urlparse import urlparse

# project
from util import check_yaml
from util import check_yaml, config_to_yaml
from utils.platform import Platform, get_os
from utils.proxy import get_proxy
from utils.service_discovery.config import extract_agent_config
Expand Down Expand Up @@ -72,6 +71,8 @@
"app.datad0g.com",
]

JMX_SD_CONF_TEMPLATE = '.jmx.{}.yaml'


class PathNotFound(Exception):
pass
Expand Down Expand Up @@ -376,7 +377,6 @@ def get_config(parse_args=True, cfg_path=None, options=None):
if options is not None and options.profile:
agentConfig['developer_mode'] = True

#
# Core config
#ap
if not config.has_option('Main', 'api_key'):
Expand Down Expand Up @@ -997,6 +997,7 @@ def load_check_directory(agentConfig, hostname):
initialize. Only checks that have a configuration
file in conf.d will be returned. '''
from checks import AGENT_METRICS_CHECK_NAME
from jmxfetch import JMX_CHECKS

initialized_checks = {}
init_failed_checks = {}
Expand Down Expand Up @@ -1035,7 +1036,9 @@ def load_check_directory(agentConfig, hostname):

for check_name, service_disco_check_config in _service_disco_configs(agentConfig).iteritems():
# ignore this config from service disco if the check has been loaded through a file config
if check_name in initialized_checks or check_name in init_failed_checks:
if check_name in initialized_checks or \
check_name in init_failed_checks or \
check_name in JMX_CHECKS:
continue

# if TRACE_CONFIG is set, service_disco_check_config looks like:
Expand Down Expand Up @@ -1071,12 +1074,14 @@ def load_check_directory(agentConfig, hostname):

def load_check(agentConfig, hostname, checkname):
"""Same logic as load_check_directory except it loads one specific check"""
from jmxfetch import JMX_CHECKS

agentConfig['checksd_hostname'] = hostname
osname = get_os()
checks_places = get_checks_places(osname, agentConfig)
for config_path in _file_configs_paths(osname, agentConfig):
check_name = _conf_path_to_check_name(config_path)
if check_name == checkname:
if check_name == checkname and check_name not in JMX_CHECKS:
conf_is_valid, check_config, invalid_check = _load_file_config(config_path, check_name, agentConfig)

if invalid_check and not conf_is_valid:
Expand All @@ -1098,6 +1103,35 @@ def load_check(agentConfig, hostname, checkname):

return None

def generate_jmx_configs(agentConfig, hostname, checknames=None):
"""Similar logic to load_check_directory for JMX checks"""
from jmxfetch import JMX_CHECKS

if not checknames:
checknames = JMX_CHECKS
agentConfig['checksd_hostname'] = hostname

# the check was not found, try with service discovery
generated = {}
for check_name, service_disco_check_config in _service_disco_configs(agentConfig).iteritems():
if check_name in checknames and check_name in JMX_CHECKS:
log.debug('Generating JMX config for: %s' % check_name)

# Why is sd_init_config a dict and sd_instances a list of dicts?
sd_init_config, sd_instances = service_disco_check_config
for idx, instance in enumerate(sd_instances):
check_config = {}
check_config.update(sd_init_config)
check_config.update(instance)

try:
yaml = config_to_yaml(check_config)
generated["{}_{}".format(check_name, idx)] = yaml
except Exception as e:
log.exception("Unable to generate YAML config for %s: %s", check_name, e)

return generated

#
# logging

Expand Down
3 changes: 3 additions & 0 deletions datadog.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ gce_updated_hostname: yes
# and modify its value.
# sd_template_dir: /datadog/check_configs
#
# JMX Service Disocvery
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this flag ? Let's add some comments about why it's needed or not

# sd_jmx_enable: no
#
# ========================================================================== #
# Other #
# ========================================================================== #
Expand Down
26 changes: 21 additions & 5 deletions jmxfetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
get_config,
get_logging_config,
PathNotFound,
_is_affirmative
)
from util import yLoader
from utils.jmx import JMX_FETCH_JAR_NAME, JMXFiles
from utils.platform import Platform
from utils.subprocess_output import subprocess
from utils.service_discovery.config import SD_DEFAULT_RPC_WAIT

log = logging.getLogger('jmxfetch')

Expand All @@ -46,6 +48,7 @@
}

_JVM_DEFAULT_MAX_MEMORY_ALLOCATION = " -Xmx200m"
_JVM_DEFAULT_SD_MAX_MEMORY_ALLOCATION = " -Xmx512m"
_JVM_DEFAULT_INITIAL_MEMORY_ALLOCATION = " -Xms50m"
JMXFETCH_MAIN_CLASS = "org.datadog.jmxfetch.App"
JMX_CHECKS = [
Expand Down Expand Up @@ -81,6 +84,7 @@ def __init__(self, confd_path, agentConfig):
self.agentConfig = agentConfig
self.logging_config = get_logging_config()
self.check_frequency = DEFAULT_CHECK_FREQUENCY
self.service_discovery = _is_affirmative(self.agentConfig.get('sd_jmx_enable', False))

self.jmx_process = None
self.jmx_checks = None
Expand All @@ -93,6 +97,10 @@ def _handle_sigterm(self, signum, frame):
log.debug("Caught sigterm. Stopping subprocess.")
self.jmx_process.terminate()

def _handle_sigreload(self, signum, frame):
# Terminate jmx process on SIGTERM signal
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a new signal handler if we are using gRPC ?

log.debug("Caught sigusr. Prompting reload...")

def register_signal_handlers(self):
"""
Enable SIGTERM and SIGINT handlers
Expand All @@ -104,6 +112,9 @@ def register_signal_handlers(self):
# Handle Keyboard Interrupt
signal.signal(signal.SIGINT, self._handle_sigterm)

# Handle Config reload...
signal.signal(signal.SIGUSR1, self._handle_sigreload)

except ValueError:
log.exception("Unable to register signal handlers.")

Expand Down Expand Up @@ -148,7 +159,7 @@ def run(self, command=None, checks_list=None, reporter=None, redirect_std_stream
except Exception:
log.exception("Error while writing JMX status file")

if len(self.jmx_checks) > 0:
if len(self.jmx_checks) > 0 or self.service_discovery:
return self._start(self.java_bin_path, self.java_options, self.jmx_checks,
command, reporter, self.tools_jar_path, self.custom_jar_paths, redirect_std_streams)
else:
Expand Down Expand Up @@ -273,13 +284,18 @@ def _start(self, path_to_java, java_run_opts, jmx_checks, command, reporter, too
subprocess_args.insert(len(subprocess_args) - 1, '--exit_file_location')
subprocess_args.insert(len(subprocess_args) - 1, path_to_exit_file)

subprocess_args.insert(4, '--check')
for check in jmx_checks:
subprocess_args.insert(5, check)
if self.service_discovery:
subprocess_args.insert(4, '--rpc_wait')
subprocess_args.insert(5, str(SD_DEFAULT_RPC_WAIT))

if jmx_checks:
subprocess_args.insert(4, '--check')
for check in jmx_checks:
subprocess_args.insert(5, check)

# Specify a maximum memory allocation pool for the JVM
if "Xmx" not in java_run_opts and "XX:MaxHeapSize" not in java_run_opts:
java_run_opts += _JVM_DEFAULT_MAX_MEMORY_ALLOCATION
java_run_opts += _JVM_DEFAULT_SD_MAX_MEMORY_ALLOCATION if self.service_discovery else _JVM_DEFAULT_MAX_MEMORY_ALLOCATION
# Specify the initial memory allocation pool for the JVM
if "Xms" not in java_run_opts and "XX:InitialHeapSize" not in java_run_opts:
java_run_opts += _JVM_DEFAULT_INITIAL_MEMORY_ALLOCATION
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ simplejson==3.6.5
supervisor==3.3.0
tornado==3.2.2
uptime==3.0.1
grpcio==1.0.0

###########################################################
# These modules are for checks. But they are
Expand Down
29 changes: 29 additions & 0 deletions rpc/proto/service_discovery.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.datadog.jmxfetch";
option java_outer_classname = "ServiceDiscoveryProto";

package servicediscovery;

// Interface exported by the server.
service ServiceDiscovery {

// A simple RPC.
//
// Attempts to set a YAML service discovery configuration.
//
// A confirmation message is returned with the status of the
// operation.
rpc SetConfig(SDConfig) returns (Confirmation) {}

}

message Confirmation {
bool success = 1;
}

message SDConfig {
string name = 1;
string config = 2;
}
31 changes: 31 additions & 0 deletions util.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,37 @@ def check_yaml(conf_path):
else:
return check_config

def config_to_yaml(config):
'''
Convert a config dict to YAML
'''
assert 'init_config' in config, "No 'init_config' section found"
assert 'instances' in config, "No 'instances' section found"

valid_instances = True
if config['instances'] is None or not isinstance(config['instances'], list):
valid_instances = False
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might as well raise here.

else:
yaml_output = yaml.safe_dump(config, default_flow_style=False)

if not valid_instances:
raise Exception('You need to have at least one instance defined in your config.')

return yaml_output

def dump_yaml(path, config):
'''
Dump config dict to YAML file in path
'''
try:
yaml = config_to_yaml(config)
except Exception as e:
log.exception("Unable to convert config to YAML: %s", e)
else:
with open(path, 'w+') as f:
f.write(yaml)


class Watchdog(object):
"""
Simple signal-based watchdog. Restarts the process when:
Expand Down
2 changes: 1 addition & 1 deletion utils/service_discovery/abstract_config_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def crawl_config_template(self):
# Initialize the config index reference
if self.previous_config_index is None:
self.previous_config_index = config_index
return False
return True
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for this change ?

# Config has been modified since last crawl
# in this case a full config reload is triggered and the identifier_to_checks cache is rebuilt
if config_index != self.previous_config_index:
Expand Down
1 change: 1 addition & 0 deletions utils/service_discovery/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

log = logging.getLogger(__name__)

SD_DEFAULT_RPC_WAIT = 5

def extract_agent_config(config):
# get merged into the real agentConfig
Expand Down
3 changes: 3 additions & 0 deletions utils/service_discovery/config_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def extract_sd_config(config):
if config.has_option('Main', 'sd_backend_port'):
sd_config['sd_backend_port'] = config.get(
'Main', 'sd_backend_port')
if config.has_option('Main', 'sd_jmx_enable'):
sd_config['sd_jmx_enable'] = config.get(
'Main', 'sd_jmx_enable')
return sd_config


Expand Down