Skip to content
This repository was archived by the owner on Dec 1, 2018. It is now read-only.

Commit 7d19aff

Browse files
authored
Merge pull request #1666 from burmanm/hwkmetrics_660
Hawkular-sink updates to definition cache handling
2 parents 546ab66 + 114a808 commit 7d19aff

File tree

8 files changed

+503
-261
lines changed

8 files changed

+503
-261
lines changed

docs/sink-configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ The following options are available:
9999
* `batchSize`- How many metrics are sent in each request to Hawkular-Metrics (default is 1000)
100100
* `concurrencyLimit`- How many concurrent requests are used to send data to the Hawkular-Metrics (default is 5)
101101
* `labelTagPrefix` - A prefix to be placed in front of each label when stored as a tag for the metric (default is `labels.`)
102+
* `disablePreCache` - Disable cache initialization by fetching metric definitions from Hawkular-Metrics
102103

103104
A combination of `insecure` / `caCert` / `auth` is not supported, only a single of these parameters is allowed at once. Also, combination of `useServiceAccount` and `user` + `pass` is not supported. To increase the performance of Hawkular sink in case of multiple instances of Hawkular-Metrics (such as scaled scenario in OpenShift) modify the parameters of batchSize and concurrencyLimit to balance the load on Hawkular-Metrics instances.
104105

metrics/sinks/hawkular/client.go

Lines changed: 69 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package hawkular
1717
import (
1818
"fmt"
1919
"math"
20+
"reflect"
2021
"regexp"
2122
"strings"
2223
"sync"
@@ -45,13 +46,14 @@ func (h *hawkularSink) updateDefinitions(mt metrics.MetricType) error {
4546
// If no descriptorTag is found, this metric does not belong to Heapster
4647
if mk, found := p.Tags[descriptorTag]; found {
4748
if model, f := h.models[mk]; f && !h.recent(p, model) {
48-
if err := h.client.UpdateTags(mt, p.Id, p.Tags, h.modifiers...); err != nil {
49+
if err := h.client.UpdateTags(mt, p.ID, p.Tags, h.modifiers...); err != nil {
4950
return err
5051
}
5152
}
52-
h.reg[p.Id] = p
53+
h.reg[p.ID] = p
5354
}
5455
}
56+
5557
return nil
5658
}
5759

@@ -86,7 +88,7 @@ func (h *hawkularSink) descriptorToDefinition(md *core.MetricDescriptor) metrics
8688
tags[descriptorTag] = md.Name
8789

8890
hmd := metrics.MetricDefinition{
89-
Id: md.Name,
91+
ID: md.Name,
9092
Tags: tags,
9193
Type: heapsterTypeToHawkularType(md.Type),
9294
}
@@ -148,69 +150,79 @@ func (h *hawkularSink) nodeName(ms *core.MetricSet) string {
148150
return ms.Labels[core.LabelNodename.Key]
149151
}
150152

151-
func (h *hawkularSink) registerLabeledIfNecessary(ms *core.MetricSet, metric core.LabeledMetric, m ...metrics.Modifier) error {
152-
key := h.idName(ms, metric.Name)
153-
154-
if resourceID, found := metric.Labels[core.LabelResourceID.Key]; found {
155-
key = h.idName(ms, metric.Name+separator+resourceID)
156-
}
157-
158-
h.regLock.Lock()
159-
defer h.regLock.Unlock()
153+
func (h *hawkularSink) createDefinitionFromModel(ms *core.MetricSet, metric core.LabeledMetric) (*metrics.MetricDefinition, error) {
154+
if md, f := h.models[metric.Name]; f {
155+
// Copy the original map
156+
mdd := *md
157+
tags := make(map[string]string)
158+
for k, v := range mdd.Tags {
159+
tags[k] = v
160+
}
161+
mdd.Tags = tags
160162

161-
// If found, check it matches the current stored definition (could be old info from
162-
// the stored metrics cache for example)
163-
if _, found := h.reg[key]; !found {
164-
// Register the metric descriptor here..
165-
if md, f := h.models[metric.Name]; f {
166-
// Copy the original map
167-
mdd := *md
168-
tags := make(map[string]string)
169-
for k, v := range mdd.Tags {
170-
tags[k] = v
171-
}
172-
mdd.Tags = tags
173-
174-
// Set tag values
175-
for k, v := range ms.Labels {
176-
mdd.Tags[k] = v
177-
if k == core.LabelLabels.Key {
178-
labels := strings.Split(v, ",")
179-
for _, label := range labels {
180-
labelKeyValue := strings.Split(label, ":")
181-
if len(labelKeyValue) != 2 {
182-
glog.V(4).Infof("Could not split the label %v into its key and value pair. This label will not be added as a tag in Hawkular Metrics.", label)
183-
} else {
184-
mdd.Tags[h.labelTagPrefix+labelKeyValue[0]] = labelKeyValue[1]
185-
}
163+
// Set tag values
164+
for k, v := range ms.Labels {
165+
mdd.Tags[k] = v
166+
if k == core.LabelLabels.Key {
167+
labels := strings.Split(v, ",")
168+
for _, label := range labels {
169+
labelKeyValue := strings.Split(label, ":")
170+
if len(labelKeyValue) != 2 {
171+
glog.V(4).Infof("Could not split the label %v into its key and value pair. This label will not be added as a tag in Hawkular Metrics.", label)
172+
} else {
173+
mdd.Tags[h.labelTagPrefix+labelKeyValue[0]] = labelKeyValue[1]
186174
}
187175
}
188176
}
177+
}
189178

190-
// Set the labeled values
191-
for k, v := range metric.Labels {
192-
mdd.Tags[k] = v
193-
}
179+
// Set the labeled values
180+
for k, v := range metric.Labels {
181+
mdd.Tags[k] = v
182+
}
194183

195-
mdd.Tags[groupTag] = h.groupName(ms, metric.Name)
196-
mdd.Tags[descriptorTag] = metric.Name
184+
mdd.Tags[groupTag] = h.groupName(ms, metric.Name)
185+
mdd.Tags[descriptorTag] = metric.Name
197186

198-
m = append(m, h.modifiers...)
187+
return &mdd, nil
188+
}
189+
return nil, fmt.Errorf("Could not find definition model with name %s", metric.Name)
190+
}
199191

200-
// Create metric, use updateTags instead of Create because we know it is unique
201-
if err := h.client.UpdateTags(heapsterTypeToHawkularType(metric.MetricType), key, mdd.Tags, m...); err != nil {
202-
// Log error and don't add this key to the lookup table
203-
glog.Errorf("Could not update tags: %s", err)
204-
return err
205-
}
192+
func (h *hawkularSink) registerLabeledIfNecessary(ms *core.MetricSet, metric core.LabeledMetric, m ...metrics.Modifier) error {
206193

207-
// Add to the lookup table
208-
h.reg[key] = &mdd
209-
} else {
210-
return fmt.Errorf("Could not find definition model with name %s", metric.Name)
194+
var key string
195+
if resourceID, found := metric.Labels[core.LabelResourceID.Key]; found {
196+
key = h.idName(ms, metric.Name+separator+resourceID)
197+
} else {
198+
key = h.idName(ms, metric.Name)
199+
}
200+
201+
mdd, err := h.createDefinitionFromModel(ms, metric)
202+
if err != nil {
203+
return err
204+
}
205+
206+
h.regLock.RLock()
207+
if _, found := h.reg[key]; !found || !reflect.DeepEqual(mdd.Tags, h.reg[key].Tags) {
208+
// I'm going to release the lock to allow concurrent processing, even if that
209+
// can cause dual updates (highly unlikely). The UpdateTags is idempotent in any case.
210+
h.regLock.RUnlock()
211+
m = append(m, h.modifiers...)
212+
213+
// Create metric, use updateTags instead of Create because we don't care about uniqueness
214+
if err := h.client.UpdateTags(heapsterTypeToHawkularType(metric.MetricType), key, mdd.Tags, m...); err != nil {
215+
// Log error and don't add this key to the lookup table
216+
glog.Errorf("Could not update tags: %s", err)
217+
return err
211218
}
219+
220+
h.regLock.Lock()
221+
h.reg[key] = mdd
222+
h.regLock.Unlock()
223+
} else {
224+
h.regLock.RUnlock()
212225
}
213-
// TODO Compare the definition tags and update if necessary? Quite expensive operation..
214226

215227
return nil
216228
}
@@ -275,11 +287,11 @@ func (h *hawkularSink) pointToLabeledMetricHeader(ms *core.MetricSet, metric cor
275287

276288
m := metrics.Datapoint{
277289
Value: value,
278-
Timestamp: metrics.UnixMilli(timestamp),
290+
Timestamp: timestamp,
279291
}
280292

281293
mh := &metrics.MetricHeader{
282-
Id: name,
294+
ID: name,
283295
Data: []metrics.Datapoint{m},
284296
Type: heapsterTypeToHawkularType(metric.MetricType),
285297
}

metrics/sinks/hawkular/driver.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@ func (h *hawkularSink) Register(mds []core.MetricDescriptor) error {
5858
h.models[md.Name] = &hmd
5959
}
6060

61-
// Fetch currently known metrics from Hawkular-Metrics and cache them
62-
types := []metrics.MetricType{metrics.Gauge, metrics.Counter}
63-
for _, t := range types {
64-
err := h.updateDefinitions(t)
65-
if err != nil {
66-
return err
61+
if !h.disablePreCaching {
62+
// Fetch currently known metrics from Hawkular-Metrics and cache them
63+
types := []metrics.MetricType{metrics.Gauge, metrics.Counter}
64+
for _, t := range types {
65+
err := h.updateDefinitions(t)
66+
if err != nil {
67+
return err
68+
}
6769
}
6870
}
6971

@@ -94,12 +96,15 @@ func (h *hawkularSink) ExportData(db *core.DataBatch) {
9496

9597
for _, ms := range db.MetricSets {
9698

97-
// // Transform ms.MetricValues to LabeledMetrics first
98-
lms := metricValueToLabeledMetric(ms.MetricValues)
99-
ms.LabeledMetrics = append(ms.LabeledMetrics, lms...)
99+
// Transform ms.MetricValues to LabeledMetrics first
100+
mvlms := metricValueToLabeledMetric(ms.MetricValues)
101+
lms := make([]core.LabeledMetric, 0, len(mvlms)+len(ms.LabeledMetrics))
102+
103+
lms = append(lms, mvlms...)
104+
lms = append(lms, ms.LabeledMetrics...)
100105

101106
Store:
102-
for _, labeledMetric := range ms.LabeledMetrics {
107+
for _, labeledMetric := range lms {
103108

104109
for _, filter := range h.filters {
105110
if !filter(ms, labeledMetric.Name) {
@@ -321,6 +326,14 @@ func (h *hawkularSink) init() error {
321326
h.batchSize = bs
322327
}
323328

329+
if v, found := opts["disablePreCache"]; found {
330+
dpc, err := strconv.ParseBool(v[0])
331+
if err != nil {
332+
return fmt.Errorf("disablePreCache parameter value %s is invalid", v[0])
333+
}
334+
h.disablePreCaching = dpc
335+
}
336+
324337
c, err := metrics.NewHawkularClient(p)
325338
if err != nil {
326339
return err

0 commit comments

Comments
 (0)