Skip to content

Commit a4ce31d

Browse files
committed
telegraf(elasticsearch): add stats for the Remote Backed API, when Opensearch with remote store is enabled
1 parent d8adb1e commit a4ce31d

File tree

4 files changed

+381
-2
lines changed

4 files changed

+381
-2
lines changed

plugins/inputs/elasticsearch/elasticsearch.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ type Elasticsearch struct {
103103
ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"`
104104
IndicesInclude []string `toml:"indices_include"`
105105
IndicesLevel string `toml:"indices_level"`
106+
RemoteStoreStats bool `toml:"remote_store_stats"`
106107
NodeStats []string `toml:"node_stats"`
107108
Username string `toml:"username"`
108109
Password string `toml:"password"`
@@ -248,6 +249,15 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
248249
}
249250
}
250251

252+
if e.RemoteStoreStats {
253+
// Here we use e.IndicesInclude; you might adjust if needed.
254+
for _, indexName := range e.IndicesInclude {
255+
if err := e.gatherRemoteStoreStats(s+"/_remotestore/stats/"+indexName, indexName, acc); err != nil {
256+
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
257+
}
258+
}
259+
}
260+
251261
if e.ClusterStats && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) {
252262
if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil {
253263
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
@@ -622,6 +632,80 @@ func (e *Elasticsearch) gatherSingleIndexStats(name string, index indexStat, now
622632
return nil
623633
}
624634

635+
func (e *Elasticsearch) gatherRemoteStoreStats(url string, indexName string, acc telegraf.Accumulator) error {
636+
var remoteData map[string]interface{}
637+
if err := e.gatherJSONData(url, &remoteData); err != nil {
638+
return err
639+
}
640+
now := time.Now()
641+
642+
if shards, ok := remoteData["_shards"].(map[string]interface{}); ok {
643+
globalTags := map[string]string{"index_name": indexName}
644+
acc.AddFields("elasticsearch_remotestore_global", shards, globalTags, now)
645+
}
646+
647+
indicesRaw, ok := remoteData["indices"].(map[string]interface{})
648+
if !ok {
649+
return fmt.Errorf("remote store API response missing 'indices' field")
650+
}
651+
652+
idxRaw, exists := indicesRaw[indexName]
653+
if !exists {
654+
return fmt.Errorf("index %s not found in remote store stats", indexName)
655+
}
656+
657+
idxData, ok := idxRaw.(map[string]interface{})
658+
if !ok {
659+
return fmt.Errorf("unexpected format for index %s data", indexName)
660+
}
661+
662+
shardsRaw, ok := idxData["shards"].(map[string]interface{})
663+
if !ok {
664+
return fmt.Errorf("shards field missing or malformed for index %s", indexName)
665+
}
666+
667+
for shardID, shardEntries := range shardsRaw {
668+
entries, ok := shardEntries.([]interface{})
669+
if !ok {
670+
continue
671+
}
672+
// Process each shard entry (primary and replicas)
673+
for _, entry := range entries {
674+
f := jsonparser.JSONFlattener{}
675+
if err := f.FullFlattenJSON("", entry, true, true); err != nil {
676+
return err
677+
}
678+
679+
tags := map[string]string{
680+
"index_name": indexName,
681+
"shard_id": shardID,
682+
}
683+
if entryMap, ok := entry.(map[string]interface{}); ok {
684+
if routing, exists := entryMap["routing"].(map[string]interface{}); exists {
685+
if state, ok := routing["state"].(string); ok {
686+
tags["routing_state"] = state
687+
}
688+
if primary, ok := routing["primary"].(bool); ok {
689+
if primary {
690+
tags["shard_type"] = "primary"
691+
} else {
692+
tags["shard_type"] = "replica"
693+
}
694+
}
695+
if node, ok := routing["node"].(string); ok {
696+
tags["node_id"] = node
697+
}
698+
}
699+
}
700+
701+
delete(f.Fields, "routing")
702+
acc.AddFields("elasticsearch_remotestore_stats_shards", f.Fields, tags, now)
703+
}
704+
}
705+
706+
return nil
707+
}
708+
625709
func (e *Elasticsearch) getCatMaster(url string) (string, error) {
626710
req, err := http.NewRequest("GET", url, nil)
627711
if err != nil {

plugins/inputs/elasticsearch/elasticsearch_test.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package elasticsearch
22

33
import (
4+
"github.com/stretchr/testify/require"
45
"io"
56
"net/http"
67
"strings"
78
"testing"
89

9-
"github.com/stretchr/testify/require"
10-
1110
"github.com/influxdata/telegraf/testutil"
1211
)
1312

@@ -352,6 +351,44 @@ func TestGatherClusterIndiceShardsStats(t *testing.T) {
352351
replicaTags)
353352
}
354353

354+
func TestGatherRemoteStoreStats(t *testing.T) {
355+
es := newElasticsearchWithClient()
356+
es.RemoteStoreStats = true
357+
es.Servers = []string{"http://example.com:9200"}
358+
es.IndicesInclude = []string{"remote-index"}
359+
es.client.Transport = newTransportMock(remoteStoreResponse)
360+
var acc testutil.Accumulator
361+
url := "http://example.com:9200/_remotestore/stats/remote-index"
362+
err := es.gatherRemoteStoreStats(url, "remote-index", &acc)
363+
require.NoError(t, err)
364+
365+
globalTags := map[string]string{"index_name": "remote-index"}
366+
expectedGlobalFields := map[string]interface{}{
367+
"total": float64(4),
368+
"successful": float64(4),
369+
"failed": float64(0),
370+
}
371+
acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_global", expectedGlobalFields, globalTags)
372+
373+
primaryTags := map[string]string{
374+
"index_name": "remote-index",
375+
"shard_id": "0",
376+
"routing_state": "STARTED",
377+
"shard_type": "primary",
378+
"node_id": "q1VxWZnCTICrfRc2bRW3nw",
379+
}
380+
acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_stats_shards", remoteStoreIndicesPrimaryShardsExpected, primaryTags)
381+
382+
replicaTags := map[string]string{
383+
"index_name": "remote-index",
384+
"shard_id": "1",
385+
"routing_state": "STARTED",
386+
"shard_type": "replica",
387+
"node_id": "q1VxWZnCTICrfRc2bRW3nw",
388+
}
389+
acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_stats_shards", remoteStoreIndicesReplicaShardsExpected, replicaTags)
390+
}
391+
355392
func newElasticsearchWithClient() *Elasticsearch {
356393
es := NewElasticsearch()
357394
es.client = &http.Client{}

plugins/inputs/elasticsearch/sample.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,9 @@
6161
## the wildcard. Metrics then are gathered for only the
6262
## 'num_most_recent_indices' amount of most recent indices.
6363
# num_most_recent_indices = 0
64+
65+
## With the remote-backed store (s3) enabled feature. This will
66+
## allow the plugin to fetch the stats from the remote s3 store
67+
## when the feature is enabled for the cluster itself.
68+
## To work this require local = true
69+
remote_store_stats = false

0 commit comments

Comments
 (0)