-
-
Notifications
You must be signed in to change notification settings - Fork 382
feat: reimplement history data export #1642
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
6401f0f
e089bca
ce858fb
1d3536c
96e5285
52f7b4e
88404b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| apiVersion: v1alpha1 | ||
| discovery: | ||
| jobs: | ||
| - type: AWS/SQS | ||
| regions: | ||
| - us-east-1 | ||
| period: 60 | ||
| length: 300 | ||
| addCloudwatchTimestamp: true | ||
| exportAllDataPoints: true | ||
| metrics: | ||
| - name: NumberOfMessagesSent | ||
| statistics: [Sum] | ||
| - name: NumberOfMessagesReceived | ||
| statistics: [Sum] | ||
| - name: NumberOfMessagesDeleted | ||
| statistics: [Sum] | ||
| - name: ApproximateAgeOfOldestMessage | ||
| statistics: [Average] | ||
| - name: NumberOfEmptyReceives | ||
| statistics: [Sum] | ||
| - name: SentMessageSize | ||
| statistics: [Average] | ||
| - name: ApproximateNumberOfMessagesNotVisible | ||
| statistics: [Sum] | ||
| - name: ApproximateNumberOfMessagesDelayed | ||
| statistics: [Sum] | ||
| - name: ApproximateNumberOfMessagesVisible | ||
| statistics: [Sum] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,12 +55,26 @@ type ConcurrencyLimiter interface { | |
| } | ||
|
|
||
| type MetricDataResult struct { | ||
| ID string | ||
| // A nil datapoint is a marker for no datapoint being found | ||
| ID string | ||
| Datapoints []DatapointWithTimestamp | ||
| } | ||
|
|
||
| type DatapointWithTimestamp struct { | ||
|
||
| Datapoint *float64 | ||
| Timestamp time.Time | ||
| } | ||
|
|
||
| func NewDataPoint(datapoint *float64, timestamp time.Time) DatapointWithTimestamp { | ||
| return DatapointWithTimestamp{ | ||
| Timestamp: timestamp, | ||
| Datapoint: datapoint, | ||
| } | ||
| } | ||
|
|
||
| func SingleDataPoint(datapoint *float64, timestamp time.Time) []DatapointWithTimestamp { | ||
| return []DatapointWithTimestamp{NewDataPoint(datapoint, timestamp)} | ||
| } | ||
|
||
|
|
||
| type limitedConcurrencyClient struct { | ||
| client Client | ||
| limiter ConcurrencyLimiter | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -149,10 +149,12 @@ func (c client) GetMetricData(ctx context.Context, getMetricData []*model.Cloudw | |
| func toMetricDataResult(resp cloudwatch.GetMetricDataOutput) []cloudwatch_client.MetricDataResult { | ||
| output := make([]cloudwatch_client.MetricDataResult, 0, len(resp.MetricDataResults)) | ||
| for _, metricDataResult := range resp.MetricDataResults { | ||
| mappedResult := cloudwatch_client.MetricDataResult{ID: *metricDataResult.Id} | ||
| if len(metricDataResult.Values) > 0 { | ||
| mappedResult.Datapoint = &metricDataResult.Values[0] | ||
| mappedResult.Timestamp = metricDataResult.Timestamps[0] | ||
| mappedResult := cloudwatch_client.MetricDataResult{ | ||
| ID: *metricDataResult.Id, | ||
| Datapoints: make([]cloudwatch_client.DatapointWithTimestamp, 0, len(metricDataResult.Timestamps)), | ||
| } | ||
| for i := 0; i < len(metricDataResult.Timestamps); i++ { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel this new behaviour should be used only if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is required because this function is not aware of any configuration. This is in my opinion the cleaned implementation. See comments from @kgeckhart in the linked pr There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think Kyle's comment was mainly around allowing the client to return a slice of Datapoint + Timestamp, but he can confirm. My concern is around useless mem allocations when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm pretty sure there is no significant useless memory allocation. All the values are already in memory because it's part of the api response. So we just allocate the slice a bit bigger, because we use a struct of slice and not a reference to the struct slice, there is a single memory alloc for the underlying array, everything else is just copying over values. So this just keeps the already returned data for a few function calls longer in memory, to unify the struct across v1 and v2. Feels like an unessary add of complexity, to pass around those parameters and adding additional tests for validate that behaviour. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Following up on my earlier comment: even if we assume returning a slice of 5 structs increases memory usage, the overhead is still reasonable, around ~128 MB for 1 million metrics. That comes from storing 4 additional data points per metric, each at 32 bytes (a time.Time value + a *float64 pointer). It's just a larger backing array and a few more pointers to scan during GC. This only becomes relevant if we're allocating and retaining millions of these slices, which we're not. And if we were dealing with that many, querying 1 million CloudWatch metrics, the real issue wouldn't be memory. Even with 5 data points per request, that's 200K metric fetches, costing ~$2 per query. At that scale, CloudWatch costs and API limits are a far bigger concern than a ~100 MB memory difference. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We run YACE in a stateless multi-tenant fashion so querying a very large amount of metrics in a short period of time often happens. I was concerned about the potential memory overhead for this change and had shared that with Cristian. This wasn't the area I was concerned about initially as we keep our length + period setup to only produce a single datapoint as much as possible. I do agree with Cristian, if we stick to only mapping a single datapoint when the setting is disabled it will ensure there's a minimal overhead to those who upgrade to latest without using the feature. I was primarily concerned about overhead from switching to a single datapoint -> a slice for our use case. I don't think we should do anything about it now. I wrote a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kgeckhart I just blindly merged yesterday your suggested changes, to get this PR of my plate. Still I had a look today, about what you changed around your memory concerns. Looking at the current changes, I don't see any change which would reduce memory usage at all. Despite your suggestion, it still creates a slice everytime and it always creates it with full size for all data points. There is now just added complexity around passing over a flag, to stop the loop early, which you could argue reduced CPU overhead, but is likely neglectable at the scale where the cloudwatch costs would explode. Before we are moving into premature optimisations, have you actually measured and proved that this actually is an issue? (see my comments above why I doubt that based on numbers). You mention that you are running this at large scale. Maybe you can run the initial PR, side by side for 1-5% of your workload (depending on the scale) and share some realworld memory impact? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @woehrl01 thanks for taking a look at it and providing feedback.
This was an oversight on my part as I was hastily making the change before I needed to go to an event yesterday 😅. My intention was to make sure to only allocate exactly what is necessary and nothing more which I can do with another minor change
IMO the added complexity is rather minimal and easily testable which is why I went through with the change.
When Cristian brought it up I didn't quite get it at first because we do run it at scale but we do it in such a way that we should only ever get one data point back. We do this intentionally for performance reasons, why ask for data you won't use? I don't know if it was Cristian's intent but my realization was that introducing this feature should not present a noticeable negative impact on the larger community just by upgrading. This exporter is embedded in to places like https://grafana.com/docs/alloy/latest/reference/components/prometheus/prometheus.exporter.cloudwatch/ which I know is used by customers at a scale large enough to incur some rather hefty CloudWatch costs. Going from a single data point to a slice presents some memory increase that is unlikely to be noticeable for most. We don't know how many people are setting up their configs with a length that is larger than their period (length > period = more than 1 data point) and to what degree they are doing it (2x, 3x, 10x?). This is part of the complexity of building the configs CloudWatch has some incredibly odd behaviors for different metrics so the configs get cargo culted in a way that is not optimal but works. Could this guard be unnecessary, yes but the complexity of adding it feels acceptable as a means to try to provide a stable experience to existing users. Since I messed up the DCO + need another change we will have to figure out what to do with this PR. But first it would probably be good to have @cristiangreco take a look to make sure there's nothing further. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kgeckhart @cristiangreco what's the plan with this PR? From what I see, this PR seems to be waiting on some action from your side and there is nothing really left the community could support with? If that is wrong and there is still something todo, please let me know, I'm happy to contribute as well. |
||
| mappedResult.Datapoints = append(mappedResult.Datapoints, cloudwatch_client.NewDataPoint(&metricDataResult.Values[i], metricDataResult.Timestamps[i])) | ||
| } | ||
| output = append(output, mappedResult) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,6 +52,7 @@ type JobLevelMetricFields struct { | |
| Delay int64 `yaml:"delay"` | ||
| NilToZero *bool `yaml:"nilToZero"` | ||
| AddCloudwatchTimestamp *bool `yaml:"addCloudwatchTimestamp"` | ||
| ExportAllDataPoints *bool `yaml:"exportAllDataPoints"` | ||
| } | ||
|
|
||
| type Job struct { | ||
|
|
@@ -99,6 +100,7 @@ type Metric struct { | |
| Delay int64 `yaml:"delay"` | ||
| NilToZero *bool `yaml:"nilToZero"` | ||
| AddCloudwatchTimestamp *bool `yaml:"addCloudwatchTimestamp"` | ||
| ExportAllDataPoints *bool `yaml:"exportAllDataPoints"` | ||
| } | ||
|
|
||
| type Dimension struct { | ||
|
|
@@ -154,7 +156,7 @@ func (c *ScrapeConf) Load(file string, logger *slog.Logger) (model.JobsConfig, e | |
|
|
||
| func (c *ScrapeConf) Validate(logger *slog.Logger) (model.JobsConfig, error) { | ||
| if c.Discovery.Jobs == nil && c.Static == nil && c.CustomNamespace == nil { | ||
| return model.JobsConfig{}, fmt.Errorf("At least 1 Discovery job, 1 Static or one CustomNamespace must be defined") | ||
| return model.JobsConfig{}, fmt.Errorf("at least 1 Discovery job, 1 Static or one CustomNamespace must be defined") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick: this change is unrelated to the PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It unrelated but is a lint error. Should I revert? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What lint error is this solving? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The standard golang lint error that an error text should start with a lowercase letter. |
||
| } | ||
|
|
||
| if c.Discovery.Jobs != nil { | ||
|
|
@@ -387,6 +389,19 @@ func (m *Metric) validateMetric(logger *slog.Logger, metricIdx int, parent strin | |
| } | ||
| } | ||
|
|
||
| mExportAllDataPoints := m.ExportAllDataPoints | ||
| if mExportAllDataPoints == nil { | ||
| if discovery != nil && discovery.ExportAllDataPoints != nil { | ||
| mExportAllDataPoints = discovery.ExportAllDataPoints | ||
| } else { | ||
| mExportAllDataPoints = aws.Bool(false) | ||
| } | ||
| } | ||
|
|
||
| if aws.BoolValue(mExportAllDataPoints) && !aws.BoolValue(mAddCloudwatchTimestamp) { | ||
| return fmt.Errorf("Metric [%s/%d] in %v: ExportAllDataPoints can only be enabled if AddCloudwatchTimestamp is enabled", m.Name, metricIdx, parent) | ||
| } | ||
|
|
||
| if mLength < mPeriod { | ||
| return fmt.Errorf( | ||
| "Metric [%s/%d] in %v: length(%d) is smaller than period(%d). This can cause that the data requested is not ready and generate data gaps", | ||
|
|
@@ -398,6 +413,7 @@ func (m *Metric) validateMetric(logger *slog.Logger, metricIdx int, parent strin | |
| m.Delay = mDelay | ||
| m.NilToZero = mNilToZero | ||
| m.AddCloudwatchTimestamp = mAddCloudwatchTimestamp | ||
| m.ExportAllDataPoints = mExportAllDataPoints | ||
| m.Statistics = mStatistics | ||
|
|
||
| return nil | ||
|
|
@@ -519,6 +535,7 @@ func toModelMetricConfig(metrics []*Metric) []*model.MetricConfig { | |
| Delay: m.Delay, | ||
| NilToZero: aws.BoolValue(m.NilToZero), | ||
| AddCloudwatchTimestamp: aws.BoolValue(m.AddCloudwatchTimestamp), | ||
| ExportAllDataPoints: aws.BoolValue(m.ExportAllDataPoints), | ||
| }) | ||
| } | ||
| return ret | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mind keeping the changes here to the minimum please?