diff --git a/docs/configuration.md b/docs/configuration.md index a782e1f67..9d810f96e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -137,6 +137,11 @@ statistics: # Export the metric with the original CloudWatch timestamp (General Setting for all metrics in this job) [ addCloudwatchTimestamp: ] +# Include older data points when they are present in the CloudWatch metric response. This is useful, for example, if a metric is set up with +# period 60s and length 300s, so that all 5 data points are exposed in the metrics endpoint and not just the last one +# (General Setting for all metrics in this job) +[ addHistoricalMetrics: ] + # List of metric definitions metrics: [ - ... ] @@ -276,6 +281,11 @@ statistics: # Export the metric with the original CloudWatch timestamp (General Setting for all metrics in this job) [ addCloudwatchTimestamp: ] +# Include older data points when they are present in the CloudWatch metric response. This is useful, for example, if a metric is set up with +# period 60s and length 300s, so that all 5 data points are exposed in the metrics endpoint and not just the last one +# (General Setting for all metrics in this job) +[ addHistoricalMetrics: ] + # List of metric definitions metrics: [ - ... ] @@ -339,6 +349,7 @@ Notes: - Available statistics: `Maximum`, `Minimum`, `Sum`, `SampleCount`, `Average`, `pXX` (e.g. `p90`). - Watch out using `addCloudwatchTimestamp` for sparse metrics, e.g from S3, since Prometheus won't scrape metrics containing timestamps older than 2-3 hours. +The same applies when enabling `addHistoricalMetrics` for any metric. ### `exported_tags_config` diff --git a/pkg/clients/cloudwatch/client.go b/pkg/clients/cloudwatch/client.go index b7d85d70f..8e35b59ed 100644 --- a/pkg/clients/cloudwatch/client.go +++ b/pkg/clients/cloudwatch/client.go @@ -22,7 +22,7 @@ type Client interface { // GetMetricData returns the output of the GetMetricData CloudWatch API. // Results pagination is handled automatically. 
- GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64) []MetricDataResult + GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64, addHistoricalMetrics bool) []MetricDataResult // GetMetricStatistics returns the output of the GetMetricStatistics CloudWatch API. GetMetricStatistics(ctx context.Context, logger logging.Logger, dimensions []*model.Dimension, namespace string, metric *model.MetricConfig) []*model.Datapoint @@ -67,9 +67,9 @@ func (c limitedConcurrencyClient) GetMetricStatistics(ctx context.Context, logge return res } -func (c limitedConcurrencyClient) GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64) []MetricDataResult { +func (c limitedConcurrencyClient) GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64, addHistoricalMetrics bool) []MetricDataResult { c.limiter.Acquire(getMetricDataCall) - res := c.client.GetMetricData(ctx, logger, getMetricData, namespace, length, delay, configuredRoundingPeriod) + res := c.client.GetMetricData(ctx, logger, getMetricData, namespace, length, delay, configuredRoundingPeriod, addHistoricalMetrics) c.limiter.Release(getMetricDataCall) return res } diff --git a/pkg/clients/cloudwatch/v1/client.go b/pkg/clients/cloudwatch/v1/client.go index da0b5a687..3f429ad6e 100644 --- a/pkg/clients/cloudwatch/v1/client.go +++ b/pkg/clients/cloudwatch/v1/client.go @@ -84,7 +84,7 @@ func toModelDimensions(dimensions []*cloudwatch.Dimension) []*model.Dimension { return modelDimensions } -func (c client) GetMetricData(ctx context.Context, logger 
logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64) []cloudwatch_client.MetricDataResult { +func (c client) GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64, addHistoricalMetrics bool) []cloudwatch_client.MetricDataResult { var resp cloudwatch.GetMetricDataOutput filter := createGetMetricDataInput(getMetricData, &namespace, length, delay, configuredRoundingPeriod, logger) if c.logger.IsDebugEnabled() { @@ -109,18 +109,30 @@ func (c client) GetMetricData(ctx context.Context, logger logging.Logger, getMet c.logger.Error(err, "GetMetricData error") return nil } - return toMetricDataResult(resp) + return toMetricDataResult(resp, addHistoricalMetrics) } -func toMetricDataResult(resp cloudwatch.GetMetricDataOutput) []cloudwatch_client.MetricDataResult { +// toMetricDataResult converts a GetMetricData response into MetricDataResult values. +// Only the first datapoint of each metric is kept unless addHistoricalMetrics is true, in which case every +// returned datapoint yields its own result. A metric without datapoints still yields a single ID-only result. +func toMetricDataResult(resp cloudwatch.GetMetricDataOutput, addHistoricalMetrics bool) []cloudwatch_client.MetricDataResult { output := make([]cloudwatch_client.MetricDataResult, 0, len(resp.MetricDataResults)) for _, metricDataResult := range resp.MetricDataResults { - mappedResult := cloudwatch_client.MetricDataResult{ID: *metricDataResult.Id} - if len(metricDataResult.Values) > 0 { - mappedResult.Datapoint = metricDataResult.Values[0] - mappedResult.Timestamp = *metricDataResult.Timestamps[0] + // Keep emitting an ID-only result when no values were returned, exactly as before this change, + // so that metrics without data are not silently dropped from the output. + if len(metricDataResult.Values) == 0 { + output = append(output, cloudwatch_client.MetricDataResult{ID: *metricDataResult.Id}) + continue + } + for i := 0; i < len(metricDataResult.Values); i++ { + mappedResult := cloudwatch_client.MetricDataResult{ID: *metricDataResult.Id} + mappedResult.Datapoint = metricDataResult.Values[i] + mappedResult.Timestamp = *metricDataResult.Timestamps[i] + output = append(output, mappedResult) + if !addHistoricalMetrics { + break + } } - output = append(output, mappedResult) } return output } diff --git a/pkg/clients/cloudwatch/v1/client_test.go b/pkg/clients/cloudwatch/v1/client_test.go index fbfabc004..d3ce37c95 100644 --- 
a/pkg/clients/cloudwatch/v1/client_test.go +++ b/pkg/clients/cloudwatch/v1/client_test.go @@ -84,7 +84,7 @@ func Test_toMetricDataResult(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - metricDataResults := toMetricDataResult(tc.getMetricDataOutput) + metricDataResults := toMetricDataResult(tc.getMetricDataOutput, false) require.Equal(t, tc.expectedMetricDataResults, metricDataResults) }) } diff --git a/pkg/clients/cloudwatch/v2/client.go b/pkg/clients/cloudwatch/v2/client.go index e6a9a62a6..7fa1af6dc 100644 --- a/pkg/clients/cloudwatch/v2/client.go +++ b/pkg/clients/cloudwatch/v2/client.go @@ -87,7 +87,7 @@ func toModelDimensions(dimensions []types.Dimension) []*model.Dimension { return modelDimensions } -func (c client) GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64) []cloudwatch_client.MetricDataResult { +func (c client) GetMetricData(ctx context.Context, logger logging.Logger, getMetricData []*model.CloudwatchData, namespace string, length int64, delay int64, configuredRoundingPeriod *int64, addHistoricalMetrics bool) []cloudwatch_client.MetricDataResult { filter := createGetMetricDataInput(logger, getMetricData, &namespace, length, delay, configuredRoundingPeriod) var resp cloudwatch.GetMetricDataOutput @@ -115,18 +115,30 @@ func (c client) GetMetricData(ctx context.Context, logger logging.Logger, getMet c.logger.Debug("GetMetricData", "output", resp) } - return toMetricDataResult(resp) + return toMetricDataResult(resp, addHistoricalMetrics) } -func toMetricDataResult(resp cloudwatch.GetMetricDataOutput) []cloudwatch_client.MetricDataResult { +// toMetricDataResult converts a GetMetricData response into MetricDataResult values. +// Only the first datapoint of each metric is kept unless addHistoricalMetrics is true, in which case every +// returned datapoint yields its own result. A metric without datapoints still yields a single ID-only result. +func toMetricDataResult(resp cloudwatch.GetMetricDataOutput, addHistoricalMetrics bool) []cloudwatch_client.MetricDataResult { output := make([]cloudwatch_client.MetricDataResult, 0, len(resp.MetricDataResults)) for _, metricDataResult := range 
resp.MetricDataResults { - mappedResult := cloudwatch_client.MetricDataResult{ID: *metricDataResult.Id} - if len(metricDataResult.Values) > 0 { - mappedResult.Datapoint = &metricDataResult.Values[0] - mappedResult.Timestamp = metricDataResult.Timestamps[0] + // Keep emitting an ID-only result when no values were returned, exactly as before this change, + // so that metrics without data are not silently dropped from the output. + if len(metricDataResult.Values) == 0 { + output = append(output, cloudwatch_client.MetricDataResult{ID: *metricDataResult.Id}) + continue + } + for i := 0; i < len(metricDataResult.Values); i++ { + mappedResult := cloudwatch_client.MetricDataResult{ID: *metricDataResult.Id} + mappedResult.Datapoint = &metricDataResult.Values[i] + mappedResult.Timestamp = metricDataResult.Timestamps[i] + output = append(output, mappedResult) + if !addHistoricalMetrics { + break + } } - output = append(output, mappedResult) } return output } diff --git a/pkg/clients/cloudwatch/v2/client_test.go b/pkg/clients/cloudwatch/v2/client_test.go index bab062e94..e48b1aa54 100644 --- a/pkg/clients/cloudwatch/v2/client_test.go +++ b/pkg/clients/cloudwatch/v2/client_test.go @@ -69,7 +69,7 @@ func Test_toMetricDataResult(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - metricDataResults := toMetricDataResult(tc.getMetricDataOutput) + metricDataResults := toMetricDataResult(tc.getMetricDataOutput, false) require.Equal(t, tc.expectedMetricDataResults, metricDataResults) }) } diff --git a/pkg/clients/v2/cache_test.go b/pkg/clients/v2/cache_test.go new file mode 100644 index 000000000..129eb3263 --- /dev/null +++ b/pkg/clients/v2/cache_test.go @@ -0,0 +1,470 @@ +package v2 + +import ( + "context" + "os" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + aws_config "github.com/aws/aws-sdk-go-v2/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + cloudwatch_client "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging" + "github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model" +) + +func TestNewClientCache_initializes_clients(t *testing.T) { + 
role1 := config.Role{ + RoleArn: "role1", + ExternalID: "external1", + } + role2 := config.Role{ + RoleArn: "role2", + ExternalID: "external2", + } + role3 := config.Role{ + RoleArn: "role3", + ExternalID: "external3", + } + + region1 := "region1" + region2 := "region2" + region3 := "region3" + tests := []struct { + name string + config config.ScrapeConf + onlyStatic *bool + }{ + { + name: "from discovery config", + config: config.ScrapeConf{ + Discovery: config.Discovery{ + ExportedTagsOnMetrics: nil, + Jobs: []*config.Job{ + { + Regions: []string{region1, region2, region3}, + Roles: []config.Role{role1, role2, role3}, + }, + }, + }, + }, + onlyStatic: aws.Bool(false), + }, + { + name: "from static config", + config: config.ScrapeConf{ + Static: []*config.Static{{ + Regions: []string{region1, region2, region3}, + Roles: []config.Role{role1, role2, role3}, + }}, + }, + onlyStatic: aws.Bool(true), + }, + { + name: "from custom config", + config: config.ScrapeConf{ + CustomNamespace: []*config.CustomNamespace{{ + Regions: []string{region1, region2, region3}, + Roles: []config.Role{role1, role2, role3}, + }}, + }, + onlyStatic: aws.Bool(true), + }, + { + name: "from all configs", + config: config.ScrapeConf{ + Discovery: config.Discovery{ + ExportedTagsOnMetrics: nil, + Jobs: []*config.Job{ + { + Regions: []string{region1, region2}, + Roles: []config.Role{role1, role2}, + }, + }, + }, + Static: []*config.Static{{ + Regions: []string{region2, region3}, + Roles: []config.Role{role2, role3}, + }}, + CustomNamespace: []*config.CustomNamespace{{ + Regions: []string{region1, region3}, + Roles: []config.Role{role1, role3}, + }}, + }, + onlyStatic: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + output, err := NewCache(test.config, false, logging.NewNopLogger()) + require.NoError(t, err) + cache := output.(*clientCache) + require.NotNil(t, cache) + + assert.False(t, cache.refreshed) + assert.False(t, cache.cleared) + + assert.Len(t, 
cache.clients, 3) + assert.Contains(t, cache.clients, role1) + assert.Contains(t, cache.clients, role2) + assert.Contains(t, cache.clients, role3) + + for role, regionalClients := range cache.clients { + assert.Len(t, regionalClients, 3) + + assert.Contains(t, regionalClients, region1) + assert.Contains(t, regionalClients, region2) + assert.Contains(t, regionalClients, region3) + + for region, clients := range regionalClients { + assert.NotNil(t, clients, "role %s region %s had nil clients", role, region) + if test.onlyStatic != nil { + assert.Equal(t, *test.onlyStatic, clients.onlyStatic, "role %s region %s had unexpected onlyStatic value", role, region) + } + } + } + }) + } +} + +func TestNewClientCache_sets_fips(t *testing.T) { + config := config.ScrapeConf{ + Discovery: config.Discovery{ + ExportedTagsOnMetrics: nil, + Jobs: []*config.Job{ + { + Roles: []config.Role{{}}, + Regions: []string{"region1"}, + }, + }, + }, + } + output, err := NewCache(config, true, logging.NewNopLogger()) + require.NoError(t, err) + + cache := output.(*clientCache) + require.NotNil(t, cache) + + clients := cache.clients[defaultRole]["region1"] + assert.NotNil(t, clients) + + foundLoadOptions := false + for _, sources := range clients.awsConfig.ConfigSources { + options, ok := sources.(aws_config.LoadOptions) + if !ok { + continue + } + foundLoadOptions = true + assert.Equal(t, aws.FIPSEndpointStateEnabled, options.UseFIPSEndpoint) + } + assert.True(t, foundLoadOptions) +} + +func TestNewClientCache_sets_endpoint_override(t *testing.T) { + config := config.ScrapeConf{ + Discovery: config.Discovery{ + ExportedTagsOnMetrics: nil, + Jobs: []*config.Job{ + { + Roles: []config.Role{{}}, + Regions: []string{"region1"}, + }, + }, + }, + } + + err := os.Setenv("AWS_ENDPOINT_URL", "https://totallynotaws.com") + require.NoError(t, err) + t.Cleanup(func() { _ = os.Unsetenv("AWS_ENDPOINT_URL") }) // do not leak the endpoint override into tests that run later + output, err := NewCache(config, false, logging.NewNopLogger()) + require.NoError(t, err) + + cache := output.(*clientCache) + require.NotNil(t, cache) + + 
clients := cache.clients[defaultRole]["region1"] + assert.NotNil(t, clients) + assert.NotNil(t, clients.awsConfig.EndpointResolverWithOptions) +} + +func TestClientCache_Clear(t *testing.T) { + cache := &clientCache{ + logger: logging.NewNopLogger(), + clients: map[config.Role]map[awsRegion]*cachedClients{ + defaultRole: { + "region1": &cachedClients{ + awsConfig: nil, + cloudwatch: testClient{}, + tagging: testClient{}, + account: testClient{}, + }, + }, + }, + refreshed: true, + cleared: false, + } + + cache.Clear() + assert.True(t, cache.cleared) + assert.False(t, cache.refreshed) + + clients := cache.clients[defaultRole]["region1"] + require.NotNil(t, clients) + assert.Nil(t, clients.cloudwatch) + assert.Nil(t, clients.account) + assert.Nil(t, clients.tagging) +} + +func TestClientCache_Refresh(t *testing.T) { + t.Run("creates all clients when config contains only discovery jobs", func(t *testing.T) { + config := config.ScrapeConf{ + Discovery: config.Discovery{ + ExportedTagsOnMetrics: nil, + Jobs: []*config.Job{ + { + Roles: []config.Role{{}}, + Regions: []string{"region1"}, + }, + }, + }, + } + + output, err := NewCache(config, false, logging.NewNopLogger()) + require.NoError(t, err) + + cache := output.(*clientCache) + require.NotNil(t, cache) + + cache.Refresh() + assert.False(t, cache.cleared) + assert.True(t, cache.refreshed) + + clients := cache.clients[defaultRole]["region1"] + require.NotNil(t, clients) + assert.NotNil(t, clients.cloudwatch) + assert.NotNil(t, clients.account) + assert.NotNil(t, clients.tagging) + }) + + t.Run("creates only cloudwatch when config is only static jobs", func(t *testing.T) { + config := config.ScrapeConf{ + Static: []*config.Static{{ + Regions: []string{"region1"}, + Roles: []config.Role{{}}, + }}, + CustomNamespace: []*config.CustomNamespace{{ + Regions: []string{"region1"}, + Roles: []config.Role{{}}, + }}, + } + + output, err := NewCache(config, false, logging.NewNopLogger()) + require.NoError(t, err) + + cache := 
output.(*clientCache) + require.NotNil(t, cache) + + cache.Refresh() + assert.False(t, cache.cleared) + assert.True(t, cache.refreshed) + + clients := cache.clients[defaultRole]["region1"] + require.NotNil(t, clients) + assert.NotNil(t, clients.cloudwatch) + assert.Nil(t, clients.account) + assert.Nil(t, clients.tagging) + }) +} + +func TestClientCache_GetAccountClient(t *testing.T) { + t.Run("refreshed cache does not create new client", func(t *testing.T) { + config := config.ScrapeConf{ + Discovery: config.Discovery{ + ExportedTagsOnMetrics: nil, + Jobs: []*config.Job{ + { + Roles: []config.Role{{}}, + Regions: []string{"region1"}, + }, + }, + }, + } + + output, err := NewCache(config, false, logging.NewNopLogger()) + require.NoError(t, err) + + cache := output.(*clientCache) + require.NotNil(t, cache) + + cache.Refresh() + + clients := cache.clients[defaultRole]["region1"] + require.NotNil(t, clients) + assert.Equal(t, clients.account, output.GetAccountClient("region1", defaultRole)) + }) + + t.Run("unrefreshed cache creates a new client", func(t *testing.T) { + config := config.ScrapeConf{ + Discovery: config.Discovery{ + ExportedTagsOnMetrics: nil, + Jobs: []*config.Job{ + { + Roles: []config.Role{{}}, + Regions: []string{"region1"}, + }, + }, + }, + } + + output, err := NewCache(config, false, logging.NewNopLogger()) + require.NoError(t, err) + + cache := output.(*clientCache) + require.NotNil(t, cache) + + clients := cache.clients[defaultRole]["region1"] + require.NotNil(t, clients) + require.Nil(t, clients.account) + + client := output.GetAccountClient("region1", defaultRole) + assert.Equal(t, clients.account, client) + }) +} + +func TestClientCache_GetCloudwatchClient(t *testing.T) { + t.Run("refreshed cache does not create new client", func(t *testing.T) { + config := config.ScrapeConf{ + Discovery: config.Discovery{ + ExportedTagsOnMetrics: nil, + Jobs: []*config.Job{ + { + Roles: []config.Role{{}}, + Regions: []string{"region1"}, + }, + }, + }, + } + + 
output, err := NewCache(config, false, logging.NewNopLogger()) + require.NoError(t, err) + + cache := output.(*clientCache) + require.NotNil(t, cache) + + cache.Refresh() + + clients := cache.clients[defaultRole]["region1"] + require.NotNil(t, clients) + // Can't do equality comparison due to concurrency limiter + assert.NotNil(t, output.GetCloudwatchClient("region1", defaultRole, 1)) + }) + + t.Run("unrefreshed cache creates a new client", func(t *testing.T) { + config := config.ScrapeConf{ + Discovery: config.Discovery{ + ExportedTagsOnMetrics: nil, + Jobs: []*config.Job{ + { + Roles: []config.Role{{}}, + Regions: []string{"region1"}, + }, + }, + }, + } + + output, err := NewCache(config, false, logging.NewNopLogger()) + require.NoError(t, err) + + cache := output.(*clientCache) + require.NotNil(t, cache) + + clients := cache.clients[defaultRole]["region1"] + require.NotNil(t, clients) + require.Nil(t, clients.cloudwatch) + + output.GetCloudwatchClient("region1", defaultRole, 1) + assert.NotNil(t, clients.cloudwatch) + }) +} + +func TestClientCache_GetTaggingClient(t *testing.T) { + t.Run("refreshed cache does not create new client", func(t *testing.T) { + config := config.ScrapeConf{ + Discovery: config.Discovery{ + ExportedTagsOnMetrics: nil, + Jobs: []*config.Job{ + { + Roles: []config.Role{{}}, + Regions: []string{"region1"}, + }, + }, + }, + } + + output, err := NewCache(config, false, logging.NewNopLogger()) + require.NoError(t, err) + + cache := output.(*clientCache) + require.NotNil(t, cache) + + cache.Refresh() + + clients := cache.clients[defaultRole]["region1"] + require.NotNil(t, clients) + // Can't do equality comparison due to concurrency limiter + assert.NotNil(t, output.GetTaggingClient("region1", defaultRole, 1)) + }) + + t.Run("unrefreshed cache creates a new client", func(t *testing.T) { + config := config.ScrapeConf{ + Discovery: config.Discovery{ + ExportedTagsOnMetrics: nil, + Jobs: []*config.Job{ + { + Roles: []config.Role{{}}, + Regions: 
[]string{"region1"}, + }, + }, + }, + } + + output, err := NewCache(config, false, logging.NewNopLogger()) + require.NoError(t, err) + + cache := output.(*clientCache) + require.NotNil(t, cache) + + clients := cache.clients[defaultRole]["region1"] + require.NotNil(t, clients) + require.Nil(t, clients.tagging) + + output.GetTaggingClient("region1", defaultRole, 1) + assert.NotNil(t, clients.tagging) + }) +} + +type testClient struct{} + +func (t testClient) GetResources(_ context.Context, _ *config.Job, _ string) ([]*model.TaggedResource, error) { + return nil, nil +} + +func (t testClient) GetAccount(_ context.Context) (string, error) { + return "", nil +} + +func (t testClient) ListMetrics(_ context.Context, _ string, _ *config.Metric, _ bool, _ func(page []*model.Metric)) ([]*model.Metric, error) { + return nil, nil +} + +func (t testClient) GetMetricData(_ context.Context, _ logging.Logger, _ []*model.CloudwatchData, _ string, _ int64, _ int64, _ *int64, _ bool) []cloudwatch_client.MetricDataResult { + return nil +} + +func (t testClient) GetMetricStatistics(_ context.Context, _ logging.Logger, _ []*model.Dimension, _ string, _ *model.MetricConfig) []*model.Datapoint { + return nil +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 4c53352cc..950ec53a5 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -40,6 +40,7 @@ type JobLevelMetricFields struct { Delay int64 `yaml:"delay"` NilToZero *bool `yaml:"nilToZero"` AddCloudwatchTimestamp *bool `yaml:"addCloudwatchTimestamp"` + AddHistoricalMetrics *bool `yaml:"addHistoricalMetrics"` } type Job struct { @@ -87,6 +88,7 @@ type Metric struct { Delay int64 `yaml:"delay"` NilToZero *bool `yaml:"nilToZero"` AddCloudwatchTimestamp *bool `yaml:"addCloudwatchTimestamp"` + AddHistoricalMetrics *bool `yaml:"addHistoricalMetrics"` } type Dimension struct { @@ -342,6 +344,15 @@ func (m *Metric) validateMetric(metricIdx int, parent string, discovery *JobLeve } } + mAddHistoricalMetrics := 
m.AddHistoricalMetrics + if mAddHistoricalMetrics == nil { + if discovery != nil && discovery.AddHistoricalMetrics != nil { + mAddHistoricalMetrics = discovery.AddHistoricalMetrics + } else { + mAddHistoricalMetrics = aws.Bool(false) + } + } + if mLength < mPeriod { return fmt.Errorf( "Metric [%s/%d] in %v: length(%d) is smaller than period(%d). This can cause that the data requested is not ready and generate data gaps", @@ -353,6 +364,7 @@ func (m *Metric) validateMetric(metricIdx int, parent string, discovery *JobLeve m.Delay = mDelay m.NilToZero = mNilToZero m.AddCloudwatchTimestamp = mAddCloudwatchTimestamp + m.AddHistoricalMetrics = mAddHistoricalMetrics m.Statistics = mStatistics return nil @@ -377,6 +389,7 @@ func (c *ScrapeConf) toModelConfig() model.JobsConfig { job.Delay = discoveryJob.Delay job.NilToZero = discoveryJob.NilToZero job.AddCloudwatchTimestamp = discoveryJob.AddCloudwatchTimestamp + job.AddHistoricalMetrics = discoveryJob.AddHistoricalMetrics job.Roles = toModelRoles(discoveryJob.Roles) job.SearchTags = toModelSearchTags(discoveryJob.SearchTags) job.CustomTags = toModelTags(discoveryJob.CustomTags) @@ -422,6 +435,7 @@ func (c *ScrapeConf) toModelConfig() model.JobsConfig { job.Delay = customNamespaceJob.Delay job.NilToZero = customNamespaceJob.NilToZero job.AddCloudwatchTimestamp = customNamespaceJob.AddCloudwatchTimestamp + job.AddHistoricalMetrics = customNamespaceJob.AddHistoricalMetrics job.Roles = toModelRoles(customNamespaceJob.Roles) job.CustomTags = toModelTags(customNamespaceJob.CustomTags) job.Metrics = toModelMetricConfig(customNamespaceJob.Metrics) @@ -488,6 +502,7 @@ func toModelMetricConfig(metrics []*Metric) []*model.MetricConfig { Delay: m.Delay, NilToZero: m.NilToZero, AddCloudwatchTimestamp: m.AddCloudwatchTimestamp, + AddHistoricalMetrics: m.AddHistoricalMetrics, }) } return ret diff --git a/pkg/job/custom.go b/pkg/job/custom.go index 0b5bad09e..1f1f16ad0 100644 --- a/pkg/job/custom.go +++ b/pkg/job/custom.go @@ -38,6 
+38,11 @@ func runCustomNamespaceJob( wg.Add(partition) + var addHistoricalMetrics bool + if job.AddHistoricalMetrics != nil { + addHistoricalMetrics = *job.AddHistoricalMetrics + } + for i := 0; i < metricDataLength; i += maxMetricCount { go func(i int) { defer wg.Done() @@ -47,7 +52,7 @@ func runCustomNamespaceJob( end = metricDataLength } input := getMetricDatas[i:end] - data := clientCloudwatch.GetMetricData(ctx, logger, input, job.Namespace, length, job.Delay, job.RoundingPeriod) + data := clientCloudwatch.GetMetricData(ctx, logger, input, job.Namespace, length, job.Delay, job.RoundingPeriod, addHistoricalMetrics) if data != nil { output := make([]*model.CloudwatchData, 0) @@ -116,6 +121,7 @@ func getMetricDataForQueriesForCustomNamespace( Statistics: []string{stats}, NilToZero: metric.NilToZero, AddCloudwatchTimestamp: metric.AddCloudwatchTimestamp, + AddHistoricalMetrics: metric.AddHistoricalMetrics, Dimensions: cwMetric.Dimensions, Period: metric.Period, }) diff --git a/pkg/job/discovery.go b/pkg/job/discovery.go index 61c2e6675..60c076e24 100644 --- a/pkg/job/discovery.go +++ b/pkg/job/discovery.go @@ -73,6 +73,11 @@ func runDiscoveryJob( count := 0 + var addHistoricalMetrics bool + if job.AddHistoricalMetrics != nil { + addHistoricalMetrics = *job.AddHistoricalMetrics + } + for i := 0; i < metricDataLength; i += maxMetricCount { start := i end := i + maxMetricCount @@ -86,7 +91,7 @@ func runDiscoveryJob( logger.Debug("GetMetricData partition", "start", start, "end", end, "partitionNum", partitionNum) input := getMetricDatas[start:end] - data := clientCloudwatch.GetMetricData(gCtx, logger, input, svc.Namespace, length, job.Delay, job.RoundingPeriod) + data := clientCloudwatch.GetMetricData(gCtx, logger, input, svc.Namespace, length, job.Delay, job.RoundingPeriod, addHistoricalMetrics) if data != nil { mu.Lock() getMetricDataOutput = append(getMetricDataOutput, data) @@ -104,7 +109,7 @@ func runDiscoveryJob( return nil, nil } - 
mapResultsToMetricDatas(getMetricDataOutput, getMetricDatas, logger) + getMetricDatas = mapResultsToMetricDatas(getMetricDataOutput, getMetricDatas, logger) // Remove unprocessed/unknown elements in place, if any. Since getMetricDatas // is a slice of pointers, the compaction can be easily done in-place. @@ -117,7 +122,7 @@ func runDiscoveryJob( // mapResultsToMetricDatas walks over all CW GetMetricData results, and map each one with the corresponding model.CloudwatchData. // // This has been extracted into a separate function to make benchmarking easier. -func mapResultsToMetricDatas(output [][]cloudwatch.MetricDataResult, datas []*model.CloudwatchData, logger logging.Logger) { +func mapResultsToMetricDatas(output [][]cloudwatch.MetricDataResult, datas []*model.CloudwatchData, logger logging.Logger) []*model.CloudwatchData { // metricIDToData is a support structure used to easily find via a MetricID, the corresponding // model.CloudatchData. metricIDToData := make(map[string]*model.CloudwatchData, len(datas)) @@ -138,6 +143,8 @@ func mapResultsToMetricDatas(output [][]cloudwatch.MetricDataResult, datas []*mo if data == nil { continue } + + previousID := "" for _, metricDataResult := range data { // find into index metricData, ok := metricIDToData[metricDataResult.ID] @@ -145,15 +152,34 @@ func mapResultsToMetricDatas(output [][]cloudwatch.MetricDataResult, datas []*mo logger.Warn("GetMetricData returned unknown metric ID", "metric_id", metricDataResult.ID) continue } + + // A result for an already-mapped metric is an extra datapoint: it is appended to datas as a copy only when AddHistoricalMetrics is enabled for that metric. 
// skip elements that have been already mapped but still exist in metricIDToData if metricData.MetricID == nil { + if metricData.AddHistoricalMetrics != nil && *metricData.AddHistoricalMetrics { + // Only copy results that directly follow the mapped result with the same ID (tracked in previousID) + if previousID == metricDataResult.ID { + // Create a new CloudwatchData object + newData := *metricData + newData.GetMetricDataPoint = metricDataResult.Datapoint + newData.GetMetricDataTimestamps = metricDataResult.Timestamp + + datas = append(datas, &newData) + } + } + continue } + metricData.GetMetricDataPoint = metricDataResult.Datapoint metricData.GetMetricDataTimestamps = metricDataResult.Timestamp + previousID = metricDataResult.ID + metricData.MetricID = nil // mark as processed } } + + return datas } func getMetricDataInputLength(metrics []*model.MetricConfig) int64 { @@ -266,6 +292,7 @@ func getFilteredMetricDatas( Statistics: []string{stats}, NilToZero: m.NilToZero, AddCloudwatchTimestamp: m.AddCloudwatchTimestamp, + AddHistoricalMetrics: m.AddHistoricalMetrics, Tags: metricTags, Dimensions: cwMetric.Dimensions, Period: m.Period, diff --git a/pkg/job/discovery_test.go b/pkg/job/discovery_test.go index 803389aff..82c891913 100644 --- a/pkg/job/discovery_test.go +++ b/pkg/job/discovery_test.go @@ -90,6 +90,7 @@ func Test_getFilteredMetricDatas(t *testing.T) { []model.CloudwatchData{ { AddCloudwatchTimestamp: aws.Bool(false), + AddHistoricalMetrics: aws.Bool(false), Dimensions: []*model.Dimension{ { Name: "FileSystemId", @@ -173,6 +174,7 @@ func Test_getFilteredMetricDatas(t *testing.T) { []model.CloudwatchData{ { AddCloudwatchTimestamp: aws.Bool(false), + AddHistoricalMetrics: aws.Bool(false), Dimensions: []*model.Dimension{ { Name: "InstanceId", @@ -252,6 +254,7 @@ func Test_getFilteredMetricDatas(t *testing.T) { []model.CloudwatchData{ { AddCloudwatchTimestamp: aws.Bool(false), + AddHistoricalMetrics: aws.Bool(false), Dimensions: []*model.Dimension{ { Name: "Cluster Name", @@ -375,6 +378,7 @@ func 
Test_getFilteredMetricDatas(t *testing.T) { []model.CloudwatchData{ { AddCloudwatchTimestamp: aws.Bool(false), + AddHistoricalMetrics: aws.Bool(false), Dimensions: []*model.Dimension{ { Name: "LoadBalancer", @@ -421,6 +425,9 @@ func Test_getFilteredMetricDatas(t *testing.T) { if *got.AddCloudwatchTimestamp != *tt.wantGetMetricsData[i].AddCloudwatchTimestamp { t.Errorf("getFilteredMetricDatas().AddCloudwatchTimestamp = %v, want %v", *got.AddCloudwatchTimestamp, *tt.wantGetMetricsData[i].AddCloudwatchTimestamp) } + if *got.AddHistoricalMetrics != *tt.wantGetMetricsData[i].AddHistoricalMetrics { + t.Errorf("getFilteredMetricDatas().AddHistoricalMetrics = %v, want %v", *got.AddHistoricalMetrics, *tt.wantGetMetricsData[i].AddHistoricalMetrics) + } if *got.NilToZero != *tt.wantGetMetricsData[i].NilToZero { t.Errorf("getFilteredMetricDatas().NilToZero = %v, want %v", *got.NilToZero, *tt.wantGetMetricsData[i].NilToZero) } diff --git a/pkg/job/static.go b/pkg/job/static.go index ad86183d8..bfd4339a5 100644 --- a/pkg/job/static.go +++ b/pkg/job/static.go @@ -33,6 +33,7 @@ func runStaticJob( Statistics: metric.Statistics, NilToZero: metric.NilToZero, AddCloudwatchTimestamp: metric.AddCloudwatchTimestamp, + AddHistoricalMetrics: metric.AddHistoricalMetrics, Dimensions: createStaticDimensions(resource.Dimensions), } diff --git a/pkg/model/model.go b/pkg/model/model.go index 671349acc..4ba9ca84a 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -65,6 +65,7 @@ type JobLevelMetricFields struct { Delay int64 NilToZero *bool AddCloudwatchTimestamp *bool + AddHistoricalMetrics *bool } type Role struct { @@ -80,6 +81,7 @@ type MetricConfig struct { Delay int64 NilToZero *bool AddCloudwatchTimestamp *bool + AddHistoricalMetrics *bool } type DimensionsRegexp struct { @@ -164,6 +166,8 @@ type CloudwatchData struct { GetMetricDataTimestamps time.Time NilToZero *bool AddCloudwatchTimestamp *bool + AddHistoricalMetrics *bool + CustomTags []Tag Tags []Tag Dimensions []*Dimension 
Period int64 diff --git a/pkg/promutil/migrate.go b/pkg/promutil/migrate.go index 8516500f3..a5e727461 100644 --- a/pkg/promutil/migrate.go +++ b/pkg/promutil/migrate.go @@ -250,7 +250,9 @@ func EnsureLabelConsistencyAndRemoveDuplicates(metrics []*PrometheusMetric, obse } } - metricKey := fmt.Sprintf("%s-%d", *metric.Name, prom_model.LabelsToSignature(metric.Labels)) + // Include the timestamp in the key so that samples sharing a name and labels but carrying different timestamps are not dropped as duplicates. + // NOTE(review): this applies to every metric, because at this point we cannot tell which ones are present due to `addHistoricalMetrics` - confirm this is intended. + metricKey := fmt.Sprintf("%s-%d-%d", *metric.Name, prom_model.LabelsToSignature(metric.Labels), metric.Timestamp.Unix()) if _, exists := metricKeys[metricKey]; !exists { metricKeys[metricKey] = struct{}{} output = append(output, metric)