Skip to content
This repository was archived by the owner on Dec 1, 2018. It is now read-only.

Commit d2a1cf1

Browse files
authored
Merge pull request #1726 from allencloud/refactor-code-in-processors
refactor code in processors to add more readablity
2 parents fba7f53 + 94fe380 commit d2a1cf1

File tree

6 files changed

+167
-152
lines changed

6 files changed

+167
-152
lines changed

metrics/processors/namespace_aggregator.go

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,31 @@ func (this *NamespaceAggregator) Name() string {
3030
func (this *NamespaceAggregator) Process(batch *core.DataBatch) (*core.DataBatch, error) {
3131
namespaces := make(map[string]*core.MetricSet)
3232
for key, metricSet := range batch.MetricSets {
33-
if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; found && metricSetType == core.MetricSetTypePod {
34-
// Aggregating pods
35-
if namespaceName, found := metricSet.Labels[core.LabelNamespaceName.Key]; found {
36-
namespaceKey := core.NamespaceKey(namespaceName)
37-
namespace, found := namespaces[namespaceKey]
38-
if !found {
39-
if nsFromBatch, found := batch.MetricSets[namespaceKey]; found {
40-
namespace = nsFromBatch
41-
} else {
42-
namespace = namespaceMetricSet(namespaceName, metricSet.Labels[core.LabelPodNamespaceUID.Key])
43-
namespaces[namespaceKey] = namespace
44-
}
45-
}
46-
if err := aggregate(metricSet, namespace, this.MetricsToAggregate); err != nil {
47-
return nil, err
48-
}
33+
if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; !found || metricSetType != core.MetricSetTypePod {
34+
continue
35+
}
36+
37+
namespaceName, found := metricSet.Labels[core.LabelNamespaceName.Key]
38+
if !found {
39+
glog.Errorf("No namespace info in pod %s: %v", key, metricSet.Labels)
40+
continue
41+
}
42+
43+
namespaceKey := core.NamespaceKey(namespaceName)
44+
namespace, found := namespaces[namespaceKey]
45+
if !found {
46+
if nsFromBatch, found := batch.MetricSets[namespaceKey]; found {
47+
namespace = nsFromBatch
4948
} else {
50-
glog.Errorf("No namespace info in pod %s: %v", key, metricSet.Labels)
49+
namespace = namespaceMetricSet(namespaceName, metricSet.Labels[core.LabelPodNamespaceUID.Key])
50+
namespaces[namespaceKey] = namespace
5151
}
5252
}
53+
54+
if err := aggregate(metricSet, namespace, this.MetricsToAggregate); err != nil {
55+
return nil, err
56+
}
57+
5358
}
5459
for key, val := range namespaces {
5560
batch.MetricSets[key] = val

metrics/processors/namespace_based_enricher.go

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,28 +46,33 @@ func (this *NamespaceBasedEnricher) Process(batch *core.DataBatch) (*core.DataBa
4646

4747
// Adds UID to all namespaced elements.
4848
func (this *NamespaceBasedEnricher) addNamespaceInfo(metricSet *core.MetricSet) {
49-
if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; found &&
50-
(metricSetType == core.MetricSetTypePodContainer ||
51-
metricSetType == core.MetricSetTypePod ||
52-
metricSetType == core.MetricSetTypeNamespace) {
49+
metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]
50+
if !found {
51+
return
52+
}
53+
if metricSetType != core.MetricSetTypePodContainer &&
54+
metricSetType != core.MetricSetTypePod &&
55+
metricSetType != core.MetricSetTypeNamespace {
56+
return
57+
}
58+
59+
namespaceName, found := metricSet.Labels[core.LabelNamespaceName.Key]
60+
if !found {
61+
return
62+
}
5363

54-
if namespaceName, found := metricSet.Labels[core.LabelNamespaceName.Key]; found {
55-
nsObj, exists, err := this.store.GetByKey(namespaceName)
56-
if exists && err == nil {
57-
namespace, ok := nsObj.(*kube_api.Namespace)
58-
if ok {
59-
metricSet.Labels[core.LabelPodNamespaceUID.Key] = string(namespace.UID)
60-
} else {
61-
glog.Errorf("Wrong namespace store content")
62-
}
63-
} else {
64-
if err != nil {
65-
glog.Warningf("Failed to get namespace %s: %v", namespaceName, err)
66-
} else if !exists {
67-
glog.Warningf("Namespace doesn't exist: %s", namespaceName)
68-
}
69-
}
64+
nsObj, exists, err := this.store.GetByKey(namespaceName)
65+
if exists && err == nil {
66+
namespace, ok := nsObj.(*kube_api.Namespace)
67+
if ok {
68+
metricSet.Labels[core.LabelPodNamespaceUID.Key] = string(namespace.UID)
69+
} else {
70+
glog.Errorf("Wrong namespace store content")
7071
}
72+
} else if err != nil {
73+
glog.Warningf("Failed to get namespace %s: %v", namespaceName, err)
74+
} else if !exists {
75+
glog.Warningf("Namespace doesn't exist: %s", namespaceName)
7176
}
7277
}
7378

metrics/processors/node_aggregator.go

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,27 @@ func (this *NodeAggregator) Name() string {
3030

3131
func (this *NodeAggregator) Process(batch *core.DataBatch) (*core.DataBatch, error) {
3232
for key, metricSet := range batch.MetricSets {
33-
if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; found && metricSetType == core.MetricSetTypePod {
34-
// Aggregating pods
35-
nodeName, found := metricSet.Labels[core.LabelNodename.Key]
36-
if nodeName == "" {
37-
glog.V(8).Infof("Skipping pod %s: no node info", key)
38-
continue
39-
}
40-
if found {
41-
nodeKey := core.NodeKey(nodeName)
42-
node, found := batch.MetricSets[nodeKey]
43-
if !found {
44-
glog.V(1).Infof("No metric for node %s, cannot perform node level aggregation.", nodeKey)
45-
} else if err := aggregate(metricSet, node, this.MetricsToAggregate); err != nil {
46-
return nil, err
47-
}
48-
} else {
49-
glog.Errorf("No node info in pod %s: %v", key, metricSet.Labels)
50-
}
33+
if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; !found || metricSetType != core.MetricSetTypePod {
34+
continue
5135
}
36+
// Aggregating pods
37+
nodeName, found := metricSet.Labels[core.LabelNodename.Key]
38+
if nodeName == "" {
39+
glog.V(8).Infof("Skipping pod %s: no node info", key)
40+
continue
41+
}
42+
if !found {
43+
glog.Errorf("No node info in pod %s: %v", key, metricSet.Labels)
44+
continue
45+
}
46+
nodeKey := core.NodeKey(nodeName)
47+
node, found := batch.MetricSets[nodeKey]
48+
if !found {
49+
glog.V(1).Infof("No metric for node %s, cannot perform node level aggregation.", nodeKey)
50+
} else if err := aggregate(metricSet, node, this.MetricsToAggregate); err != nil {
51+
return nil, err
52+
}
53+
5254
}
5355
return batch, nil
5456
}

metrics/processors/pod_aggregator.go

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -43,51 +43,53 @@ func (this *PodAggregator) Process(batch *core.DataBatch) (*core.DataBatch, erro
4343
newPods := make(map[string]*core.MetricSet)
4444

4545
for key, metricSet := range batch.MetricSets {
46-
if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; found && metricSetType == core.MetricSetTypePodContainer {
47-
// Aggregating containers
48-
podName, found := metricSet.Labels[core.LabelPodName.Key]
49-
ns, found2 := metricSet.Labels[core.LabelNamespaceName.Key]
50-
if found && found2 {
51-
podKey := core.PodKey(ns, podName)
52-
pod, found := batch.MetricSets[podKey]
53-
if !found {
54-
pod, found = newPods[podKey]
55-
if !found {
56-
glog.V(2).Infof("Pod not found adding %s", podKey)
57-
pod = this.podMetricSet(metricSet.Labels)
58-
newPods[podKey] = pod
59-
}
46+
if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; !found || metricSetType != core.MetricSetTypePodContainer {
47+
continue
48+
}
49+
50+
// Aggregating containers
51+
podName, found := metricSet.Labels[core.LabelPodName.Key]
52+
ns, found2 := metricSet.Labels[core.LabelNamespaceName.Key]
53+
if !found || !found2 {
54+
glog.Errorf("No namespace and/or pod info in container %s: %v", key, metricSet.Labels)
55+
continue
56+
}
57+
58+
podKey := core.PodKey(ns, podName)
59+
pod, found := batch.MetricSets[podKey]
60+
if !found {
61+
pod, found = newPods[podKey]
62+
if !found {
63+
glog.V(2).Infof("Pod not found adding %s", podKey)
64+
pod = this.podMetricSet(metricSet.Labels)
65+
newPods[podKey] = pod
66+
}
67+
}
68+
69+
for metricName, metricValue := range metricSet.MetricValues {
70+
if _, found := this.skippedMetrics[metricName]; found {
71+
continue
72+
}
73+
74+
aggregatedValue, found := pod.MetricValues[metricName]
75+
if found {
76+
if aggregatedValue.ValueType != metricValue.ValueType {
77+
glog.Errorf("PodAggregator: inconsistent type in %s", metricName)
78+
continue
6079
}
6180

62-
for metricName, metricValue := range metricSet.MetricValues {
63-
if _, found := this.skippedMetrics[metricName]; found {
64-
continue
65-
}
66-
67-
aggregatedValue, found := pod.MetricValues[metricName]
68-
if found {
69-
if aggregatedValue.ValueType != metricValue.ValueType {
70-
glog.Errorf("PodAggregator: inconsistent type in %s", metricName)
71-
continue
72-
}
73-
74-
switch aggregatedValue.ValueType {
75-
case core.ValueInt64:
76-
aggregatedValue.IntValue += metricValue.IntValue
77-
case core.ValueFloat:
78-
aggregatedValue.FloatValue += metricValue.FloatValue
79-
default:
80-
return nil, fmt.Errorf("PodAggregator: type not supported in %s", metricName)
81-
}
82-
} else {
83-
aggregatedValue = metricValue
84-
}
85-
pod.MetricValues[metricName] = aggregatedValue
81+
switch aggregatedValue.ValueType {
82+
case core.ValueInt64:
83+
aggregatedValue.IntValue += metricValue.IntValue
84+
case core.ValueFloat:
85+
aggregatedValue.FloatValue += metricValue.FloatValue
86+
default:
87+
return nil, fmt.Errorf("PodAggregator: type not supported in %s", metricName)
8688
}
8789
} else {
88-
glog.Errorf("No namespace and/or pod info in container %s: %v", key, metricSet.Labels)
89-
continue
90+
aggregatedValue = metricValue
9091
}
92+
pod.MetricValues[metricName] = aggregatedValue
9193
}
9294
}
9395
for key, val := range newPods {

metrics/processors/pod_based_enricher.go

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -124,28 +124,30 @@ func addPodInfo(key string, podMs *core.MetricSet, pod *kube_api.Pod, batch *cor
124124
// Add cpu/mem requests and limits to containers
125125
for _, container := range pod.Spec.Containers {
126126
containerKey := core.PodContainerKey(pod.Namespace, pod.Name, container.Name)
127-
if _, found := batch.MetricSets[containerKey]; !found {
128-
if _, found := newMs[containerKey]; !found {
129-
glog.V(2).Infof("Container %s not found, creating a stub", containerKey)
130-
containerMs := &core.MetricSet{
131-
MetricValues: make(map[string]core.MetricValue),
132-
Labels: map[string]string{
133-
core.LabelMetricSetType.Key: core.MetricSetTypePodContainer,
134-
core.LabelNamespaceName.Key: pod.Namespace,
135-
core.LabelPodName.Key: pod.Name,
136-
core.LabelContainerName.Key: container.Name,
137-
core.LabelContainerBaseImage.Key: container.Image,
138-
core.LabelPodId.Key: string(pod.UID),
139-
core.LabelLabels.Key: util.LabelsToString(pod.Labels),
140-
core.LabelNodename.Key: podMs.Labels[core.LabelNodename.Key],
141-
core.LabelHostname.Key: podMs.Labels[core.LabelHostname.Key],
142-
core.LabelHostID.Key: podMs.Labels[core.LabelHostID.Key],
143-
},
144-
}
145-
updateContainerResourcesAndLimits(containerMs, container)
146-
newMs[containerKey] = containerMs
147-
}
127+
if _, found := batch.MetricSets[containerKey]; found {
128+
continue
129+
}
130+
if _, found := newMs[containerKey]; found {
131+
continue
132+
}
133+
glog.V(2).Infof("Container %s not found, creating a stub", containerKey)
134+
containerMs := &core.MetricSet{
135+
MetricValues: make(map[string]core.MetricValue),
136+
Labels: map[string]string{
137+
core.LabelMetricSetType.Key: core.MetricSetTypePodContainer,
138+
core.LabelNamespaceName.Key: pod.Namespace,
139+
core.LabelPodName.Key: pod.Name,
140+
core.LabelContainerName.Key: container.Name,
141+
core.LabelContainerBaseImage.Key: container.Image,
142+
core.LabelPodId.Key: string(pod.UID),
143+
core.LabelLabels.Key: util.LabelsToString(pod.Labels),
144+
core.LabelNodename.Key: podMs.Labels[core.LabelNodename.Key],
145+
core.LabelHostname.Key: podMs.Labels[core.LabelHostname.Key],
146+
core.LabelHostID.Key: podMs.Labels[core.LabelHostID.Key],
147+
},
148148
}
149+
updateContainerResourcesAndLimits(containerMs, container)
150+
newMs[containerKey] = containerMs
149151
}
150152
}
151153

metrics/processors/rate_calculator.go

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -42,47 +42,46 @@ func (this *RateCalculator) Process(batch *core.DataBatch) (*core.DataBatch, err
4242
}
4343

4444
for key, newMs := range batch.MetricSets {
45+
oldMs, found := this.previousBatch.MetricSets[key]
46+
if !found {
47+
continue
48+
}
49+
if !newMs.ScrapeTime.After(oldMs.ScrapeTime) {
50+
// New must be strictly after old.
51+
glog.V(4).Infof("Skipping rate calculations for %s - new batch (%s) was not scraped strictly after old batch (%s)", key, newMs.ScrapeTime, oldMs.ScrapeTime)
52+
continue
53+
}
54+
if !newMs.CreateTime.Equal(oldMs.CreateTime) {
55+
glog.V(4).Infof("Skipping rates for %s - different create time new:%v old:%v", key, newMs.CreateTime, oldMs.CreateTime)
56+
// Create time for container must be the same.
57+
continue
58+
}
4559

46-
if oldMs, found := this.previousBatch.MetricSets[key]; found {
47-
if !newMs.ScrapeTime.After(oldMs.ScrapeTime) {
48-
// New must be strictly after old.
49-
glog.V(4).Infof("Skipping rate calculations for %s - new batch (%s) was not scraped strictly after old batch (%s)", key, newMs.ScrapeTime, oldMs.ScrapeTime)
50-
continue
51-
}
52-
if !newMs.CreateTime.Equal(oldMs.CreateTime) {
53-
glog.V(4).Infof("Skipping rates for %s - different create time new:%v old:%v", key, newMs.CreateTime, oldMs.CreateTime)
54-
// Create time for container must be the same.
55-
continue
56-
}
57-
58-
for metricName, targetMetric := range this.rateMetricsMapping {
59-
metricValNew, foundNew := newMs.MetricValues[metricName]
60-
metricValOld, foundOld := oldMs.MetricValues[metricName]
61-
if foundNew && foundOld {
62-
if metricName == core.MetricCpuUsage.MetricDescriptor.Name {
63-
// cpu/usage values are in nanoseconds; we want to have it in millicores (that's why constant 1000 is here).
64-
newVal := 1000 * (metricValNew.IntValue - metricValOld.IntValue) /
65-
(newMs.ScrapeTime.UnixNano() - oldMs.ScrapeTime.UnixNano())
60+
for metricName, targetMetric := range this.rateMetricsMapping {
61+
metricValNew, foundNew := newMs.MetricValues[metricName]
62+
metricValOld, foundOld := oldMs.MetricValues[metricName]
63+
if foundNew && foundOld && metricName == core.MetricCpuUsage.MetricDescriptor.Name {
64+
// cpu/usage values are in nanoseconds; we want to have it in millicores (that's why constant 1000 is here).
65+
newVal := 1000 * (metricValNew.IntValue - metricValOld.IntValue) /
66+
(newMs.ScrapeTime.UnixNano() - oldMs.ScrapeTime.UnixNano())
6667

67-
newMs.MetricValues[targetMetric.MetricDescriptor.Name] = core.MetricValue{
68-
ValueType: core.ValueInt64,
69-
MetricType: core.MetricGauge,
70-
IntValue: newVal,
71-
}
68+
newMs.MetricValues[targetMetric.MetricDescriptor.Name] = core.MetricValue{
69+
ValueType: core.ValueInt64,
70+
MetricType: core.MetricGauge,
71+
IntValue: newVal,
72+
}
7273

73-
} else if targetMetric.MetricDescriptor.ValueType == core.ValueFloat {
74-
newVal := 1e9 * float32(metricValNew.IntValue-metricValOld.IntValue) /
75-
float32(newMs.ScrapeTime.UnixNano()-oldMs.ScrapeTime.UnixNano())
74+
} else if foundNew && foundOld && targetMetric.MetricDescriptor.ValueType == core.ValueFloat {
75+
newVal := 1e9 * float32(metricValNew.IntValue-metricValOld.IntValue) /
76+
float32(newMs.ScrapeTime.UnixNano()-oldMs.ScrapeTime.UnixNano())
7677

77-
newMs.MetricValues[targetMetric.MetricDescriptor.Name] = core.MetricValue{
78-
ValueType: core.ValueFloat,
79-
MetricType: core.MetricGauge,
80-
FloatValue: newVal,
81-
}
82-
}
83-
} else if foundNew && !foundOld || !foundNew && foundOld {
84-
glog.V(4).Infof("Skipping rates for %s in %s: metric not found in one of old (%v) or new (%v)", metricName, key, foundOld, foundNew)
78+
newMs.MetricValues[targetMetric.MetricDescriptor.Name] = core.MetricValue{
79+
ValueType: core.ValueFloat,
80+
MetricType: core.MetricGauge,
81+
FloatValue: newVal,
8582
}
83+
} else if foundNew && !foundOld || !foundNew && foundOld {
84+
glog.V(4).Infof("Skipping rates for %s in %s: metric not found in one of old (%v) or new (%v)", metricName, key, foundOld, foundNew)
8685
}
8786
}
8887
}

0 commit comments

Comments
 (0)