Skip to content
This repository was archived by the owner on Dec 1, 2018. It is now read-only.

Commit 9077e7e

Browse files
committed
Allow disabling the initial caching of metric definitions and compare the cache on each datapoint store to the given metricSet
1 parent d6741b3 commit 9077e7e

File tree

5 files changed

+141
-82
lines changed

5 files changed

+141
-82
lines changed

docs/sink-configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ The following options are available:
8888
* `batchSize`- How many metrics are sent in each request to Hawkular-Metrics (default is 1000)
8989
* `concurrencyLimit`- How many concurrent requests are used to send data to the Hawkular-Metrics (default is 5)
9090
* `labelTagPrefix` - A prefix to be placed in front of each label when stored as a tag for the metric (default is `labels.`)
91+
* `disablePreCache` - Disable cache initialization by fetching metric definitions from Hawkular-Metrics
9192

9293
A combination of `insecure` / `caCert` / `auth` is not supported, only a single of these parameters is allowed at once. Also, combination of `useServiceAccount` and `user` + `pass` is not supported. To increase the performance of Hawkular sink in case of multiple instances of Hawkular-Metrics (such as scaled scenario in OpenShift) modify the parameters of batchSize and concurrencyLimit to balance the load on Hawkular-Metrics instances.
9394

metrics/sinks/hawkular/client.go

Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package hawkular
1717
import (
1818
"fmt"
1919
"math"
20+
"reflect"
2021
"regexp"
2122
"strings"
2223
"sync"
@@ -52,6 +53,7 @@ func (h *hawkularSink) updateDefinitions(mt metrics.MetricType) error {
5253
h.reg[p.Id] = p
5354
}
5455
}
56+
5557
return nil
5658
}
5759

@@ -148,69 +150,79 @@ func (h *hawkularSink) nodeName(ms *core.MetricSet) string {
148150
return ms.Labels[core.LabelNodename.Key]
149151
}
150152

151-
func (h *hawkularSink) registerLabeledIfNecessary(ms *core.MetricSet, metric core.LabeledMetric, m ...metrics.Modifier) error {
152-
key := h.idName(ms, metric.Name)
153-
154-
if resourceID, found := metric.Labels[core.LabelResourceID.Key]; found {
155-
key = h.idName(ms, metric.Name+separator+resourceID)
156-
}
157-
158-
h.regLock.Lock()
159-
defer h.regLock.Unlock()
153+
func (h *hawkularSink) createDefinitionFromModel(ms *core.MetricSet, metric core.LabeledMetric) (*metrics.MetricDefinition, error) {
154+
if md, f := h.models[metric.Name]; f {
155+
// Copy the original map
156+
mdd := *md
157+
tags := make(map[string]string)
158+
for k, v := range mdd.Tags {
159+
tags[k] = v
160+
}
161+
mdd.Tags = tags
160162

161-
// If found, check it matches the current stored definition (could be old info from
162-
// the stored metrics cache for example)
163-
if _, found := h.reg[key]; !found {
164-
// Register the metric descriptor here..
165-
if md, f := h.models[metric.Name]; f {
166-
// Copy the original map
167-
mdd := *md
168-
tags := make(map[string]string)
169-
for k, v := range mdd.Tags {
170-
tags[k] = v
171-
}
172-
mdd.Tags = tags
173-
174-
// Set tag values
175-
for k, v := range ms.Labels {
176-
mdd.Tags[k] = v
177-
if k == core.LabelLabels.Key {
178-
labels := strings.Split(v, ",")
179-
for _, label := range labels {
180-
labelKeyValue := strings.Split(label, ":")
181-
if len(labelKeyValue) != 2 {
182-
glog.V(4).Infof("Could not split the label %v into its key and value pair. This label will not be added as a tag in Hawkular Metrics.", label)
183-
} else {
184-
mdd.Tags[h.labelTagPrefix+labelKeyValue[0]] = labelKeyValue[1]
185-
}
163+
// Set tag values
164+
for k, v := range ms.Labels {
165+
mdd.Tags[k] = v
166+
if k == core.LabelLabels.Key {
167+
labels := strings.Split(v, ",")
168+
for _, label := range labels {
169+
labelKeyValue := strings.Split(label, ":")
170+
if len(labelKeyValue) != 2 {
171+
glog.V(4).Infof("Could not split the label %v into its key and value pair. This label will not be added as a tag in Hawkular Metrics.", label)
172+
} else {
173+
mdd.Tags[h.labelTagPrefix+labelKeyValue[0]] = labelKeyValue[1]
186174
}
187175
}
188176
}
177+
}
189178

190-
// Set the labeled values
191-
for k, v := range metric.Labels {
192-
mdd.Tags[k] = v
193-
}
179+
// Set the labeled values
180+
for k, v := range metric.Labels {
181+
mdd.Tags[k] = v
182+
}
194183

195-
mdd.Tags[groupTag] = h.groupName(ms, metric.Name)
196-
mdd.Tags[descriptorTag] = metric.Name
184+
mdd.Tags[groupTag] = h.groupName(ms, metric.Name)
185+
mdd.Tags[descriptorTag] = metric.Name
197186

198-
m = append(m, h.modifiers...)
187+
return &mdd, nil
188+
}
189+
return nil, fmt.Errorf("Could not find definition model with name %s", metric.Name)
190+
}
199191

200-
// Create metric, use updateTags instead of Create because we know it is unique
201-
if err := h.client.UpdateTags(heapsterTypeToHawkularType(metric.MetricType), key, mdd.Tags, m...); err != nil {
202-
// Log error and don't add this key to the lookup table
203-
glog.Errorf("Could not update tags: %s", err)
204-
return err
205-
}
192+
func (h *hawkularSink) registerLabeledIfNecessary(ms *core.MetricSet, metric core.LabeledMetric, m ...metrics.Modifier) error {
206193

207-
// Add to the lookup table
208-
h.reg[key] = &mdd
209-
} else {
210-
return fmt.Errorf("Could not find definition model with name %s", metric.Name)
194+
var key string
195+
if resourceID, found := metric.Labels[core.LabelResourceID.Key]; found {
196+
key = h.idName(ms, metric.Name+separator+resourceID)
197+
} else {
198+
key = h.idName(ms, metric.Name)
199+
}
200+
201+
mdd, err := h.createDefinitionFromModel(ms, metric)
202+
if err != nil {
203+
return err
204+
}
205+
206+
h.regLock.RLock()
207+
if _, found := h.reg[key]; !found || !reflect.DeepEqual(mdd.Tags, h.reg[key].Tags) {
208+
// I'm going to release the lock to allow concurrent processing, even if that
209+
// can cause dual updates (highly unlikely). The UpdateTags is idempotent in any case.
210+
h.regLock.RUnlock()
211+
m = append(m, h.modifiers...)
212+
213+
// Create metric, use updateTags instead of Create because we don't care about uniqueness
214+
if err := h.client.UpdateTags(heapsterTypeToHawkularType(metric.MetricType), key, mdd.Tags, m...); err != nil {
215+
// Log error and don't add this key to the lookup table
216+
glog.Errorf("Could not update tags: %s", err)
217+
return err
211218
}
219+
220+
h.regLock.Lock()
221+
h.reg[key] = mdd
222+
h.regLock.Unlock()
223+
} else {
224+
h.regLock.RUnlock()
212225
}
213-
// TODO Compare the definition tags and update if necessary? Quite expensive operation..
214226

215227
return nil
216228
}

metrics/sinks/hawkular/driver.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@ func (h *hawkularSink) Register(mds []core.MetricDescriptor) error {
5858
h.models[md.Name] = &hmd
5959
}
6060

61-
// Fetch currently known metrics from Hawkular-Metrics and cache them
62-
types := []metrics.MetricType{metrics.Gauge, metrics.Counter}
63-
for _, t := range types {
64-
err := h.updateDefinitions(t)
65-
if err != nil {
66-
return err
61+
if !h.disablePreCaching {
62+
// Fetch currently known metrics from Hawkular-Metrics and cache them
63+
types := []metrics.MetricType{metrics.Gauge, metrics.Counter}
64+
for _, t := range types {
65+
err := h.updateDefinitions(t)
66+
if err != nil {
67+
return err
68+
}
6769
}
6870
}
6971

@@ -94,12 +96,15 @@ func (h *hawkularSink) ExportData(db *core.DataBatch) {
9496

9597
for _, ms := range db.MetricSets {
9698

97-
// // Transform ms.MetricValues to LabeledMetrics first
98-
lms := metricValueToLabeledMetric(ms.MetricValues)
99-
ms.LabeledMetrics = append(ms.LabeledMetrics, lms...)
99+
// Transform ms.MetricValues to LabeledMetrics first
100+
mvlms := metricValueToLabeledMetric(ms.MetricValues)
101+
lms := make([]core.LabeledMetric, 0, len(mvlms)+len(ms.LabeledMetrics))
102+
103+
lms = append(lms, mvlms...)
104+
lms = append(lms, ms.LabeledMetrics...)
100105

101106
Store:
102-
for _, labeledMetric := range ms.LabeledMetrics {
107+
for _, labeledMetric := range lms {
103108

104109
for _, filter := range h.filters {
105110
if !filter(ms, labeledMetric.Name) {
@@ -321,6 +326,14 @@ func (h *hawkularSink) init() error {
321326
h.batchSize = bs
322327
}
323328

329+
if v, found := opts["disablePreCache"]; found {
330+
dpc, err := strconv.ParseBool(v[0])
331+
if err != nil {
332+
return fmt.Errorf("disablePreCache parameter value %s is invalid", v[0])
333+
}
334+
h.disablePreCaching = dpc
335+
}
336+
324337
c, err := metrics.NewHawkularClient(p)
325338
if err != nil {
326339
return err

metrics/sinks/hawkular/driver_test.go

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,23 @@ func TestRegister(t *testing.T) {
367367
assert.True(t, definitionsCalled["gauge"], "Gauge definitions were not fetched")
368368
assert.True(t, definitionsCalled["counter"], "Counter definitions were not fetched")
369369
assert.True(t, updateTagsCalled, "Updating outdated tags was not called")
370+
371+
// Try without pre caching
372+
definitionsCalled = make(map[string]bool)
373+
updateTagsCalled = false
374+
375+
hSink, err = integSink(s.URL + "?tenant=test-heapster&disablePreCache=true")
376+
assert.NoError(t, err)
377+
378+
err = hSink.Register(md)
379+
assert.NoError(t, err)
380+
381+
assert.Equal(t, 2, len(hSink.models))
382+
assert.Equal(t, 0, len(hSink.reg))
383+
384+
assert.False(t, definitionsCalled["gauge"], "Gauge definitions were fetched")
385+
assert.False(t, definitionsCalled["counter"], "Counter definitions were fetched")
386+
assert.False(t, updateTagsCalled, "Updating outdated tags was called")
370387
}
371388

372389
// Store timeseries with both gauges and cumulatives
@@ -381,7 +398,7 @@ func TestStoreTimeseries(t *testing.T) {
381398
w.Header().Set("Content-Type", "application/json")
382399

383400
typ := r.RequestURI[strings.Index(r.RequestURI, "hawkular/metrics/")+17:]
384-
typ = typ[:len(typ)-5]
401+
typ = typ[:len(typ)-4]
385402

386403
switch typ {
387404
case "counters":
@@ -460,6 +477,7 @@ func TestStoreTimeseries(t *testing.T) {
460477
func TestTags(t *testing.T) {
461478
m := &sync.Mutex{}
462479
calls := make([]string, 0, 2)
480+
serverTags := make(map[string]string)
463481
// how many times tags have been updated
464482
tagsUpdated := 0
465483
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -473,23 +491,8 @@ func TestTags(t *testing.T) {
473491
assert.NoError(t, err)
474492

475493
if strings.HasSuffix(r.RequestURI, "/tags") {
476-
tags := make(map[string]string)
477-
err := json.Unmarshal(b, &tags)
494+
err := json.Unmarshal(b, &serverTags)
478495
assert.NoError(t, err)
479-
480-
assert.Equal(t, 10, len(tags))
481-
assert.Equal(t, "test-label", tags["projectId"])
482-
assert.Equal(t, "test-container", tags[core.LabelContainerName.Key])
483-
assert.Equal(t, "test-podid", tags[core.LabelPodId.Key])
484-
assert.Equal(t, "test-container/test/metric/A", tags["group_id"])
485-
assert.Equal(t, "test/metric/A", tags["descriptor_name"])
486-
assert.Equal(t, "XYZ", tags[core.LabelResourceID.Key])
487-
assert.Equal(t, "bytes", tags["units"])
488-
489-
assert.Equal(t, "testLabelA:testValueA,testLabelB:testValueB", tags[core.LabelLabels.Key])
490-
assert.Equal(t, "testValueA", tags["labels.testLabelA"])
491-
assert.Equal(t, "testValueB", tags["labels.testLabelB"])
492-
493496
tagsUpdated++
494497
}
495498
}))
@@ -555,6 +558,35 @@ func TestTags(t *testing.T) {
555558
assert.Equal(t, "testLabelA:testValueA,testLabelB:testValueB", tags[core.LabelLabels.Key])
556559
assert.Equal(t, "testValueA", tags["labels.testLabelA"])
557560
assert.Equal(t, "testValueB", tags["labels.testLabelB"])
561+
562+
assert.Equal(t, 10, len(serverTags))
563+
assert.Equal(t, "test-label", serverTags["projectId"])
564+
assert.Equal(t, "test-container", serverTags[core.LabelContainerName.Key])
565+
assert.Equal(t, "test-podid", serverTags[core.LabelPodId.Key])
566+
assert.Equal(t, "test-container/test/metric/A", serverTags["group_id"])
567+
assert.Equal(t, "test/metric/A", serverTags["descriptor_name"])
568+
assert.Equal(t, "XYZ", serverTags[core.LabelResourceID.Key])
569+
assert.Equal(t, "bytes", serverTags["units"])
570+
571+
assert.Equal(t, "testLabelA:testValueA,testLabelB:testValueB", serverTags[core.LabelLabels.Key])
572+
assert.Equal(t, "testValueA", serverTags["labels.testLabelA"])
573+
assert.Equal(t, "testValueB", serverTags["labels.testLabelB"])
574+
575+
// Make modifications to the metrics and check that they're updated correctly
576+
577+
// First, no changes - no update should happen
578+
hSink.registerLabeledIfNecessary(&metricSet, labeledMetric)
579+
assert.Equal(t, 1, tagsUpdated)
580+
581+
// Now modify the labels and expect an update
582+
metricSet.Labels[core.LabelLabels.Key] = "testLabelA:testValueA,testLabelB:testValueB,testLabelC:testValueC"
583+
hSink.registerLabeledIfNecessary(&metricSet, labeledMetric)
584+
assert.Equal(t, 2, tagsUpdated)
585+
586+
assert.Equal(t, "testLabelA:testValueA,testLabelB:testValueB,testLabelC:testValueC", serverTags[core.LabelLabels.Key])
587+
assert.Equal(t, "testValueA", serverTags["labels.testLabelA"])
588+
assert.Equal(t, "testValueB", serverTags["labels.testLabelB"])
589+
assert.Equal(t, "testValueC", serverTags["labels.testLabelC"])
558590
}
559591

560592
func TestUserPass(t *testing.T) {
@@ -594,7 +626,7 @@ func TestFiltering(t *testing.T) {
594626
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
595627
m.Lock()
596628
defer m.Unlock()
597-
if strings.Contains(r.RequestURI, "data") {
629+
if strings.Contains(r.RequestURI, "raw") {
598630
defer r.Body.Close()
599631
b, err := ioutil.ReadAll(r.Body)
600632
assert.NoError(t, err)

metrics/sinks/hawkular/types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (f FilterType) From(s string) FilterType {
4848
type hawkularSink struct {
4949
client *metrics.Client
5050
models map[string]*metrics.MetricDefinition // Model definitions
51-
regLock sync.Mutex
51+
regLock sync.RWMutex
5252
reg map[string]*metrics.MetricDefinition // Real definitions
5353

5454
uri *url.URL
@@ -59,7 +59,8 @@ type hawkularSink struct {
5959
modifiers []metrics.Modifier
6060
filters []Filter
6161

62-
batchSize int
62+
disablePreCaching bool
63+
batchSize int
6364
}
6465

6566
func heapsterTypeToHawkularType(t core.MetricType) metrics.MetricType {

0 commit comments

Comments
 (0)