@@ -16,8 +16,8 @@ package hawkular
1616
1717import (
1818 "fmt"
19+ "hash/fnv"
1920 "math"
20- "reflect"
2121 "regexp"
2222 "strings"
2323 "sync"
@@ -28,35 +28,99 @@ import (
2828 "k8s.io/heapster/metrics/core"
2929)
3030
31- // Fetches definitions from the server and checks that they're matching the descriptors
32- func (h * hawkularSink ) updateDefinitions (mt metrics.MetricType ) error {
33- m := make ([]metrics.Modifier , len (h .modifiers ), len (h .modifiers )+ 1 )
34- copy (m , h .modifiers )
35- m = append (m , metrics .Filters (metrics .TypeFilter (mt )))
36-
37- mds , err := h .client .Definitions (m ... )
38- if err != nil {
39- return err
31+ // cacheDefinitions Fetches all known definitions from all tenants (all projects in Openshift)
32+ func (h * hawkularSink ) cacheDefinitions () error {
33+ if ! h .disablePreCaching {
34+ mds , err := h .client .AllDefinitions (h .modifiers ... )
35+ if err != nil {
36+ return err
37+ }
38+ err = h .updateDefinitions (mds )
39+ if err != nil {
40+ return err
41+ }
42+ }
43+
44+ glog .V (4 ).Infof ("Hawkular definition pre-caching completed, cached %d definitions\n " , len (h .expReg ))
45+
46+ return nil
47+ }
48+
49+ // cache inserts the item to the cache
50+ func (h * hawkularSink ) cache (md * metrics.MetricDefinition ) {
51+ h .pushToCache (md .ID , hashDefinition (md ))
52+ }
53+
54+ // toCache inserts the item and updates the TTL in the cache to current time
55+ func (h * hawkularSink ) pushToCache (key string , hash uint64 ) {
56+ h .regLock .Lock ()
57+ h .expReg [key ] = & expiringItem {
58+ hash : hash ,
59+ ttl : h .runId ,
4060 }
61+ h .regLock .Unlock ()
62+ }
63+
64+ // checkCache returns false if the cached instance is not current. Updates the TTL in the cache
65+ func (h * hawkularSink ) checkCache (key string , hash uint64 ) bool {
66+ h .regLock .Lock ()
67+ defer h .regLock .Unlock ()
68+ _ , found := h .expReg [key ]
69+ if ! found || h .expReg [key ].hash != hash {
70+ return false
71+ }
72+ // Update the TTL
73+ h .expReg [key ].ttl = h .runId
74+ return true
75+ }
4176
77+ // expireCache will process the map and check for any item that has been expired and release it
78+ func (h * hawkularSink ) expireCache (runId uint64 ) {
4279 h .regLock .Lock ()
4380 defer h .regLock .Unlock ()
4481
82+ for k , v := range h .expReg {
83+ if (v .ttl + h .cacheAge ) <= runId {
84+ delete (h .expReg , k )
85+ }
86+ }
87+ }
88+
89+ // Fetches definitions from the server and checks that they're matching the descriptors
90+ func (h * hawkularSink ) updateDefinitions (mds []* metrics.MetricDefinition ) error {
4591 for _ , p := range mds {
46- // If no descriptorTag is found, this metric does not belong to Heapster
47- if mk , found := p .Tags [descriptorTag ]; found {
48- if model , f := h .models [mk ]; f && ! h .recent (p , model ) {
49- if err := h .client .UpdateTags (mt , p .ID , p .Tags , h .modifiers ... ); err != nil {
50- return err
51- }
92+ if model , f := h.models [p.Tags [descriptorTag ]]; f && ! h .recent (p , model ) {
93+ if err := h .client .UpdateTags (p .Type , p .ID , p .Tags , h .modifiers ... ); err != nil {
94+ return err
5295 }
53- h .reg [p .ID ] = p
5496 }
97+ h .cache (p )
5598 }
5699
57100 return nil
58101}
59102
103+ func hashDefinition (md * metrics.MetricDefinition ) uint64 {
104+ h := fnv .New64a ()
105+
106+ h .Write ([]byte (md .Type ))
107+ h .Write ([]byte (md .ID ))
108+
109+ helper := fnv .New64a ()
110+
111+ var hashCode uint64
112+
113+ for k , v := range md .Tags {
114+ helper .Reset ()
115+ helper .Write ([]byte (k ))
116+ helper .Write ([]byte (v ))
117+ vH := helper .Sum64 ()
118+ hashCode = hashCode ^ vH
119+ }
120+
121+ return hashCode
122+ }
123+
60124// Checks that stored definition is up to date with the model
61125func (h * hawkularSink ) recent (live * metrics.MetricDefinition , model * metrics.MetricDefinition ) bool {
62126 recent := true
@@ -150,13 +214,33 @@ func (h *hawkularSink) nodeName(ms *core.MetricSet) string {
150214 return ms .Labels [core .LabelNodename .Key ]
151215}
152216
153- func (h * hawkularSink ) createDefinitionFromModel (ms * core.MetricSet , metric core.LabeledMetric ) (* metrics.MetricDefinition , error ) {
217+ func (h * hawkularSink ) createDefinitionFromModel (ms * core.MetricSet , metric core.LabeledMetric ) (* metrics.MetricDefinition , uint64 ) {
154218 if md , f := h .models [metric .Name ]; f {
219+ hasher := fnv .New64a ()
220+
221+ hasher .Write ([]byte (md .Type ))
222+ hasher .Write ([]byte (md .ID ))
223+
224+ helper := fnv .New64a ()
225+
226+ var hashCode uint64
227+
228+ helperFunc := func (k string , v string , hashCode uint64 ) uint64 {
229+ helper .Reset ()
230+ helper .Write ([]byte (k ))
231+ helper .Write ([]byte (v ))
232+ vH := helper .Sum64 ()
233+ hashCode = hashCode ^ vH
234+
235+ return hashCode
236+ }
237+
155238 // Copy the original map
156239 mdd := * md
157- tags := make (map [string ]string )
240+ tags := make (map [string ]string , len ( mdd . Tags ) + len ( ms . Labels ) + len ( metric . Labels ) + 2 + 8 ) // 8 is just arbitrary extra for potential splits
158241 for k , v := range mdd .Tags {
159242 tags [k ] = v
243+ hashCode = helperFunc (k , v , hashCode )
160244 }
161245 mdd .Tags = tags
162246
@@ -170,7 +254,9 @@ func (h *hawkularSink) createDefinitionFromModel(ms *core.MetricSet, metric core
170254 if len (labelKeyValue ) != 2 {
171255 glog .V (4 ).Infof ("Could not split the label %v into its key and value pair. This label will not be added as a tag in Hawkular Metrics." , label )
172256 } else {
173- mdd .Tags [h .labelTagPrefix + labelKeyValue [0 ]] = labelKeyValue [1 ]
257+ labelKey := h .labelTagPrefix + labelKeyValue [0 ]
258+ mdd .Tags [labelKey ] = labelKeyValue [1 ]
259+ hashCode = helperFunc (labelKey , labelKeyValue [1 ], hashCode )
174260 }
175261 }
176262 }
@@ -179,51 +265,47 @@ func (h *hawkularSink) createDefinitionFromModel(ms *core.MetricSet, metric core
179265 // Set the labeled values
180266 for k , v := range metric .Labels {
181267 mdd .Tags [k ] = v
268+ hashCode = helperFunc (k , v , hashCode )
182269 }
183270
184- mdd .Tags [groupTag ] = h .groupName (ms , metric .Name )
271+ groupName := h .groupName (ms , metric .Name )
272+ mdd .Tags [groupTag ] = groupName
185273 mdd .Tags [descriptorTag ] = metric .Name
186274
187- return & mdd , nil
275+ hashCode = helperFunc (groupTag , groupName , hashCode )
276+ hashCode = helperFunc (descriptorTag , metric .Name , hashCode )
277+
278+ return & mdd , hashCode
188279 }
189- return nil , fmt .Errorf ("Could not find definition model with name %s" , metric .Name )
280+ return nil , 0
281+ // return nil, fmt.Errorf("Could not find definition model with name %s", metric.Name)
190282}
191283
192- func (h * hawkularSink ) registerLabeledIfNecessary (ms * core.MetricSet , metric core.LabeledMetric , m ... metrics.Modifier ) error {
193-
284+ func (h * hawkularSink ) registerLabeledIfNecessaryInline (ms * core.MetricSet , metric core.LabeledMetric , wg * sync.WaitGroup , m ... metrics.Modifier ) error {
194285 var key string
195286 if resourceID , found := metric .Labels [core .LabelResourceID .Key ]; found {
196287 key = h .idName (ms , metric .Name + separator + resourceID )
197288 } else {
198289 key = h .idName (ms , metric .Name )
199290 }
200291
201- mdd , err := h .createDefinitionFromModel (ms , metric )
202- if err != nil {
203- return err
204- }
205-
206- h .regLock .RLock ()
207- if _ , found := h .reg [key ]; ! found || ! reflect .DeepEqual (mdd .Tags , h .reg [key ].Tags ) {
208- // I'm going to release the lock to allow concurrent processing, even if that
209- // can cause dual updates (highly unlikely). The UpdateTags is idempotent in any case.
210- h .regLock .RUnlock ()
211- m = append (m , h .modifiers ... )
212-
213- // Create metric, use updateTags instead of Create because we don't care about uniqueness
214- if err := h .client .UpdateTags (heapsterTypeToHawkularType (metric .MetricType ), key , mdd .Tags , m ... ); err != nil {
215- // Log error and don't add this key to the lookup table
216- glog .Errorf ("Could not update tags: %s" , err )
217- return err
218- }
219-
220- h .regLock .Lock ()
221- h .reg [key ] = mdd
222- h .regLock .Unlock ()
223- } else {
224- h .regLock .RUnlock ()
292+ mdd , mddHash := h .createDefinitionFromModel (ms , metric )
293+ if mddHash != 0 && ! h .checkCache (key , mddHash ) {
294+
295+ wg .Add (1 )
296+ go func (ms * core.MetricSet , labeledMetric core.LabeledMetric , m ... metrics.Modifier ) {
297+ defer wg .Done ()
298+ m = append (m , h .modifiers ... )
299+ // Create metric, use updateTags instead of Create because we don't care about uniqueness
300+ if err := h .client .UpdateTags (heapsterTypeToHawkularType (metric .MetricType ), key , mdd .Tags , m ... ); err != nil {
301+ // Log error and don't add this key to the lookup table
302+ glog .Errorf ("Could not update tags: %s" , err )
303+ return
304+ // return err
305+ }
306+ h .pushToCache (key , mddHash )
307+ }(ms , metric , m ... )
225308 }
226-
227309 return nil
228310}
229311
0 commit comments