Skip to content

Commit c58cf36

Browse files
committed
Add RemoteStats for the Opensearch service
1 parent 4f33937 commit c58cf36

File tree

4 files changed

+380
-2
lines changed

4 files changed

+380
-2
lines changed

plugins/inputs/elasticsearch/elasticsearch.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type Elasticsearch struct {
4545
ClusterStats bool `toml:"cluster_stats"`
4646
ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"`
4747
EnrichStats bool `toml:"enrich_stats"`
48+
RemoteStoreStats bool `toml:"remote_store_stats"`
4849
IndicesInclude []string `toml:"indices_include"`
4950
IndicesLevel string `toml:"indices_level"`
5051
NodeStats []string `toml:"node_stats"`
@@ -231,6 +232,14 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
231232
}
232233
}
233234

235+
if e.RemoteStoreStats {
236+
for _, indexName := range e.IndicesInclude {
237+
if err := e.gatherRemoteStoreStats(s+"/_remotestore/stats/"+indexName, indexName, acc); err != nil {
238+
acc.AddError(errors.New(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
239+
}
240+
}
241+
}
242+
234243
if e.ClusterStats && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) {
235244
if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil {
236245
acc.AddError(errors.New(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
@@ -650,6 +659,80 @@ func (e *Elasticsearch) gatherSingleIndexStats(name string, index indexStat, now
650659
return nil
651660
}
652661

662+
func (e *Elasticsearch) gatherRemoteStoreStats(url string, indexName string, acc telegraf.Accumulator) error {
663+
var remoteData map[string]interface{}
664+
if err := e.gatherJSONData(url, &remoteData); err != nil {
665+
return err
666+
}
667+
now := time.Now()
668+
669+
if shards, ok := remoteData["_shards"].(map[string]interface{}); ok {
670+
globalTags := map[string]string{"index_name": indexName}
671+
acc.AddFields("elasticsearch_remotestore_global", shards, globalTags, now)
672+
}
673+
674+
indicesRaw, ok := remoteData["indices"].(map[string]interface{})
675+
if !ok {
676+
return fmt.Errorf("remote store API response missing 'indices' field")
677+
}
678+
679+
idxRaw, exists := indicesRaw[indexName]
680+
if !exists {
681+
return fmt.Errorf("index %s not found in remote store stats", indexName)
682+
}
683+
684+
idxData, ok := idxRaw.(map[string]interface{})
685+
if !ok {
686+
return fmt.Errorf("unexpected format for index %s data", indexName)
687+
}
688+
689+
shardsRaw, ok := idxData["shards"].(map[string]interface{})
690+
if !ok {
691+
return fmt.Errorf("shards field missing or malformed for index %s", indexName)
692+
}
693+
694+
for shardID, shardEntries := range shardsRaw {
695+
entries, ok := shardEntries.([]interface{})
696+
if !ok {
697+
continue
698+
}
699+
// Process each shard entry (primary and replicas)
700+
for _, entry := range entries {
701+
f := parsers_json.JSONFlattener{}
702+
if err := f.FullFlattenJSON("", entry, true, true); err != nil {
703+
return err
704+
}
705+
706+
tags := map[string]string{
707+
"index_name": indexName,
708+
"shard_id": shardID,
709+
}
710+
if entryMap, ok := entry.(map[string]interface{}); ok {
711+
if routing, exists := entryMap["routing"].(map[string]interface{}); exists {
712+
if state, ok := routing["state"].(string); ok {
713+
tags["routing_state"] = state
714+
}
715+
if primary, ok := routing["primary"].(bool); ok {
716+
if primary {
717+
tags["shard_type"] = "primary"
718+
} else {
719+
tags["shard_type"] = "replica"
720+
}
721+
}
722+
if node, ok := routing["node"].(string); ok {
723+
tags["node_id"] = node
724+
}
725+
}
726+
}
727+
728+
delete(f.Fields, "routing")
729+
acc.AddFields("elasticsearch_remotestore_stats_shards", f.Fields, tags, now)
730+
}
731+
}
732+
733+
return nil
734+
}
735+
653736
func (e *Elasticsearch) getCatMaster(url string) (string, error) {
654737
req, err := http.NewRequest("GET", url, nil)
655738
if err != nil {

plugins/inputs/elasticsearch/elasticsearch_test.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package elasticsearch
22

33
import (
4+
"github.com/stretchr/testify/require"
45
"io"
56
"net/http"
67
"strings"
78
"testing"
89

9-
"github.com/stretchr/testify/require"
10-
1110
"github.com/influxdata/telegraf/testutil"
1211
)
1312

@@ -366,6 +365,44 @@ func TestGatherClusterIndiceShardsStats(t *testing.T) {
366365
replicaTags)
367366
}
368367

368+
func TestGatherRemoteStoreStats(t *testing.T) {
369+
es := newElasticsearchWithClient()
370+
es.RemoteStoreStats = true
371+
es.Servers = []string{"http://example.com:9200"}
372+
es.IndicesInclude = []string{"remote-index"}
373+
es.client.Transport = newTransportMock(remoteStoreResponse)
374+
var acc testutil.Accumulator
375+
url := "http://example.com:9200/_remotestore/stats/remote-index"
376+
err := es.gatherRemoteStoreStats(url, "remote-index", &acc)
377+
require.NoError(t, err)
378+
379+
globalTags := map[string]string{"index_name": "remote-index"}
380+
expectedGlobalFields := map[string]interface{}{
381+
"total": float64(4),
382+
"successful": float64(4),
383+
"failed": float64(0),
384+
}
385+
acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_global", expectedGlobalFields, globalTags)
386+
387+
primaryTags := map[string]string{
388+
"index_name": "remote-index",
389+
"shard_id": "0",
390+
"routing_state": "STARTED",
391+
"shard_type": "primary",
392+
"node_id": "q1VxWZnCTICrfRc2bRW3nw",
393+
}
394+
acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_stats_shards", remoteStoreIndicesPrimaryShardsExpected, primaryTags)
395+
396+
replicaTags := map[string]string{
397+
"index_name": "remote-index",
398+
"shard_id": "1",
399+
"routing_state": "STARTED",
400+
"shard_type": "replica",
401+
"node_id": "q1VxWZnCTICrfRc2bRW3nw",
402+
}
403+
acc.AssertContainsTaggedFields(t, "elasticsearch_remotestore_stats_shards", remoteStoreIndicesReplicaShardsExpected, replicaTags)
404+
}
405+
369406
func newElasticsearchWithClient() *Elasticsearch {
370407
es := newElasticsearch()
371408
es.client = &http.Client{}

plugins/inputs/elasticsearch/sample.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,9 @@
7171
## the wildcard. Metrics then are gathered for only the
7272
## 'num_most_recent_indices' amount of most recent indices.
7373
# num_most_recent_indices = 0
74+
75+
## With the remote-backed store (s3) enabled feature. This will
76+
## allow the plugin to fetch the stats from the remote s3 store
77+
## when the feature is enabled for the cluster itself.
78+
## To work this require local = true
79+
remote_store_stats = false

0 commit comments

Comments
 (0)