
Commit fc8aa6b

add prirep and index tags to elasticsearch.shards metric
1 parent bbfe5e9 commit fc8aa6b

5 files changed (+72, -6 lines)

elastic/datadog_checks/elastic/data/conf.yaml.example

Lines changed: 6 additions & 1 deletion
@@ -112,7 +112,12 @@ instances:
 
     ## @param cat_allocation_stats - boolean - optional - default: false
     ## Enable to collect Elastic Cat Allocation metrics. Available only for Elasticsearch 5.0 or higher.
-    ## Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-allocation.html.
+    ## Collects disk usage metrics per node and detailed shard placement metrics.
+    ## The elasticsearch.shards metric includes index and prirep tags for monitoring
+    ## shard placement skew and imbalance between nodes.
+    ## Tags: node_name:<node>, index:<index_name>, prirep:primary or prirep:replica
+    ## Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-allocation.html
+    ## https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-shards.html
     #
     # cat_allocation_stats: false
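For a quick sanity check of what the new shard placement collection will see, here is a minimal sketch that queries the cat shards API directly. The cluster URL is an assumption for a local test node; the endpoint path matches the one documented in the link above.

import requests

ES_URL = "http://localhost:9200"  # hypothetical local cluster; adjust host/auth as needed

resp = requests.get(ES_URL + "/_cat/shards?format=json&bytes=b", timeout=10)
resp.raise_for_status()

for shard in resp.json():
    # Each row exposes the fields the check turns into tags: index, prirep ('p'/'r'), node.
    print(shard.get("index"), shard.get("prirep"), shard.get("node"))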

elastic/datadog_checks/elastic/elastic.py

Lines changed: 43 additions & 3 deletions
@@ -98,6 +98,7 @@ class ESCheck(AgentCheck):
     SERVICE_CHECK_CONNECT_NAME = 'elasticsearch.can_connect'
     SERVICE_CHECK_CLUSTER_STATUS = 'elasticsearch.cluster_health'
     CAT_ALLOC_PATH = '/_cat/allocation?v=true&format=json&bytes=b'
+    CAT_SHARDS_PATH = '/_cat/shards?format=json&bytes=b'
     SOURCE_TYPE_NAME = 'elasticsearch'
 
     def __init__(self, name, init_config, instances):
@@ -488,16 +489,18 @@ def _process_cat_allocation_data(self, admin_forwarder, version, base_tags):
             return
 
         self.log.debug("Collecting cat allocation metrics")
+
+        # Collect disk metrics from /_cat/allocation
         cat_allocation_url = self._join_url(self.CAT_ALLOC_PATH, admin_forwarder)
         try:
             cat_allocation_data = self._get_data(cat_allocation_url)
         except requests.ReadTimeout as e:
             self.log.error("Timed out reading cat allocation stats from servers (%s) - stats will be missing", e)
             return
 
-        # we need to remap metric names because the ones from elastic
-        # contain dots and that would confuse `_process_metric()` (sic)
-        data_to_collect = {'disk.indices', 'disk.used', 'disk.avail', 'disk.total', 'disk.percent', 'shards'}
+        # Process disk metrics (we need to remap metric names because the ones from elastic
+        # contain dots and that would confuse `_process_metric()`)
+        data_to_collect = {'disk.indices', 'disk.used', 'disk.avail', 'disk.total', 'disk.percent'}
         for dic in cat_allocation_data:
             cat_allocation_dic = {
                 k.replace('.', '_'): v for k, v in dic.items() if k in data_to_collect and v is not None
@@ -507,6 +510,43 @@ def _process_cat_allocation_data(self, admin_forwarder, version, base_tags):
                 desc = CAT_ALLOCATION_METRICS[metric]
                 self._process_metric(cat_allocation_dic, metric, *desc, tags=tags)
 
+        # Collect detailed shard placement from /_cat/shards
+        self.log.debug("Collecting detailed shard placement metrics")
+        cat_shards_url = self._join_url(self.CAT_SHARDS_PATH, admin_forwarder)
+        try:
+            cat_shards_data = self._get_data(cat_shards_url)
+        except requests.ReadTimeout as e:
+            self.log.error("Timed out reading cat shards stats from servers (%s) - stats will be missing", e)
+            return
+
+        # Group shards by node, index, and prirep to count them
+        from collections import Counter
+        shard_counts = Counter()
+
+        for shard in cat_shards_data:
+            node = shard.get('node')
+            index = shard.get('index')
+            prirep_raw = shard.get('prirep')
+
+            # Skip unassigned shards (they have no node)
+            if node is None or node == 'UNASSIGNED':
+                continue
+
+            # Map p/r to primary/replica for better readability
+            prirep = 'primary' if prirep_raw == 'p' else 'replica'
+
+            key = (node, index, prirep)
+            shard_counts[key] += 1
+
+        # Submit metrics with tags
+        for (node, index, prirep), count in shard_counts.items():
+            tags = base_tags + [
+                'node_name:{}'.format(node.lower()),
+                'index:{}'.format(index),
+                'prirep:{}'.format(prirep)
+            ]
+            self.gauge('elasticsearch.shards', count, tags=tags)
+
     def _process_custom_metric(
         self,
         value,
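To see the grouping step in isolation, here is a small standalone sketch that applies the same Counter logic to a hand-written /_cat/shards response. The sample rows and node names are made up for illustration; the tag formatting mirrors the code above.

from collections import Counter

# Hypothetical /_cat/shards rows, shaped like the JSON the check receives.
sample_shards = [
    {"index": "logs-2024", "prirep": "p", "node": "es-data-0"},
    {"index": "logs-2024", "prirep": "r", "node": "es-data-1"},
    {"index": "logs-2024", "prirep": "r", "node": "es-data-1"},
    {"index": "metrics", "prirep": "p", "node": None},  # unassigned shard, skipped below
]

shard_counts = Counter()
for shard in sample_shards:
    node = shard.get("node")
    if node is None or node == "UNASSIGNED":
        continue  # unassigned shards carry no node and are not tagged
    prirep = "primary" if shard.get("prirep") == "p" else "replica"
    shard_counts[(node, shard.get("index"), prirep)] += 1

# Each key becomes one gauge point carrying node_name, index, and prirep tags.
for (node, index, prirep), count in shard_counts.items():
    tags = ["node_name:{}".format(node.lower()), "index:{}".format(index), "prirep:{}".format(prirep)]
    print("elasticsearch.shards", count, tags)
# -> elasticsearch.shards 1 ['node_name:es-data-0', 'index:logs-2024', 'prirep:primary']
# -> elasticsearch.shards 2 ['node_name:es-data-1', 'index:logs-2024', 'prirep:replica']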

elastic/datadog_checks/elastic/metrics.py

Lines changed: 0 additions & 1 deletion
@@ -766,7 +766,6 @@
 }
 
 CAT_ALLOCATION_METRICS = {
-    'elasticsearch.shards': ('gauge', 'shards'),
     'elasticsearch.disk.indices': ('gauge', 'disk_indices'),
     'elasticsearch.disk.used': ('gauge', 'disk_used'),
     'elasticsearch.disk.avail': ('gauge', 'disk_avail'),

elastic/metadata.csv

Lines changed: 1 addition & 1 deletion
@@ -214,7 +214,7 @@ elasticsearch.search.scroll.time,gauge,,second,,The total time spent on scroll q
 elasticsearch.search.scroll.time.count,count,,second,,The total time spent on scroll queries submitted as a count [v5+].,0,elasticsearch,total scroll query time,
 elasticsearch.search.scroll.total,gauge,,query,,The total number of scroll queries [v5+].,0,elasticsearch,total scroll queries,
 elasticsearch.search.scroll.total.count,count,,query,,The total number of scroll queries submitted as a count [v5+].,0,elasticsearch,total scroll queries,
-elasticsearch.shards,gauge,,shard,,Number of primary and replica shards assigned to the node.,0,elasticsearch,shards,
+elasticsearch.shards,gauge,,shard,,Number of shards per node-index-prirep combination. Collected when cat_allocation_stats is enabled. Includes tags: node_name index and prirep (primary or replica) for detailed shard placement analysis.,0,elasticsearch,shards,
 elasticsearch.slm.snapshot_deletion_failures,gauge,,error,,The total number of snapshot deletion failures.,0,elasticsearch,slm del fail,
 elasticsearch.slm.snapshots_deleted,gauge,,,,The total number of deleted snapshots.,0,elasticsearch,slm del,
 elasticsearch.slm.snapshots_failed,gauge,,error,,The total number of failed snapshots.,0,elasticsearch,slm fail,
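Since the updated description positions elasticsearch.shards as input for shard placement analysis, here is a short sketch of the kind of imbalance calculation the new tags enable. The per-node counts below are made-up stand-ins for the tagged series of a single index.

# Hypothetical per-node primary+replica counts for one index,
# as reported by elasticsearch.shards grouped by the node_name tag.
counts = {"es-data-0": 12, "es-data-1": 11, "es-data-2": 4}

# Simple skew measures: spread and ratio between most and least loaded nodes.
most, least = max(counts.values()), min(counts.values())
print("spread:", most - least)           # 8 shards
print("ratio:", round(most / least, 2))  # 3.0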

elastic/tests/test_integration.py

Lines changed: 22 additions & 0 deletions
@@ -359,9 +359,31 @@ def test_cat_allocation_metrics(dd_environment, aggregator, instance, cluster_ta
     elastic_check = ESCheck('elastic', {}, instances=[instance])
 
     elastic_check.check(None)
+
+    # Check disk metrics
     for m_name in CAT_ALLOCATION_METRICS:
         aggregator.assert_metric(m_name)
 
+    # Check detailed shard placement metrics
+    aggregator.assert_metric('elasticsearch.shards')
+
+    # Verify tags are present - we should have index and prirep tags
+    metrics = aggregator.metrics('elasticsearch.shards')
+    assert len(metrics) > 0, "Expected at least one elasticsearch.shards metric"
+
+    # Check that at least one metric has both index and prirep tags
+    has_index_tag = False
+    has_prirep_tag = False
+    for metric in metrics:
+        tags = metric.tags
+        if any(tag.startswith('index:') for tag in tags):
+            has_index_tag = True
+        if any(tag.startswith('prirep:') for tag in tags):
+            has_prirep_tag = True
+
+    assert has_index_tag, "Expected index tag on elasticsearch.shards metric"
+    assert has_prirep_tag, "Expected prirep tag on elasticsearch.shards metric"
+
 
 def test_health_event(dd_environment, aggregator):
     dummy_tags = ['elastique:recherche']
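If the pinned datadog_checks_base test stubs expose assert_metric_has_tag_prefix (an assumption worth verifying against the repo's base package version), the tag checks could also be expressed more compactly, for example:

# Hypothetical tightening of the tag assertions; relies on
# aggregator.assert_metric_has_tag_prefix being available in the test stubs.
def test_shards_metric_tag_prefixes(dd_environment, aggregator, instance):
    elastic_check = ESCheck('elastic', {}, instances=[instance])
    elastic_check.check(None)

    aggregator.assert_metric_has_tag_prefix('elasticsearch.shards', 'index:')
    aggregator.assert_metric_has_tag_prefix('elasticsearch.shards', 'prirep:')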
