Skip to content

Commit 3436e3b

Browse files
authored
Merge branch 'influxdata:master' into master
2 parents 92b4381 + b9ea28c commit 3436e3b

17 files changed

+272
-87
lines changed

config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,11 @@ func (c *Config) addSecretStore(name, source string, table *ast.Table) error {
10121012
return fmt.Errorf("invalid secret-store ID %q, must only contain letters, numbers or underscore", storeID)
10131013
}
10141014

1015+
tags := map[string]string{
1016+
"_id": storeID,
1017+
"secretstore": name,
1018+
}
1019+
10151020
creator, ok := secretstores.SecretStores[name]
10161021
if !ok {
10171022
// Handle removed, deprecated plugins
@@ -1033,6 +1038,7 @@ func (c *Config) addSecretStore(name, source string, table *ast.Table) error {
10331038

10341039
logger := logging.New("secretstores", name, "")
10351040
models.SetLoggerOnPlugin(store, logger)
1041+
models.SetStatisticsOnPlugin(store, logger, tags)
10361042

10371043
if err := store.Init(); err != nil {
10381044
return fmt.Errorf("error initializing secret-store %q: %w", storeID, err)

models/running_aggregator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
4242
logger.Error(err)
4343
}
4444
SetLoggerOnPlugin(aggregator, logger)
45+
SetStatisticsOnPlugin(aggregator, logger, tags)
4546

4647
return &RunningAggregator{
4748
Aggregator: aggregator,

models/running_input.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
5555
logger.Error(err)
5656
}
5757
SetLoggerOnPlugin(input, logger)
58+
SetStatisticsOnPlugin(input, logger, tags)
5859

5960
return &RunningInput{
6061
Input: input,

models/running_parsers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func NewRunningParser(parser telegraf.Parser, config *ParserConfig) *RunningPars
3232
logger.Error(err)
3333
}
3434
SetLoggerOnPlugin(parser, logger)
35+
SetStatisticsOnPlugin(parser, logger, tags)
3536

3637
return &RunningParser{
3738
Parser: parser,

models/running_processor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo
5050
logger.Error(err)
5151
}
5252
SetLoggerOnPlugin(processor, logger)
53+
SetStatisticsOnPlugin(processor, logger, tags)
5354

5455
return &RunningProcessor{
5556
Processor: processor,

models/running_serializer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func NewRunningSerializer(serializer telegraf.Serializer, config *SerializerConf
4242
logger.Error(err)
4343
}
4444
SetLoggerOnPlugin(serializer, logger)
45+
SetStatisticsOnPlugin(serializer, logger, tags)
4546

4647
return &RunningSerializer{
4748
Serializer: serializer,

plugins/inputs/influxdb_v2_listener/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ to use them.
8787
## 'internal' is the default. 'upstream' is a newer parser that is faster
8888
## and more memory efficient.
8989
# parser_type = "internal"
90+
91+
## Use new internal metrics. When true, it adds tag for alias if set.
92+
# use_internal_statistics = false
9093
```
9194

9295
## Metrics

plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,16 @@ type InfluxDBV2Listener struct {
4545
port int
4646
common_tls.ServerConfig
4747

48-
MaxUndeliveredMetrics int `toml:"max_undelivered_metrics"`
49-
ReadTimeout config.Duration `toml:"read_timeout"`
50-
WriteTimeout config.Duration `toml:"write_timeout"`
51-
MaxBodySize config.Size `toml:"max_body_size"`
52-
Token config.Secret `toml:"token"`
53-
BucketTag string `toml:"bucket_tag"`
54-
ParserType string `toml:"parser_type"`
55-
56-
Log telegraf.Logger `toml:"-"`
48+
MaxUndeliveredMetrics int `toml:"max_undelivered_metrics"`
49+
ReadTimeout config.Duration `toml:"read_timeout"`
50+
WriteTimeout config.Duration `toml:"write_timeout"`
51+
MaxBodySize config.Size `toml:"max_body_size"`
52+
Token config.Secret `toml:"token"`
53+
BucketTag string `toml:"bucket_tag"`
54+
ParserType string `toml:"parser_type"`
55+
UseInternalStatistics bool `toml:"use_internal_statistics"`
56+
Statistics *selfstat.Collector `toml:"-"`
57+
Log telegraf.Logger `toml:"-"`
5758

5859
ctx context.Context
5960
cancel context.CancelFunc
@@ -97,15 +98,33 @@ func (h *InfluxDBV2Listener) Init() error {
9798
tags := map[string]string{
9899
"address": h.ServiceAddress,
99100
}
100-
h.bytesRecv = selfstat.Register("influxdb_v2_listener", "bytes_received", tags)
101-
h.requestsServed = selfstat.Register("influxdb_v2_listener", "requests_served", tags)
102-
h.writesServed = selfstat.Register("influxdb_v2_listener", "writes_served", tags)
103-
h.healthsServed = selfstat.Register("influxdb_v2_listener", "healths_served", tags)
104-
h.readysServed = selfstat.Register("influxdb_v2_listener", "readys_served", tags)
105-
h.requestsRecv = selfstat.Register("influxdb_v2_listener", "requests_received", tags)
106-
h.notFoundsServed = selfstat.Register("influxdb_v2_listener", "not_founds_served", tags)
107-
h.pingsServed = selfstat.Register("influxdb_v2_listener", "pings_served", tags)
108-
h.authFailures = selfstat.Register("influxdb_v2_listener", "auth_failures", tags)
101+
if !h.UseInternalStatistics {
102+
h.bytesRecv = selfstat.Register("influxdb_v2_listener", "bytes_received", tags)
103+
h.requestsServed = selfstat.Register("influxdb_v2_listener", "requests_served", tags)
104+
h.writesServed = selfstat.Register("influxdb_v2_listener", "writes_served", tags)
105+
h.healthsServed = selfstat.Register("influxdb_v2_listener", "healths_served", tags)
106+
h.readysServed = selfstat.Register("influxdb_v2_listener", "readys_served", tags)
107+
h.requestsRecv = selfstat.Register("influxdb_v2_listener", "requests_received", tags)
108+
h.notFoundsServed = selfstat.Register("influxdb_v2_listener", "not_founds_served", tags)
109+
h.pingsServed = selfstat.Register("influxdb_v2_listener", "pings_served", tags)
110+
h.authFailures = selfstat.Register("influxdb_v2_listener", "auth_failures", tags)
111+
config.PrintOptionValueDeprecationNotice("inputs.influxdb_v2_listener", "use_internal_statistics", false, telegraf.DeprecationInfo{
112+
Since: "1.37.0",
113+
RemovalIn: "1.45.0",
114+
Notice: "please update to 'use_internal_statistics = true'",
115+
})
116+
} else {
117+
h.bytesRecv = h.Statistics.Register("influxdb_v2_listener", "bytes_received", tags)
118+
h.requestsServed = h.Statistics.Register("influxdb_v2_listener", "requests_served", tags)
119+
h.writesServed = h.Statistics.Register("influxdb_v2_listener", "writes_served", tags)
120+
h.healthsServed = h.Statistics.Register("influxdb_v2_listener", "healths_served", tags)
121+
h.readysServed = h.Statistics.Register("influxdb_v2_listener", "readys_served", tags)
122+
h.requestsRecv = h.Statistics.Register("influxdb_v2_listener", "requests_received", tags)
123+
h.notFoundsServed = h.Statistics.Register("influxdb_v2_listener", "not_founds_served", tags)
124+
h.pingsServed = h.Statistics.Register("influxdb_v2_listener", "pings_served", tags)
125+
h.authFailures = h.Statistics.Register("influxdb_v2_listener", "auth_failures", tags)
126+
}
127+
109128
if err := h.routes(); err != nil {
110129
return err
111130
}

plugins/inputs/influxdb_v2_listener/influxdb_v2_listener_benchmark_test.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,37 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/gofrs/uuid/v5"
12+
"github.com/stretchr/testify/require"
13+
1114
"github.com/influxdata/telegraf/config"
1215
"github.com/influxdata/telegraf/selfstat"
1316
"github.com/influxdata/telegraf/testutil"
1417
)
1518

1619
// newListener is the minimal InfluxDBV2Listener construction to serve writes.
17-
func newListener() *InfluxDBV2Listener {
20+
func newListener(b *testing.B) *InfluxDBV2Listener {
21+
// Make sure we do have a unique stats for this instance to avoid side effects
22+
id, err := uuid.NewV4()
23+
require.NoError(b, err)
24+
1825
listener := &InfluxDBV2Listener{
19-
timeFunc: time.Now,
20-
acc: &testutil.NopAccumulator{},
21-
bytesRecv: selfstat.Register("influxdb_v2_listener", "bytes_received", map[string]string{}),
22-
writesServed: selfstat.Register("influxdb_v2_listener", "writes_served", map[string]string{}),
23-
MaxBodySize: config.Size(defaultMaxBodySize),
26+
timeFunc: time.Now,
27+
acc: &testutil.NopAccumulator{},
28+
bytesRecv: selfstat.Register("influxdb_v2_listener", "bytes_received", map[string]string{
29+
"id": id.String(),
30+
}),
31+
writesServed: selfstat.Register("influxdb_v2_listener", "writes_served", map[string]string{
32+
"id": id.String(),
33+
}),
34+
MaxBodySize: config.Size(defaultMaxBodySize),
2435
}
36+
37+
b.Cleanup(func() {
38+
listener.bytesRecv.Unregister()
39+
listener.writesServed.Unregister()
40+
})
41+
2542
return listener
2643
}
2744

@@ -65,18 +82,14 @@ func BenchmarkInfluxDBV2Listener_serveWrite(b *testing.B) {
6582

6683
for _, bm := range benchmarks {
6784
b.Run(bm.name, func(b *testing.B) {
68-
listener := newListener()
85+
listener := newListener(b)
6986

7087
b.ResetTimer()
7188
for n := 0; n < b.N; n++ {
7289
req, err := http.NewRequest("POST", addr, strings.NewReader(bm.lines))
73-
if err != nil {
74-
b.Error(err)
75-
}
90+
require.NoError(b, err)
7691
listener.handleWrite()(res, req)
77-
if res.Code != http.StatusNoContent {
78-
b.Errorf("unexpected status %d", res.Code)
79-
}
92+
require.Equal(b, http.StatusNoContent, res.Code, "unexpected status")
8093
}
8194
})
8295
}

0 commit comments

Comments
 (0)