Commit 8d9850d

woehrl01 and kgeckhart authored

feat: reimplement history data export (#1642)

Signed-off-by: Lukas Wöhrl <[email protected]>
Co-authored-by: Kyle Eckhart <[email protected]>

1 parent c07762a · commit 8d9850d

File tree

18 files changed, +608 -251 lines changed

docs/configuration.md

Lines changed: 26 additions & 2 deletions

@@ -137,6 +137,14 @@ statistics:
 # Export the metric with the original CloudWatch timestamp (General Setting for all metrics in this job)
 [ addCloudwatchTimestamp: <boolean> ]
 
+# Enables the inclusion of past metric data points from the CloudWatch response if available.
+# This is useful when a metric is configured with a 60-second period and a 300-second duration, ensuring that all
+# five data points are exposed at the metrics endpoint instead of only the latest one.
+# Note: This option requires `addCloudwatchTimestamp` to be enabled.
+# The metric destination must support out of order timestamps, see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tsdb
+# (General Setting for all metrics in this job)
+[ exportAllDataPoints: <boolean> ]
+
 # List of metric definitions
 metrics:
   [ - <metric_config> ... ]

@@ -276,6 +284,14 @@ statistics:
 # Export the metric with the original CloudWatch timestamp (General Setting for all metrics in this job)
 [ addCloudwatchTimestamp: <boolean> ]
 
+# Enables the inclusion of past metric data points from the CloudWatch response if available.
+# This is useful when a metric is configured with a 60-second period and a 300-second duration, ensuring that all
+# five data points are exposed at the metrics endpoint instead of only the latest one.
+# Note: This option requires `addCloudwatchTimestamp` to be enabled.
+# The metric destination must support out of order timestamps, see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tsdb
+# (General Setting for all metrics in this job)
+[ exportAllDataPoints: <boolean> ]
+
 # List of metric definitions
 metrics:
   [ - <metric_config> ... ]

@@ -333,12 +349,20 @@ statistics:
 
 # Export the metric with the original CloudWatch timestamp (Overrides job level setting)
 [ addCloudwatchTimestamp: <boolean> ]
+
+# Enables the inclusion of past metric data points from the CloudWatch response if available.
+# This is useful when a metric is configured with a 60-second period and a 300-second duration, ensuring that all
+# five data points are exposed at the metrics endpoint instead of only the latest one.
+# Note: This option requires `addCloudwatchTimestamp` to be enabled.
+# The metric destination must support out of order timestamps, see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tsdb
+# (General Setting for all metrics in this job)
+[ exportAllDataPoints: <boolean> ]
 ```
 
 Notes:
 - Available statistics: `Maximum`, `Minimum`, `Sum`, `SampleCount`, `Average`, `pXX` (e.g. `p90`).
 
-- Watch out using `addCloudwatchTimestamp` for sparse metrics, e.g from S3, since Prometheus won't scrape metrics containing timestamps older than 2-3 hours.
+- Watch out using `addCloudwatchTimestamp` for sparse metrics, e.g from S3, since Prometheus won't scrape metrics containing timestamps older than 2-3 hours. Also the same applies when enabling `exportAllDataPoints` in any metric.
 
 ### `exported_tags_config`
 
@@ -390,4 +414,4 @@ This is an example of the `dimensions_config` block:
 dimensions:
   - name: AutoScalingGroupName
     value: MyGroup
-```
+```
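
The new documentation above points to the Prometheus TSDB configuration for out-of-order timestamps. As a rough, illustrative sketch (not part of this commit), a Prometheus server that should ingest the older samples produced by `exportAllDataPoints` could enable an out-of-order ingestion window in `prometheus.yml`; the window size below is only an example and assumes Prometheus 2.39 or newer:

```yaml
# prometheus.yml (illustrative sketch, not from this commit)
storage:
  tsdb:
    # Accept samples up to 1h older than the newest sample of a series.
    # Size the window to comfortably cover the job's `length` (e.g. 300s in the example job below).
    out_of_order_time_window: 1h
```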

examples/historic-data.yml

Lines changed: 29 additions & 0 deletions (new file)

apiVersion: v1alpha1
discovery:
  jobs:
    - type: AWS/SQS
      regions:
        - us-east-1
      period: 60
      length: 300
      addCloudwatchTimestamp: true
      exportAllDataPoints: true
      metrics:
        - name: NumberOfMessagesSent
          statistics: [Sum]
        - name: NumberOfMessagesReceived
          statistics: [Sum]
        - name: NumberOfMessagesDeleted
          statistics: [Sum]
        - name: ApproximateAgeOfOldestMessage
          statistics: [Average]
        - name: NumberOfEmptyReceives
          statistics: [Sum]
        - name: SentMessageSize
          statistics: [Average]
        - name: ApproximateNumberOfMessagesNotVisible
          statistics: [Sum]
        - name: ApproximateNumberOfMessagesDelayed
          statistics: [Sum]
        - name: ApproximateNumberOfMessagesVisible
          statistics: [Sum]
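
As a complementary, hedged sketch (not part of the commit), this is roughly what the Prometheus scrape job for the exporter could look like. The target address is an assumption and depends on how the exporter is deployed; `honor_timestamps` defaults to `true` and is what keeps the original CloudWatch timestamps attached by `addCloudwatchTimestamp`:

```yaml
# prometheus.yml scrape job (illustrative sketch)
scrape_configs:
  - job_name: yace
    honor_timestamps: true          # default; keeps the CloudWatch timestamps on the samples
    static_configs:
      - targets: ["localhost:5000"] # assumed exporter listen address
```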

pkg/clients/cloudwatch/client.go

Lines changed: 8 additions & 5 deletions

@@ -37,7 +37,7 @@ type Client interface {
 	GetMetricData(ctx context.Context, getMetricData []*model.CloudwatchData, namespace string, startTime time.Time, endTime time.Time) []MetricDataResult
 
 	// GetMetricStatistics returns the output of the GetMetricStatistics CloudWatch API.
-	GetMetricStatistics(ctx context.Context, logger *slog.Logger, dimensions []model.Dimension, namespace string, metric *model.MetricConfig) []*model.Datapoint
+	GetMetricStatistics(ctx context.Context, logger *slog.Logger, dimensions []model.Dimension, namespace string, metric *model.MetricConfig) []*model.MetricStatisticsResult
 }
 
 // ConcurrencyLimiter limits the concurrency when calling AWS CloudWatch APIs. The functions implemented

@@ -55,9 +55,12 @@ type ConcurrencyLimiter interface {
 }
 
 type MetricDataResult struct {
-	ID string
-	// A nil datapoint is a marker for no datapoint being found
-	Datapoint *float64
+	ID         string
+	DataPoints []DataPoint
+}
+
+type DataPoint struct {
+	Value     *float64
 	Timestamp time.Time
 }
 

@@ -73,7 +76,7 @@ func NewLimitedConcurrencyClient(client Client, limiter ConcurrencyLimiter) Clie
 	}
 }
 
-func (c limitedConcurrencyClient) GetMetricStatistics(ctx context.Context, logger *slog.Logger, dimensions []model.Dimension, namespace string, metric *model.MetricConfig) []*model.Datapoint {
+func (c limitedConcurrencyClient) GetMetricStatistics(ctx context.Context, logger *slog.Logger, dimensions []model.Dimension, namespace string, metric *model.MetricConfig) []*model.MetricStatisticsResult {
 	c.limiter.Acquire(getMetricStatisticsCall)
 	res := c.client.GetMetricStatistics(ctx, logger, dimensions, namespace, metric)
 	c.limiter.Release(getMetricStatisticsCall)
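
The interface change above replaces the single `Datapoint`/`Timestamp` pair on `MetricDataResult` with a `DataPoints` slice. A minimal, self-contained sketch of how a caller might consume the new shape (hypothetical consumer code, not from this commit; the types merely mirror the ones defined above):

```go
package main

import (
	"fmt"
	"time"
)

// Local mirrors of the new types from pkg/clients/cloudwatch/client.go.
type DataPoint struct {
	Value     *float64
	Timestamp time.Time
}

type MetricDataResult struct {
	ID         string
	DataPoints []DataPoint
}

func main() {
	v := 42.0
	results := []MetricDataResult{
		{ID: "metric-1", DataPoints: []DataPoint{{Value: &v, Timestamp: time.Now()}}},
		// An empty slice now plays the role of the old nil Datapoint marker ("no datapoint found").
		{ID: "metric-2", DataPoints: []DataPoint{}},
	}

	for _, r := range results {
		// Callers now range over DataPoints; with exportAllDataPoints enabled the slice can
		// hold several (value, timestamp) pairs instead of only the most recent one.
		for _, dp := range r.DataPoints {
			fmt.Println(r.ID, *dp.Value, dp.Timestamp)
		}
	}
}
```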

pkg/clients/cloudwatch/v1/client.go

Lines changed: 22 additions & 12 deletions

@@ -95,6 +95,7 @@ func toModelDimensions(dimensions []*cloudwatch.Dimension) []model.Dimension {
 
 func (c client) GetMetricData(ctx context.Context, getMetricData []*model.CloudwatchData, namespace string, startTime time.Time, endTime time.Time) []cloudwatch_client.MetricDataResult {
 	metricDataQueries := make([]*cloudwatch.MetricDataQuery, 0, len(getMetricData))
+	exportAllDataPoints := false
 	for _, data := range getMetricData {
 		metricStat := &cloudwatch.MetricStat{
 			Metric: &cloudwatch.Metric{

@@ -110,6 +111,7 @@ func (c client) GetMetricData(ctx context.Context, getMetricData []*model.Cloudw
 			MetricStat: metricStat,
 			ReturnData: aws.Bool(true),
 		})
+		exportAllDataPoints = exportAllDataPoints || data.MetricMigrationParams.ExportAllDataPoints
 	}
 	input := &cloudwatch.GetMetricDataInput{
 		EndTime: &endTime,

@@ -137,23 +139,31 @@ func (c client) GetMetricData(ctx context.Context, getMetricData []*model.Cloudw
 		c.logger.Error("GetMetricData error", "err", err)
 		return nil
 	}
-	return toMetricDataResult(resp)
+	return toMetricDataResult(resp, exportAllDataPoints)
 }
 
-func toMetricDataResult(resp cloudwatch.GetMetricDataOutput) []cloudwatch_client.MetricDataResult {
+func toMetricDataResult(resp cloudwatch.GetMetricDataOutput, exportAllDataPoints bool) []cloudwatch_client.MetricDataResult {
 	output := make([]cloudwatch_client.MetricDataResult, 0, len(resp.MetricDataResults))
 	for _, metricDataResult := range resp.MetricDataResults {
-		mappedResult := cloudwatch_client.MetricDataResult{ID: *metricDataResult.Id}
-		if len(metricDataResult.Values) > 0 {
-			mappedResult.Datapoint = metricDataResult.Values[0]
-			mappedResult.Timestamp = *metricDataResult.Timestamps[0]
+		mappedResult := cloudwatch_client.MetricDataResult{
+			ID:         *metricDataResult.Id,
+			DataPoints: make([]cloudwatch_client.DataPoint, 0, len(metricDataResult.Timestamps))}
+		for i := 0; i < len(metricDataResult.Timestamps); i++ {
+			mappedResult.DataPoints = append(mappedResult.DataPoints, cloudwatch_client.DataPoint{
+				Value:     metricDataResult.Values[i],
+				Timestamp: *metricDataResult.Timestamps[i],
+			})
+
+			if !exportAllDataPoints {
+				break
+			}
 		}
 		output = append(output, mappedResult)
 	}
 	return output
 }
 
-func (c client) GetMetricStatistics(ctx context.Context, logger *slog.Logger, dimensions []model.Dimension, namespace string, metric *model.MetricConfig) []*model.Datapoint {
+func (c client) GetMetricStatistics(ctx context.Context, logger *slog.Logger, dimensions []model.Dimension, namespace string, metric *model.MetricConfig) []*model.MetricStatisticsResult {
 	filter := createGetMetricStatisticsInput(dimensions, &namespace, metric, logger)
 
 	c.logger.Debug("GetMetricStatistics", "input", filter)

@@ -171,14 +181,14 @@ func (c client) GetMetricStatistics(ctx context.Context, logger *slog.Logger, di
 		return nil
 	}
 
-	return toModelDatapoints(resp.Datapoints)
+	return toModelDataPoints(resp.Datapoints)
 }
 
-func toModelDatapoints(cwDatapoints []*cloudwatch.Datapoint) []*model.Datapoint {
-	modelDataPoints := make([]*model.Datapoint, 0, len(cwDatapoints))
+func toModelDataPoints(cwDataPoints []*cloudwatch.Datapoint) []*model.MetricStatisticsResult {
+	modelDataPoints := make([]*model.MetricStatisticsResult, 0, len(cwDataPoints))
 
-	for _, cwDatapoint := range cwDatapoints {
-		modelDataPoints = append(modelDataPoints, &model.Datapoint{
+	for _, cwDatapoint := range cwDataPoints {
+		modelDataPoints = append(modelDataPoints, &model.MetricStatisticsResult{
 			Average:            cwDatapoint.Average,
 			ExtendedStatistics: cwDatapoint.ExtendedStatistics,
 			Maximum:            cwDatapoint.Maximum,

pkg/clients/cloudwatch/v1/client_test.go

Lines changed: 57 additions & 7 deletions

@@ -47,11 +47,13 @@ func Test_toMetricDataResult(t *testing.T) {
 		name                      string
 		getMetricDataOutput       cloudwatch.GetMetricDataOutput
 		expectedMetricDataResults []cloudwatch_client.MetricDataResult
+		exportAllDataPoints       bool
 	}
 
 	testCases := []testCase{
 		{
-			name: "all metrics present",
+			name:                "all metrics present",
+			exportAllDataPoints: false,
 			getMetricDataOutput: cloudwatch.GetMetricDataOutput{
 				MetricDataResults: []*cloudwatch.MetricDataResult{
 					{

@@ -67,12 +69,21 @@ func Test_toMetricDataResult(t *testing.T) {
 				},
 			},
 			expectedMetricDataResults: []cloudwatch_client.MetricDataResult{
-				{ID: "metric-1", Datapoint: aws.Float64(1.0), Timestamp: ts.Add(10 * time.Minute)},
-				{ID: "metric-2", Datapoint: aws.Float64(2.0), Timestamp: ts},
+				{
+					ID: "metric-1", DataPoints: []cloudwatch_client.DataPoint{
+						{Value: aws.Float64(1.0), Timestamp: ts.Add(10 * time.Minute)},
+					},
+				},
+				{
+					ID: "metric-2", DataPoints: []cloudwatch_client.DataPoint{
+						{Value: aws.Float64(2.0), Timestamp: ts},
+					},
+				},
 			},
 		},
 		{
-			name: "metric with no values",
+			name:                "metric with no values",
+			exportAllDataPoints: false,
 			getMetricDataOutput: cloudwatch.GetMetricDataOutput{
 				MetricDataResults: []*cloudwatch.MetricDataResult{
 					{

@@ -88,15 +99,54 @@ func Test_toMetricDataResult(t *testing.T) {
 				},
 			},
 			expectedMetricDataResults: []cloudwatch_client.MetricDataResult{
-				{ID: "metric-1", Datapoint: aws.Float64(1.0), Timestamp: ts.Add(10 * time.Minute)},
-				{ID: "metric-2", Datapoint: nil, Timestamp: time.Time{}},
+				{
+					ID: "metric-1", DataPoints: []cloudwatch_client.DataPoint{
+						{Value: aws.Float64(1.0), Timestamp: ts.Add(10 * time.Minute)},
+					},
+				},
+				{
+					ID:         "metric-2",
+					DataPoints: []cloudwatch_client.DataPoint{},
+				},
+			},
+		},
+		{
+			name:                "export all data points",
+			exportAllDataPoints: true,
+			getMetricDataOutput: cloudwatch.GetMetricDataOutput{
+				MetricDataResults: []*cloudwatch.MetricDataResult{
+					{
+						Id:         aws.String("metric-1"),
+						Values:     []*float64{aws.Float64(1.0), aws.Float64(2.0), aws.Float64(3.0)},
+						Timestamps: []*time.Time{aws.Time(ts.Add(10 * time.Minute)), aws.Time(ts.Add(5 * time.Minute)), aws.Time(ts)},
+					},
+					{
+						Id:         aws.String("metric-2"),
+						Values:     []*float64{aws.Float64(2.0)},
+						Timestamps: []*time.Time{aws.Time(ts)},
+					},
+				},
+			},
+			expectedMetricDataResults: []cloudwatch_client.MetricDataResult{
+				{
+					ID: "metric-1", DataPoints: []cloudwatch_client.DataPoint{
+						{Value: aws.Float64(1.0), Timestamp: ts.Add(10 * time.Minute)},
+						{Value: aws.Float64(2.0), Timestamp: ts.Add(5 * time.Minute)},
+						{Value: aws.Float64(3.0), Timestamp: ts},
+					},
+				},
+				{
+					ID: "metric-2", DataPoints: []cloudwatch_client.DataPoint{
+						{Value: aws.Float64(2.0), Timestamp: ts},
+					},
+				},
 			},
 		},
 	}
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			metricDataResults := toMetricDataResult(tc.getMetricDataOutput)
+			metricDataResults := toMetricDataResult(tc.getMetricDataOutput, tc.exportAllDataPoints)
 			require.Equal(t, tc.expectedMetricDataResults, metricDataResults)
 		})
 	}

pkg/clients/cloudwatch/v2/client.go

Lines changed: 23 additions & 12 deletions

@@ -98,6 +98,7 @@ func toModelDimensions(dimensions []types.Dimension) []model.Dimension {
 
 func (c client) GetMetricData(ctx context.Context, getMetricData []*model.CloudwatchData, namespace string, startTime time.Time, endTime time.Time) []cloudwatch_client.MetricDataResult {
 	metricDataQueries := make([]types.MetricDataQuery, 0, len(getMetricData))
+	exportAllDataPoints := false
 	for _, data := range getMetricData {
 		metricStat := &types.MetricStat{
 			Metric: &types.Metric{

@@ -113,6 +114,7 @@ func (c client) GetMetricData(ctx context.Context, getMetricData []*model.Cloudw
 			MetricStat: metricStat,
 			ReturnData: aws.Bool(true),
 		})
+		exportAllDataPoints = exportAllDataPoints || data.MetricMigrationParams.ExportAllDataPoints
 	}
 
 	input := &cloudwatch.GetMetricDataInput{

@@ -143,23 +145,32 @@ func (c client) GetMetricData(ctx context.Context, getMetricData []*model.Cloudw
 
 	c.logger.Debug("GetMetricData", "output", resp)
 
-	return toMetricDataResult(resp)
+	return toMetricDataResult(resp, exportAllDataPoints)
 }
 
-func toMetricDataResult(resp cloudwatch.GetMetricDataOutput) []cloudwatch_client.MetricDataResult {
+func toMetricDataResult(resp cloudwatch.GetMetricDataOutput, exportAllDataPoints bool) []cloudwatch_client.MetricDataResult {
 	output := make([]cloudwatch_client.MetricDataResult, 0, len(resp.MetricDataResults))
 	for _, metricDataResult := range resp.MetricDataResults {
-		mappedResult := cloudwatch_client.MetricDataResult{ID: *metricDataResult.Id}
-		if len(metricDataResult.Values) > 0 {
-			mappedResult.Datapoint = &metricDataResult.Values[0]
-			mappedResult.Timestamp = metricDataResult.Timestamps[0]
+		mappedResult := cloudwatch_client.MetricDataResult{
+			ID:         *metricDataResult.Id,
+			DataPoints: make([]cloudwatch_client.DataPoint, 0, len(metricDataResult.Timestamps)),
+		}
+		for i := 0; i < len(metricDataResult.Timestamps); i++ {
+			mappedResult.DataPoints = append(mappedResult.DataPoints, cloudwatch_client.DataPoint{
+				Value:     &metricDataResult.Values[i],
+				Timestamp: metricDataResult.Timestamps[i],
+			})
+
+			if !exportAllDataPoints {
+				break
+			}
 		}
 		output = append(output, mappedResult)
 	}
 	return output
 }
 
-func (c client) GetMetricStatistics(ctx context.Context, logger *slog.Logger, dimensions []model.Dimension, namespace string, metric *model.MetricConfig) []*model.Datapoint {
+func (c client) GetMetricStatistics(ctx context.Context, logger *slog.Logger, dimensions []model.Dimension, namespace string, metric *model.MetricConfig) []*model.MetricStatisticsResult {
 	filter := createGetMetricStatisticsInput(logger, dimensions, &namespace, metric)
 	c.logger.Debug("GetMetricStatistics", "input", filter)
 

@@ -181,18 +192,18 @@ func (c client) GetMetricStatistics(ctx context.Context, logger *slog.Logger, di
 		ptrs = append(ptrs, &datapoint)
 	}
 
-	return toModelDatapoints(ptrs)
+	return toModelDataPoints(ptrs)
 }
 
-func toModelDatapoints(cwDatapoints []*types.Datapoint) []*model.Datapoint {
-	modelDataPoints := make([]*model.Datapoint, 0, len(cwDatapoints))
+func toModelDataPoints(cwDataPoints []*types.Datapoint) []*model.MetricStatisticsResult {
+	modelDataPoints := make([]*model.MetricStatisticsResult, 0, len(cwDataPoints))
 
-	for _, cwDatapoint := range cwDatapoints {
+	for _, cwDatapoint := range cwDataPoints {
 		extendedStats := make(map[string]*float64, len(cwDatapoint.ExtendedStatistics))
 		for name, value := range cwDatapoint.ExtendedStatistics {
 			extendedStats[name] = &value
 		}
-		modelDataPoints = append(modelDataPoints, &model.Datapoint{
+		modelDataPoints = append(modelDataPoints, &model.MetricStatisticsResult{
 			Average:            cwDatapoint.Average,
 			ExtendedStatistics: extendedStats,
 			Maximum:            cwDatapoint.Maximum,
