Skip to content

Commit e693545

Browse files
committed
add prirep and index tags to elasticsearch.shards metric
1 parent acabb0b commit e693545

File tree

5 files changed

+72
-6
lines changed

5 files changed

+72
-6
lines changed

elastic/datadog_checks/elastic/data/conf.yaml.example

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,12 @@ instances:
112112

113113
## @param cat_allocation_stats - boolean - optional - default: false
114114
## Enable to collect Elastic Cat Allocation metrics. Available only for Elasticsearch 5.0 or higher.
115-
## Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-allocation.html.
115+
## Collects disk usage metrics per node and detailed shard placement metrics.
116+
## The elasticsearch.shards metric includes index and prirep tags for monitoring
117+
## shard placement skew and imbalance between nodes.
118+
## Tags: node_name:<node>, index:<index_name>, prirep:primary or prirep:replica
119+
## Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-allocation.html
120+
## https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-shards.html
116121
#
117122
# cat_allocation_stats: false
118123

elastic/datadog_checks/elastic/elastic.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ class ESCheck(AgentCheck):
9999
SERVICE_CHECK_CONNECT_NAME = 'elasticsearch.can_connect'
100100
SERVICE_CHECK_CLUSTER_STATUS = 'elasticsearch.cluster_health'
101101
CAT_ALLOC_PATH = '/_cat/allocation?v=true&format=json&bytes=b'
102+
CAT_SHARDS_PATH = '/_cat/shards?format=json&bytes=b'
102103
SOURCE_TYPE_NAME = 'elasticsearch'
103104

104105
def __init__(self, name, init_config, instances):
@@ -490,16 +491,18 @@ def _process_cat_allocation_data(self, admin_forwarder, version, base_tags):
490491
return
491492

492493
self.log.debug("Collecting cat allocation metrics")
494+
495+
# Collect disk metrics from /_cat/allocation
493496
cat_allocation_url = self._join_url(self.CAT_ALLOC_PATH, admin_forwarder)
494497
try:
495498
cat_allocation_data = self._get_data(cat_allocation_url)
496499
except requests.ReadTimeout as e:
497500
self.log.error("Timed out reading cat allocation stats from servers (%s) - stats will be missing", e)
498501
return
499502

500-
# we need to remap metric names because the ones from elastic
501-
# contain dots and that would confuse `_process_metric()` (sic)
502-
data_to_collect = {'disk.indices', 'disk.used', 'disk.avail', 'disk.total', 'disk.percent', 'shards'}
503+
# Process disk metrics (we need to remap metric names because the ones from elastic
504+
# contain dots and that would confuse `_process_metric()`)
505+
data_to_collect = {'disk.indices', 'disk.used', 'disk.avail', 'disk.total', 'disk.percent'}
503506
for dic in cat_allocation_data:
504507
cat_allocation_dic = {
505508
k.replace('.', '_'): v for k, v in dic.items() if k in data_to_collect and v is not None
@@ -509,6 +512,43 @@ def _process_cat_allocation_data(self, admin_forwarder, version, base_tags):
509512
desc = CAT_ALLOCATION_METRICS[metric]
510513
self._process_metric(cat_allocation_dic, metric, *desc, tags=tags)
511514

515+
# Collect detailed shard placement from /_cat/shards
516+
self.log.debug("Collecting detailed shard placement metrics")
517+
cat_shards_url = self._join_url(self.CAT_SHARDS_PATH, admin_forwarder)
518+
try:
519+
cat_shards_data = self._get_data(cat_shards_url)
520+
except requests.ReadTimeout as e:
521+
self.log.error("Timed out reading cat shards stats from servers (%s) - stats will be missing", e)
522+
return
523+
524+
# Group shards by node, index, and prirep to count them
525+
from collections import Counter
526+
shard_counts = Counter()
527+
528+
for shard in cat_shards_data:
529+
node = shard.get('node')
530+
index = shard.get('index')
531+
prirep_raw = shard.get('prirep')
532+
533+
# Skip unassigned shards (they have no node)
534+
if node is None or node == 'UNASSIGNED':
535+
continue
536+
537+
# Map p/r to primary/replica for better readability
538+
prirep = 'primary' if prirep_raw == 'p' else 'replica'
539+
540+
key = (node, index, prirep)
541+
shard_counts[key] += 1
542+
543+
# Submit metrics with tags
544+
for (node, index, prirep), count in shard_counts.items():
545+
tags = base_tags + [
546+
'node_name:{}'.format(node.lower()),
547+
'index:{}'.format(index),
548+
'prirep:{}'.format(prirep)
549+
]
550+
self.gauge('elasticsearch.shards', count, tags=tags)
551+
512552
def _process_custom_metric(
513553
self,
514554
value,

elastic/datadog_checks/elastic/metrics.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -766,7 +766,6 @@
766766
}
767767

768768
CAT_ALLOCATION_METRICS = {
769-
'elasticsearch.shards': ('gauge', 'shards'),
770769
'elasticsearch.disk.indices': ('gauge', 'disk_indices'),
771770
'elasticsearch.disk.used': ('gauge', 'disk_used'),
772771
'elasticsearch.disk.avail': ('gauge', 'disk_avail'),

elastic/metadata.csv

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ elasticsearch.search.scroll.time,gauge,,second,,The total time spent on scroll q
187187
elasticsearch.search.scroll.time.count,count,,second,,The total time spent on scroll queries submitted as a count [v5+].,0,elasticsearch,total scroll query time,
188188
elasticsearch.search.scroll.total,gauge,,query,,The total number of scroll queries [v5+].,0,elasticsearch,total scroll queries,
189189
elasticsearch.search.scroll.total.count,count,,query,,The total number of scroll queries submitted as a count [v5+].,0,elasticsearch,total scroll queries,
190-
elasticsearch.shards,gauge,,shard,,Number of primary and replica shards assigned to the node.,0,elasticsearch,shards,
190+
elasticsearch.shards,gauge,,shard,,"Number of shards per node-index-prirep combination. Collected when cat_allocation_stats is enabled. Includes the tags node_name, index, and prirep (primary or replica) for detailed shard placement analysis.",0,elasticsearch,shards,
191191
elasticsearch.slm.snapshot_deletion_failures,gauge,,error,,The total number of snapshot deletion failures.,0,elasticsearch,slm del fail,
192192
elasticsearch.slm.snapshots_deleted,gauge,,,,The total number of deleted snapshots.,0,elasticsearch,slm del,
193193
elasticsearch.slm.snapshots_failed,gauge,,error,,The total number of failed snapshots.,0,elasticsearch,slm fail,

elastic/tests/test_integration.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,9 +360,31 @@ def test_cat_allocation_metrics(dd_environment, aggregator, instance, cluster_ta
360360
elastic_check = ESCheck('elastic', {}, instances=[instance])
361361

362362
elastic_check.check(None)
363+
364+
# Check disk metrics
363365
for m_name in CAT_ALLOCATION_METRICS:
364366
aggregator.assert_metric(m_name)
365367

368+
# Check detailed shard placement metrics
369+
aggregator.assert_metric('elasticsearch.shards')
370+
371+
# Verify tags are present - we should have index and prirep tags
372+
metrics = aggregator.metrics('elasticsearch.shards')
373+
assert len(metrics) > 0, "Expected at least one elasticsearch.shards metric"
374+
375+
# Check that at least one metric has both index and prirep tags
376+
has_index_tag = False
377+
has_prirep_tag = False
378+
for metric in metrics:
379+
tags = metric.tags
380+
if any(tag.startswith('index:') for tag in tags):
381+
has_index_tag = True
382+
if any(tag.startswith('prirep:') for tag in tags):
383+
has_prirep_tag = True
384+
385+
assert has_index_tag, "Expected index tag on elasticsearch.shards metric"
386+
assert has_prirep_tag, "Expected prirep tag on elasticsearch.shards metric"
387+
366388

367389
def test_health_event(dd_environment, aggregator):
368390
dummy_tags = ['elastique:recherche']

0 commit comments

Comments
 (0)