Skip to content

Commit 178cd3b

Browse files
dricrossagarakan
authored andcommitted
Implement prometheus adapter primary functionality (#1751)
1 parent a0590da commit 178cd3b

File tree

12 files changed

+943
-15
lines changed

12 files changed

+943
-15
lines changed

go.mod

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ go 1.24.4
44

55
replace github.com/influxdata/telegraf => github.com/aws/telegraf v0.10.2-0.20250113150713-a2dfaa4cdf6d
66

7+
replace collectd.org v0.4.0 => github.com/collectd/go-collectd v0.4.0
8+
79
// Replace with https://github.com/amazon-contributing/opentelemetry-collector-contrib, there are no requirements for all receivers/processors/exporters
810
// to be all replaced since there are some changes that will always be from upstream
911
replace (
@@ -231,6 +233,11 @@ require (
231233
k8s.io/klog/v2 v2.130.1
232234
)
233235

236+
require (
237+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.124.1
238+
go.opentelemetry.io/otel v1.35.0
239+
)
240+
234241
require (
235242
cloud.google.com/go/auth v0.15.0 // indirect
236243
cloud.google.com/go/auth/oauth2adapt v0.2.7 // indirect
@@ -461,7 +468,6 @@ require (
461468
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.124.1 // indirect
462469
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.124.1 // indirect
463470
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.124.1 // indirect
464-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.124.1 // indirect
465471
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.124.1 // indirect
466472
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.124.1 // indirect
467473
github.com/opencontainers/cgroups v0.0.1 // indirect
@@ -565,7 +571,6 @@ require (
565571
go.opentelemetry.io/contrib/otelconf v0.15.0 // indirect
566572
go.opentelemetry.io/contrib/propagators/b3 v1.35.0 // indirect
567573
go.opentelemetry.io/contrib/zpages v0.60.0 // indirect
568-
go.opentelemetry.io/otel v1.35.0 // indirect
569574
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.11.0 // indirect
570575
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0 // indirect
571576
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.35.0 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
4848
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
4949
code.cloudfoundry.org/clock v1.0.0 h1:kFXWQM4bxYvdBw2X8BbBeXwQNgfoWv1vqAk2ZZyBN2o=
5050
code.cloudfoundry.org/clock v1.0.0/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8=
51-
collectd.org v0.4.0 h1:nWNldfMqg7EVWAevG8oyOVsS9r/UHRG3LZRf6MdQho0=
52-
collectd.org v0.4.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE=
5351
dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
5452
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
5553
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
@@ -409,6 +407,8 @@ github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnht
409407
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
410408
github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 h1:Om6kYQYDUk5wWbT0t0q6pvyM49i9XZAv9dDrkDA7gjk=
411409
github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
410+
github.com/collectd/go-collectd v0.4.0 h1:0u1GXChSSlMw+riTADhcWwTjugckVWxjQHJi7Zj2Xe4=
411+
github.com/collectd/go-collectd v0.4.0/go.mod h1:xQe/Em/Q9IrN5ifWOvK5ZP2vMIB4KlsIA550yYzhkpY=
412412
github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0=
413413
github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0=
414414
github.com/containerd/containerd/api v1.8.0 h1:hVTNJKR8fMc/2Tiw60ZRijntNMd1U+JVMyTRdsD2bS0=

plugins/processors/prometheusadapter/factory_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestCreateProcessor(t *testing.T) {
2929
require.NotNil(t, factory)
3030

3131
cfg := factory.CreateDefaultConfig()
32-
setting := processortest.NewNopSettings()
32+
setting := processortest.NewNopSettings(TypeStr)
3333

3434
tProcessor, err := factory.CreateTraces(context.Background(), setting, cfg, consumertest.NewNop())
3535
assert.Equal(t, err, pipeline.ErrSignalNotSupported)
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package internal
5+
6+
import (
7+
"log"
8+
"math"
9+
"time"
10+
11+
"github.com/aws/amazon-cloudwatch-agent/internal/mapWithExpiry"
12+
"github.com/prometheus/prometheus/model/value"
13+
"go.opentelemetry.io/collector/pdata/pcommon"
14+
"go.opentelemetry.io/collector/pdata/pmetric"
15+
)
16+
17+
const (
18+
CleanUpTimeThreshold = 60 * 1000 // 1 minute
19+
CacheTTL = 5 * time.Minute // 5 minutes
20+
)
21+
22+
type dataPoint struct {
23+
value float64
24+
timestamp pcommon.Timestamp
25+
}
26+
type DeltaCalculator struct {
27+
preDataPoints *mapWithExpiry.MapWithExpiry
28+
lastCleanUpTime pcommon.Timestamp
29+
}
30+
31+
func (dc *DeltaCalculator) Calculate(m pmetric.Metric) {
32+
33+
switch m.Type() {
34+
case pmetric.MetricTypeSum:
35+
// only calculate delta if it's a cumulative sum metric
36+
if m.Sum().AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
37+
return
38+
}
39+
40+
dps := m.Sum().DataPoints()
41+
dps.RemoveIf(func(dp pmetric.NumberDataPoint) bool {
42+
identity := MetricIdentity{
43+
name: m.Name(),
44+
tags: dp.Attributes().AsRaw(),
45+
}
46+
metricKey := identity.getKey()
47+
48+
var curVal float64
49+
switch dp.ValueType() {
50+
case pmetric.NumberDataPointValueTypeInt:
51+
curVal = float64(dp.IntValue())
52+
case pmetric.NumberDataPointValueTypeDouble:
53+
curVal = dp.DoubleValue()
54+
case pmetric.NumberDataPointValueTypeEmpty:
55+
fallthrough
56+
default:
57+
// cannot handle the type so drop the data point
58+
log.Printf("D! DeltaCalculator.calculate: Drop metric with value type: %v", dp.ValueType())
59+
return true
60+
}
61+
62+
if !isValueValid(dp.DoubleValue()) {
63+
log.Printf("D! DeltaCalculator.calculate: Drop metric with NaN or Inf value: %v", curVal)
64+
//When the raws values are like this: 1, 2, 3, 4, NaN, NaN, NaN, ..., 100, 101, 102,
65+
//and the previous value is not reset, we will get a wrong delta value (at 100) as 100 - 4 = 96
66+
//To avoid this issue, we reset the previous value whenever an invalid value is encountered
67+
dc.preDataPoints.Delete(metricKey)
68+
return true
69+
}
70+
71+
curTime := dp.Timestamp()
72+
73+
dropValue := false
74+
if newVal, ok := dc.calculateDatapoint(metricKey, curVal, curTime); ok {
75+
switch dp.ValueType() {
76+
case pmetric.NumberDataPointValueTypeInt:
77+
dp.SetIntValue(int64(newVal))
78+
case pmetric.NumberDataPointValueTypeDouble:
79+
dp.SetDoubleValue(newVal)
80+
case pmetric.NumberDataPointValueTypeEmpty:
81+
fallthrough
82+
default:
83+
log.Printf("D! DeltaCalculator.calculate: Drop metric with value type: %v", dp.ValueType())
84+
return true
85+
}
86+
} else {
87+
// Drop the initial value for delta calculations
88+
dropValue = true
89+
}
90+
91+
// Clean up the stale cache periodically
92+
if curTime-dc.lastCleanUpTime >= CleanUpTimeThreshold {
93+
dc.preDataPoints.CleanUp(time.Now())
94+
dc.lastCleanUpTime = curTime
95+
}
96+
97+
dc.preDataPoints.Set(metricKey, dataPoint{value: curVal, timestamp: curTime})
98+
99+
return dropValue
100+
})
101+
102+
case pmetric.MetricTypeSummary:
103+
dps := m.Summary().DataPoints()
104+
dps.RemoveIf(func(dp pmetric.SummaryDataPoint) bool {
105+
sumIdentity := MetricIdentity{
106+
name: m.Name() + "_sum",
107+
tags: dp.Attributes().AsRaw(),
108+
}
109+
sumKey := sumIdentity.getKey()
110+
111+
curSum := dp.Sum()
112+
if !isValueValid(curSum) {
113+
log.Printf("D! DeltaCalculator.calculate: Drop metric with NaN or Inf value: %v", curSum)
114+
//When the raws values are like this: 1, 2, 3, 4, NaN, NaN, NaN, ..., 100, 101, 102,
115+
//and the previous value is not reset, we will get a wrong delta value (at 100) as 100 - 4 = 96
116+
//To avoid this issue, we reset the previous value whenever an invalid value is encountered
117+
dc.preDataPoints.Delete(sumKey)
118+
return true
119+
}
120+
121+
curTime := dp.Timestamp()
122+
curCount := float64(dp.Count())
123+
countIdentity := MetricIdentity{
124+
name: m.Name() + "_count",
125+
tags: dp.Attributes().AsRaw(),
126+
}
127+
countKey := countIdentity.getKey()
128+
129+
dropValue := false
130+
if newVal, ok := dc.calculateDatapoint(sumKey, curSum, curTime); ok {
131+
dp.SetSum(newVal)
132+
} else {
133+
dropValue = true
134+
}
135+
if newVal, ok := dc.calculateDatapoint(countKey, curCount, curTime); ok {
136+
dp.SetCount(uint64(newVal))
137+
} else {
138+
// we shouldn't ever see sum without count, but drop the entire summary if either are not present
139+
dropValue = true
140+
}
141+
142+
// Clean up the stale cache periodically
143+
if curTime-dc.lastCleanUpTime >= CleanUpTimeThreshold {
144+
dc.preDataPoints.CleanUp(time.Now())
145+
dc.lastCleanUpTime = curTime
146+
}
147+
148+
dc.preDataPoints.Set(countKey, dataPoint{value: curCount, timestamp: curTime})
149+
dc.preDataPoints.Set(sumKey, dataPoint{value: curSum, timestamp: curTime})
150+
151+
return dropValue
152+
})
153+
154+
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram:
155+
fallthrough
156+
default:
157+
log.Printf("W! DeltaCalculator.Calculate: ignoring metric %s", m.Name())
158+
return
159+
}
160+
161+
}
162+
163+
func (dc *DeltaCalculator) calculateDatapoint(key string, value float64, curTime pcommon.Timestamp) (float64, bool) {
164+
if v, ok := dc.preDataPoints.Get(key); ok {
165+
preDataPoint := v.(dataPoint)
166+
newVal := value
167+
if curTime > preDataPoint.timestamp {
168+
if value >= preDataPoint.value {
169+
newVal = value - preDataPoint.value
170+
} else {
171+
// the counter has been reset, keep the current value as delta
172+
}
173+
}
174+
return newVal, true
175+
}
176+
return value, false
177+
}
178+
179+
func NewDeltaCalculator() *DeltaCalculator {
180+
return &DeltaCalculator{preDataPoints: mapWithExpiry.NewMapWithExpiry(CacheTTL), lastCleanUpTime: 0}
181+
}
182+
183+
func isValueValid(v float64) bool {
184+
//treat NaN and +/-Inf values as invalid as emf log doesn't support them
185+
return !value.IsStaleNaN(v) && !math.IsNaN(v) && !math.IsInf(v, 0)
186+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package internal
5+
6+
import (
7+
"bytes"
8+
"fmt"
9+
"maps"
10+
"slices"
11+
)
12+
13+
type MetricIdentity struct {
14+
name string
15+
tags map[string]any
16+
}
17+
18+
func (mi *MetricIdentity) getKey() string {
19+
b := new(bytes.Buffer)
20+
keys := slices.Sorted(maps.Keys(mi.tags))
21+
for _, k := range keys {
22+
_, _ = fmt.Fprintf(b, "%s=%s,", k, mi.tags[k])
23+
}
24+
// We assume there won't be same metricName+tags with different metricType, so that it is not necessary to add metricType into uniqKey.
25+
_, _ = fmt.Fprintf(b, "metricName=%s,", mi.name)
26+
return b.String()
27+
}

0 commit comments

Comments
 (0)