Skip to content
11 changes: 11 additions & 0 deletions elastic/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ files:
value:
type: boolean
example: false
- name: detailed_shard_metrics
description: |
Enable to collect detailed shard placement metrics with `index` and `prirep` (primary/replica) tags.
Requires `cat_allocation_stats` to be enabled.
When disabled (default), reports aggregated shard counts per node.
When enabled, reports shard counts per node-index-prirep combination for detailed placement reporting.
Enabling this increases metric cardinality (up to node_count * index_count * 2, since primary and replica shards are reported as separate series).
Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-shards.html.
value:
type: boolean
example: false
- name: admin_forwarder
description: |
Specifies a URL that includes a context root
Expand Down
1 change: 1 addition & 0 deletions elastic/changelog.d/22426.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
elastic: add `prirep` and `index` tags to the `elasticsearch.shards` metric through an additional `detailed_shard_metrics` option
3 changes: 3 additions & 0 deletions elastic/datadog_checks/elastic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
'url',
'pending_task_stats',
'cat_allocation_stats',
'detailed_shard_metrics',
'custom_queries',
'submit_events',
],
Expand Down Expand Up @@ -69,6 +70,7 @@ def from_instance(instance):
pending_task_stats = is_affirmative(instance.get('pending_task_stats', True))
admin_forwarder = is_affirmative(instance.get('admin_forwarder', False))
cat_allocation_stats = is_affirmative(instance.get('cat_allocation_stats', False))
detailed_shard_metrics = is_affirmative(instance.get('detailed_shard_metrics', False))

# Support URLs that have a path in them from the config, for
# backwards-compatibility.
Expand Down Expand Up @@ -108,6 +110,7 @@ def from_instance(instance):
url=url,
pending_task_stats=pending_task_stats,
cat_allocation_stats=cat_allocation_stats,
detailed_shard_metrics=detailed_shard_metrics,
custom_queries=custom_queries,
submit_events=submit_events,
)
Expand Down
4 changes: 4 additions & 0 deletions elastic/datadog_checks/elastic/config_models/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def instance_detailed_index_stats():
return False


def instance_detailed_shard_metrics():
    """Default for the `detailed_shard_metrics` instance option (disabled)."""
    return False


def instance_disable_generic_tags():
    """Default for the `disable_generic_tags` instance option (disabled)."""
    return False

Expand Down
1 change: 1 addition & 0 deletions elastic/datadog_checks/elastic/config_models/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class InstanceConfig(BaseModel):
connect_timeout: Optional[float] = None
custom_queries: Optional[tuple[CustomQuery, ...]] = None
detailed_index_stats: Optional[bool] = None
detailed_shard_metrics: Optional[bool] = None
disable_generic_tags: Optional[bool] = None
disable_legacy_cluster_tag: Optional[bool] = None
disable_legacy_service_check_tags: Optional[bool] = None
Expand Down
10 changes: 10 additions & 0 deletions elastic/datadog_checks/elastic/data/conf.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ instances:
#
# cat_allocation_stats: false

## @param detailed_shard_metrics - boolean - optional - default: false
## Enable to collect detailed shard placement metrics with `index` and `prirep` (primary/replica) tags.
## Requires `cat_allocation_stats` to be enabled.
## When disabled (default), reports aggregated shard counts per node.
## When enabled, reports shard counts per node-index-prirep combination for detailed placement reporting.
## Enabling this increases metric cardinality (up to node_count * index_count * 2, since primary and replica shards are reported as separate series).
## Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-shards.html.
#
# detailed_shard_metrics: false

## @param admin_forwarder - boolean - optional - default: false
## Specifies a URL that includes a context root
## needed for a forwarder application to access Elasticsearch REST services, for example:
Expand Down
71 changes: 70 additions & 1 deletion elastic/datadog_checks/elastic/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class ESCheck(AgentCheck):
SERVICE_CHECK_CONNECT_NAME = 'elasticsearch.can_connect'
SERVICE_CHECK_CLUSTER_STATUS = 'elasticsearch.cluster_health'
CAT_ALLOC_PATH = '/_cat/allocation?v=true&format=json&bytes=b'
CAT_SHARDS_PATH = '/_cat/shards?format=json&bytes=b'
SOURCE_TYPE_NAME = 'elasticsearch'

def __init__(self, name, init_config, instances):
Expand Down Expand Up @@ -497,16 +498,84 @@ def _process_cat_allocation_data(self, admin_forwarder, version, base_tags):

# we need to remap metric names because the ones from elastic
# contain dots and that would confuse `_process_metric()` (sic)
data_to_collect = {'disk.indices', 'disk.used', 'disk.avail', 'disk.total', 'disk.percent', 'shards'}
if self._config.detailed_shard_metrics:
# when detailed metrics are enabled, only collect disk metrics from _cat/allocation
data_to_collect = {'disk.indices', 'disk.used', 'disk.avail', 'disk.total', 'disk.percent'}
else:
data_to_collect = {'disk.indices', 'disk.used', 'disk.avail', 'disk.total', 'disk.percent', 'shards'}

for dic in cat_allocation_data:
cat_allocation_dic = {
k.replace('.', '_'): v for k, v in dic.items() if k in data_to_collect and v is not None
}
tags = base_tags + ['node_name:' + dic.get('node').lower()]
for metric in CAT_ALLOCATION_METRICS:
# skip shards metric if detailed metrics are enabled (we'll get it from _cat/shards)
if metric == 'elasticsearch.shards' and self._config.detailed_shard_metrics:
continue
desc = CAT_ALLOCATION_METRICS[metric]
self._process_metric(cat_allocation_dic, metric, *desc, tags=tags)

if self._config.detailed_shard_metrics:
self.log.debug("Collecting detailed shard placement metrics")
cat_shards_url = self._join_url(self.CAT_SHARDS_PATH, admin_forwarder)
try:
cat_shards_data = self._get_data(cat_shards_url)
except requests.ReadTimeout as e:
self.log.error("Timed out reading cat shards stats from servers (%s) - stats will be missing", e)
return

from collections import Counter

shard_counts = Counter()

for shard in cat_shards_data:
node = shard.get('node')
index = shard.get('index')
prirep_raw = shard.get('prirep')
state = shard.get('state')

# skip unassigned shards (they have no node)
if node is None or node == 'UNASSIGNED':
continue

if index is None or prirep_raw is None:
continue

# when in RELOCATING state, the reported node is: "source-node -> ip uuid target-node"
# try extracting the source node to count the shard towards it until relocation is over
if state == 'RELOCATING':
if ' -> ' in node:
source_node = node.split(' -> ')[0].strip()
if source_node and ' ' not in source_node:
node = source_node
else:
self.log.debug(
"invalid source node for RELOCATING shard, got: %s, parsed: %s", node, source_node
)
continue
else:
self.log.debug(
"unexpected format for RELOCATING shard (expected 'source -> target'), got: %s", node
)
continue
elif state != 'STARTED':
continue

# better readability: p->primary and r->replica
prirep = 'primary' if prirep_raw == 'p' else 'replica'

key = (node, index, prirep)
shard_counts[key] += 1

for (node, index, prirep), count in shard_counts.items():
tags = base_tags + [
'node_name:{}'.format(node.lower()),
'index:{}'.format(index),
'prirep:{}'.format(prirep),
]
self.gauge('elasticsearch.shards', count, tags=tags)

def _process_custom_metric(
self,
value,
Expand Down
2 changes: 1 addition & 1 deletion elastic/datadog_checks/elastic/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@
}

NODE_SYSTEM_METRICS_POST_5 = {
'system.cpu.idle': ('gauge', 'os.cpu.percent', lambda v: (100 - v)),
'system.cpu.idle': ('gauge', 'os.cpu.percent', lambda v: 100 - v),
'system.load.1': ('gauge', 'os.cpu.load_average.1m'),
'system.load.5': ('gauge', 'os.cpu.load_average.5m'),
'system.load.15': ('gauge', 'os.cpu.load_average.15m'),
Expand Down
2 changes: 1 addition & 1 deletion elastic/metadata.csv
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ elasticsearch.search.scroll.time,gauge,,second,,The total time spent on scroll q
elasticsearch.search.scroll.time.count,count,,second,,The total time spent on scroll queries submitted as a count [v5+].,0,elasticsearch,total scroll query time,
elasticsearch.search.scroll.total,gauge,,query,,The total number of scroll queries [v5+].,0,elasticsearch,total scroll queries,
elasticsearch.search.scroll.total.count,count,,query,,The total number of scroll queries submitted as a count [v5+].,0,elasticsearch,total scroll queries,
elasticsearch.shards,gauge,,shard,,Number of primary and replica shards assigned to the node.,0,elasticsearch,shards,
elasticsearch.shards,gauge,,shard,,Number of shards assigned to each node. When detailed_shard_metrics is disabled (default) reports total shards per node. When enabled reports shards per node-index-prirep combination with tags: node_name index and prirep (primary or replica).,0,elasticsearch,shards,
elasticsearch.slm.snapshot_deletion_failures,gauge,,error,,The total number of snapshot deletion failures.,0,elasticsearch,slm del fail,
elasticsearch.slm.snapshots_deleted,gauge,,,,The total number of deleted snapshots.,0,elasticsearch,slm del,
elasticsearch.slm.snapshots_failed,gauge,,error,,The total number of failed snapshots.,0,elasticsearch,slm fail,
Expand Down
47 changes: 47 additions & 0 deletions elastic/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,56 @@ def test_cat_allocation_metrics(dd_environment, aggregator, instance, cluster_ta
elastic_check = ESCheck('elastic', {}, instances=[instance])

elastic_check.check(None)

# Check all cat allocation metrics including aggregated shards
for m_name in CAT_ALLOCATION_METRICS:
aggregator.assert_metric(m_name)

# Verify default behavior - shards metric should NOT have index/prirep tags
metrics = aggregator.metrics('elasticsearch.shards')
assert len(metrics) > 0, "Expected at least one elasticsearch.shards metric"

# Default behavior: should only have node_name tag, no index or prirep
for metric in metrics:
tags = metric.tags
has_index_tag = any(tag.startswith('index:') for tag in tags)
has_prirep_tag = any(tag.startswith('prirep:') for tag in tags)
assert not has_index_tag, "Should not have index tag with detailed_shard_metrics disabled"
assert not has_prirep_tag, "Should not have prirep tag with detailed_shard_metrics disabled"


def test_cat_allocation_detailed_shard_metrics(dd_environment, aggregator, instance, cluster_tags):
    """With `detailed_shard_metrics` enabled, disk metrics still come from
    _cat/allocation while `elasticsearch.shards` is reported per
    node-index-prirep combination, carrying `index` and `prirep` tags.
    """
    instance['cat_allocation_stats'] = True
    instance['detailed_shard_metrics'] = True
    elastic_check = ESCheck('elastic', {}, instances=[instance])

    elastic_check.check(None)

    # Disk metrics (all cat allocation metrics except shards) are unaffected.
    for m_name in CAT_ALLOCATION_METRICS:
        if m_name != 'elasticsearch.shards':
            aggregator.assert_metric(m_name)

    # The shards metric is now sourced from _cat/shards.
    aggregator.assert_metric('elasticsearch.shards')

    metrics = aggregator.metrics('elasticsearch.shards')
    assert metrics, "Expected at least one elasticsearch.shards metric"

    # Every detailed shard metric must carry BOTH tags. Checking the two tags
    # independently across the metric set (as a pair of "at least one" flags)
    # could pass even if no single metric carried both, so assert per metric.
    for metric in metrics:
        tags = metric.tags
        assert any(tag.startswith('index:') for tag in tags), (
            "Expected index tag on elasticsearch.shards metric, got tags: {}".format(tags)
        )
        assert any(tag.startswith('prirep:') for tag in tags), (
            "Expected prirep tag on elasticsearch.shards metric, got tags: {}".format(tags)
        )


def test_health_event(dd_environment, aggregator):
dummy_tags = ['elastique:recherche']
Expand Down
Loading