Skip to content
This repository was archived by the owner on Dec 1, 2018. It is now read-only.

Commit d25a176

Browse files
authored
Merge pull request #1914 from burmanm/hawkular_caching
Hawkular, reduce the sink's memory usage in definition caching
2 parents 628ecea + 385f79d commit d25a176

File tree

6 files changed

+370
-113
lines changed

6 files changed

+370
-113
lines changed

Godeps/Godeps.json

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

metrics/sinks/hawkular/client.go

Lines changed: 132 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ package hawkular
1616

1717
import (
1818
"fmt"
19+
"hash/fnv"
1920
"math"
20-
"reflect"
2121
"regexp"
2222
"strings"
2323
"sync"
@@ -28,35 +28,99 @@ import (
2828
"k8s.io/heapster/metrics/core"
2929
)
3030

31-
// Fetches definitions from the server and checks that they're matching the descriptors
32-
func (h *hawkularSink) updateDefinitions(mt metrics.MetricType) error {
33-
m := make([]metrics.Modifier, len(h.modifiers), len(h.modifiers)+1)
34-
copy(m, h.modifiers)
35-
m = append(m, metrics.Filters(metrics.TypeFilter(mt)))
36-
37-
mds, err := h.client.Definitions(m...)
38-
if err != nil {
39-
return err
31+
// cacheDefinitions Fetches all known definitions from all tenants (all projects in Openshift)
32+
func (h *hawkularSink) cacheDefinitions() error {
33+
if !h.disablePreCaching {
34+
mds, err := h.client.AllDefinitions(h.modifiers...)
35+
if err != nil {
36+
return err
37+
}
38+
err = h.updateDefinitions(mds)
39+
if err != nil {
40+
return err
41+
}
42+
}
43+
44+
glog.V(4).Infof("Hawkular definition pre-caching completed, cached %d definitions\n", len(h.expReg))
45+
46+
return nil
47+
}
48+
49+
// cache inserts the item to the cache
50+
func (h *hawkularSink) cache(md *metrics.MetricDefinition) {
51+
h.pushToCache(md.ID, hashDefinition(md))
52+
}
53+
54+
// toCache inserts the item and updates the TTL in the cache to current time
55+
func (h *hawkularSink) pushToCache(key string, hash uint64) {
56+
h.regLock.Lock()
57+
h.expReg[key] = &expiringItem{
58+
hash: hash,
59+
ttl: h.runId,
4060
}
61+
h.regLock.Unlock()
62+
}
63+
64+
// checkCache returns false if the cached instance is not current. Updates the TTL in the cache
65+
func (h *hawkularSink) checkCache(key string, hash uint64) bool {
66+
h.regLock.Lock()
67+
defer h.regLock.Unlock()
68+
_, found := h.expReg[key]
69+
if !found || h.expReg[key].hash != hash {
70+
return false
71+
}
72+
// Update the TTL
73+
h.expReg[key].ttl = h.runId
74+
return true
75+
}
4176

77+
// expireCache will process the map and check for any item that has been expired and release it
78+
func (h *hawkularSink) expireCache(runId uint64) {
4279
h.regLock.Lock()
4380
defer h.regLock.Unlock()
4481

82+
for k, v := range h.expReg {
83+
if (v.ttl + h.cacheAge) <= runId {
84+
delete(h.expReg, k)
85+
}
86+
}
87+
}
88+
89+
// Fetches definitions from the server and checks that they're matching the descriptors
90+
func (h *hawkularSink) updateDefinitions(mds []*metrics.MetricDefinition) error {
4591
for _, p := range mds {
46-
// If no descriptorTag is found, this metric does not belong to Heapster
47-
if mk, found := p.Tags[descriptorTag]; found {
48-
if model, f := h.models[mk]; f && !h.recent(p, model) {
49-
if err := h.client.UpdateTags(mt, p.ID, p.Tags, h.modifiers...); err != nil {
50-
return err
51-
}
92+
if model, f := h.models[p.Tags[descriptorTag]]; f && !h.recent(p, model) {
93+
if err := h.client.UpdateTags(p.Type, p.ID, p.Tags, h.modifiers...); err != nil {
94+
return err
5295
}
53-
h.reg[p.ID] = p
5496
}
97+
h.cache(p)
5598
}
5699

57100
return nil
58101
}
59102

103+
func hashDefinition(md *metrics.MetricDefinition) uint64 {
104+
h := fnv.New64a()
105+
106+
h.Write([]byte(md.Type))
107+
h.Write([]byte(md.ID))
108+
109+
helper := fnv.New64a()
110+
111+
var hashCode uint64
112+
113+
for k, v := range md.Tags {
114+
helper.Reset()
115+
helper.Write([]byte(k))
116+
helper.Write([]byte(v))
117+
vH := helper.Sum64()
118+
hashCode = hashCode ^ vH
119+
}
120+
121+
return hashCode
122+
}
123+
60124
// Checks that stored definition is up to date with the model
61125
func (h *hawkularSink) recent(live *metrics.MetricDefinition, model *metrics.MetricDefinition) bool {
62126
recent := true
@@ -150,13 +214,33 @@ func (h *hawkularSink) nodeName(ms *core.MetricSet) string {
150214
return ms.Labels[core.LabelNodename.Key]
151215
}
152216

153-
func (h *hawkularSink) createDefinitionFromModel(ms *core.MetricSet, metric core.LabeledMetric) (*metrics.MetricDefinition, error) {
217+
func (h *hawkularSink) createDefinitionFromModel(ms *core.MetricSet, metric core.LabeledMetric) (*metrics.MetricDefinition, uint64) {
154218
if md, f := h.models[metric.Name]; f {
219+
hasher := fnv.New64a()
220+
221+
hasher.Write([]byte(md.Type))
222+
hasher.Write([]byte(md.ID))
223+
224+
helper := fnv.New64a()
225+
226+
var hashCode uint64
227+
228+
helperFunc := func(k string, v string, hashCode uint64) uint64 {
229+
helper.Reset()
230+
helper.Write([]byte(k))
231+
helper.Write([]byte(v))
232+
vH := helper.Sum64()
233+
hashCode = hashCode ^ vH
234+
235+
return hashCode
236+
}
237+
155238
// Copy the original map
156239
mdd := *md
157-
tags := make(map[string]string)
240+
tags := make(map[string]string, len(mdd.Tags)+len(ms.Labels)+len(metric.Labels)+2+8) // 8 is just arbitrary extra for potential splits
158241
for k, v := range mdd.Tags {
159242
tags[k] = v
243+
hashCode = helperFunc(k, v, hashCode)
160244
}
161245
mdd.Tags = tags
162246

@@ -170,7 +254,9 @@ func (h *hawkularSink) createDefinitionFromModel(ms *core.MetricSet, metric core
170254
if len(labelKeyValue) != 2 {
171255
glog.V(4).Infof("Could not split the label %v into its key and value pair. This label will not be added as a tag in Hawkular Metrics.", label)
172256
} else {
173-
mdd.Tags[h.labelTagPrefix+labelKeyValue[0]] = labelKeyValue[1]
257+
labelKey := h.labelTagPrefix + labelKeyValue[0]
258+
mdd.Tags[labelKey] = labelKeyValue[1]
259+
hashCode = helperFunc(labelKey, labelKeyValue[1], hashCode)
174260
}
175261
}
176262
}
@@ -179,51 +265,47 @@ func (h *hawkularSink) createDefinitionFromModel(ms *core.MetricSet, metric core
179265
// Set the labeled values
180266
for k, v := range metric.Labels {
181267
mdd.Tags[k] = v
268+
hashCode = helperFunc(k, v, hashCode)
182269
}
183270

184-
mdd.Tags[groupTag] = h.groupName(ms, metric.Name)
271+
groupName := h.groupName(ms, metric.Name)
272+
mdd.Tags[groupTag] = groupName
185273
mdd.Tags[descriptorTag] = metric.Name
186274

187-
return &mdd, nil
275+
hashCode = helperFunc(groupTag, groupName, hashCode)
276+
hashCode = helperFunc(descriptorTag, metric.Name, hashCode)
277+
278+
return &mdd, hashCode
188279
}
189-
return nil, fmt.Errorf("Could not find definition model with name %s", metric.Name)
280+
return nil, 0
281+
// return nil, fmt.Errorf("Could not find definition model with name %s", metric.Name)
190282
}
191283

192-
func (h *hawkularSink) registerLabeledIfNecessary(ms *core.MetricSet, metric core.LabeledMetric, m ...metrics.Modifier) error {
193-
284+
func (h *hawkularSink) registerLabeledIfNecessaryInline(ms *core.MetricSet, metric core.LabeledMetric, wg *sync.WaitGroup, m ...metrics.Modifier) error {
194285
var key string
195286
if resourceID, found := metric.Labels[core.LabelResourceID.Key]; found {
196287
key = h.idName(ms, metric.Name+separator+resourceID)
197288
} else {
198289
key = h.idName(ms, metric.Name)
199290
}
200291

201-
mdd, err := h.createDefinitionFromModel(ms, metric)
202-
if err != nil {
203-
return err
204-
}
205-
206-
h.regLock.RLock()
207-
if _, found := h.reg[key]; !found || !reflect.DeepEqual(mdd.Tags, h.reg[key].Tags) {
208-
// I'm going to release the lock to allow concurrent processing, even if that
209-
// can cause dual updates (highly unlikely). The UpdateTags is idempotent in any case.
210-
h.regLock.RUnlock()
211-
m = append(m, h.modifiers...)
212-
213-
// Create metric, use updateTags instead of Create because we don't care about uniqueness
214-
if err := h.client.UpdateTags(heapsterTypeToHawkularType(metric.MetricType), key, mdd.Tags, m...); err != nil {
215-
// Log error and don't add this key to the lookup table
216-
glog.Errorf("Could not update tags: %s", err)
217-
return err
218-
}
219-
220-
h.regLock.Lock()
221-
h.reg[key] = mdd
222-
h.regLock.Unlock()
223-
} else {
224-
h.regLock.RUnlock()
292+
mdd, mddHash := h.createDefinitionFromModel(ms, metric)
293+
if mddHash != 0 && !h.checkCache(key, mddHash) {
294+
295+
wg.Add(1)
296+
go func(ms *core.MetricSet, labeledMetric core.LabeledMetric, m ...metrics.Modifier) {
297+
defer wg.Done()
298+
m = append(m, h.modifiers...)
299+
// Create metric, use updateTags instead of Create because we don't care about uniqueness
300+
if err := h.client.UpdateTags(heapsterTypeToHawkularType(metric.MetricType), key, mdd.Tags, m...); err != nil {
301+
// Log error and don't add this key to the lookup table
302+
glog.Errorf("Could not update tags: %s", err)
303+
return
304+
// return err
305+
}
306+
h.pushToCache(key, mddHash)
307+
}(ms, metric, m...)
225308
}
226-
227309
return nil
228310
}
229311

metrics/sinks/hawkular/driver.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,9 @@ func (h *hawkularSink) Register(mds []core.MetricDescriptor) error {
6060

6161
if !h.disablePreCaching {
6262
// Fetch currently known metrics from Hawkular-Metrics and cache them
63-
types := []metrics.MetricType{metrics.Gauge, metrics.Counter}
64-
for _, t := range types {
65-
err := h.updateDefinitions(t)
66-
if err != nil {
67-
return err
68-
}
63+
err := h.cacheDefinitions()
64+
if err != nil {
65+
return err
6966
}
7067
}
7168

@@ -80,6 +77,7 @@ func (h *hawkularSink) Stop() {
8077

8178
func (h *hawkularSink) ExportData(db *core.DataBatch) {
8279
totalCount := 0
80+
h.runId++
8381
for _, ms := range db.MetricSets {
8482
totalCount += len(ms.MetricValues)
8583
totalCount += len(ms.LabeledMetrics)
@@ -120,12 +118,7 @@ func (h *hawkularSink) ExportData(db *core.DataBatch) {
120118
}
121119
}
122120

123-
wg.Add(1)
124-
go func(ms *core.MetricSet, labeledMetric core.LabeledMetric, tenant string) {
125-
defer wg.Done()
126-
h.registerLabeledIfNecessary(ms, labeledMetric, metrics.Tenant(tenant))
127-
}(ms, labeledMetric, tenant)
128-
121+
h.registerLabeledIfNecessaryInline(ms, labeledMetric, wg, metrics.Tenant(tenant))
129122
mH, err := h.pointToLabeledMetricHeader(ms, labeledMetric, db.Timestamp)
130123
if err != nil {
131124
// One transformation error should not prevent the whole process
@@ -142,7 +135,9 @@ func (h *hawkularSink) ExportData(db *core.DataBatch) {
142135
}
143136
h.sendData(tmhs, wg) // Send to a limited channel? Only batches.. egg.
144137
wg.Wait()
138+
// glog.V(4).Infof("ExportData updated %d tags, total size of cached tags is %d\n", updatedTags, len(h.reg))
145139
}
140+
h.expireCache(h.runId)
146141
}
147142

148143
func metricValueToLabeledMetric(msValues map[string]core.MetricValue) []core.LabeledMetric {
@@ -163,7 +158,7 @@ func (h *hawkularSink) DebugInfo() string {
163158

164159
h.regLock.Lock()
165160
defer h.regLock.Unlock()
166-
info += fmt.Sprintf("Known metrics: %d\n", len(h.reg))
161+
info += fmt.Sprintf("Cached metrics: %d\n", len(h.expReg))
167162
if len(h.labelTenant) > 0 {
168163
info += fmt.Sprintf("Using label '%s' as tenant information\n", h.labelTenant)
169164
}
@@ -198,11 +193,13 @@ func NewHawkularSink(u *url.URL) (core.DataSink, error) {
198193
}
199194

200195
func (h *hawkularSink) init() error {
201-
h.reg = make(map[string]*metrics.MetricDefinition)
202196
h.models = make(map[string]*metrics.MetricDefinition)
203197
h.modifiers = make([]metrics.Modifier, 0)
204198
h.filters = make([]Filter, 0)
205199
h.batchSize = batchSizeDefault
200+
h.expReg = make(map[string]*expiringItem)
201+
h.cacheAge = 2
202+
h.runId = 0
206203

207204
p := metrics.Parameters{
208205
Tenant: "heapster",

0 commit comments

Comments
 (0)