Skip to content

Commit 3b419dc

Browse files
authored
updated countconnector timestamps (#42006)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Updates the countconnector metric timestamps to use earliest timestamp from batch as start timestamp and latest as timestamp, instead of using timestamp from metric creation time <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #40573 <!--Describe what testing was performed and which tests were added.--> #### Testing Updates TracesToMetric tests to include checks for timestamp values
1 parent 771b818 commit 3b419dc

12 files changed

+286
-117
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: countconnector
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Updates the countconnector metric timestamps to use earliest timestamp from batch as start timestamp and latest as timestamp, instead of using timestamp from metric creation time.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40573]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

connector/countconnector/connector.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,14 @@ func (c *count) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
5858

5959
for k := 0; k < scopeSpan.Spans().Len(); k++ {
6060
span := scopeSpan.Spans().At(k)
61+
spansCounter.updateTimestamp(span.StartTimestamp())
62+
spansCounter.updateTimestamp(span.EndTimestamp())
6163
sCtx := ottlspan.NewTransformContext(span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
6264
multiError = errors.Join(multiError, spansCounter.update(ctx, span.Attributes(), sCtx))
6365

6466
for l := 0; l < span.Events().Len(); l++ {
6567
event := span.Events().At(l)
68+
spanEventsCounter.updateTimestamp(event.Timestamp())
6669
eCtx := ottlspanevent.NewTransformContext(event, span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
6770
multiError = errors.Join(multiError, spanEventsCounter.update(ctx, event.Attributes(), eCtx))
6871
}
@@ -111,32 +114,42 @@ func (c *count) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
111114
case pmetric.MetricTypeGauge:
112115
dps := metric.Gauge().DataPoints()
113116
for i := 0; i < dps.Len(); i++ {
114-
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
115-
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), dCtx))
117+
dp := dps.At(i)
118+
dataPointsCounter.updateTimestamp(dp.Timestamp())
119+
dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
120+
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), dCtx))
116121
}
117122
case pmetric.MetricTypeSum:
118123
dps := metric.Sum().DataPoints()
119124
for i := 0; i < dps.Len(); i++ {
120-
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
121-
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), dCtx))
125+
dp := dps.At(i)
126+
dataPointsCounter.updateTimestamp(dp.Timestamp())
127+
dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
128+
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), dCtx))
122129
}
123130
case pmetric.MetricTypeSummary:
124131
dps := metric.Summary().DataPoints()
125132
for i := 0; i < dps.Len(); i++ {
126-
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
127-
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), dCtx))
133+
dp := dps.At(i)
134+
dataPointsCounter.updateTimestamp(dp.Timestamp())
135+
dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
136+
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), dCtx))
128137
}
129138
case pmetric.MetricTypeHistogram:
130139
dps := metric.Histogram().DataPoints()
131140
for i := 0; i < dps.Len(); i++ {
132-
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
133-
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), dCtx))
141+
dp := dps.At(i)
142+
dataPointsCounter.updateTimestamp(dp.Timestamp())
143+
dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
144+
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), dCtx))
134145
}
135146
case pmetric.MetricTypeExponentialHistogram:
136147
dps := metric.ExponentialHistogram().DataPoints()
137148
for i := 0; i < dps.Len(); i++ {
138-
dCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
139-
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dps.At(i).Attributes(), dCtx))
149+
dp := dps.At(i)
150+
dataPointsCounter.updateTimestamp(dp.Timestamp())
151+
dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric)
152+
multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), dCtx))
140153
}
141154
case pmetric.MetricTypeEmpty:
142155
multiError = errors.Join(multiError, fmt.Errorf("metric %q: invalid metric type: %v", metric.Name(), metric.Type()))
@@ -177,7 +190,7 @@ func (c *count) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
177190

178191
for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
179192
logRecord := scopeLogs.LogRecords().At(k)
180-
193+
counter.updateTimestamp(logRecord.Timestamp())
181194
lCtx := ottllog.NewTransformContext(logRecord, scopeLogs.Scope(), resourceLog.Resource(), scopeLogs, resourceLog)
182195
multiError = errors.Join(multiError, counter.update(ctx, logRecord.Attributes(), lCtx))
183196
}
@@ -215,7 +228,7 @@ func (c *count) ConsumeProfiles(ctx context.Context, ld pprofile.Profiles) error
215228

216229
for k := 0; k < scopeProfile.Profiles().Len(); k++ {
217230
profile := scopeProfile.Profiles().At(k)
218-
231+
counter.updateTimestamp(profile.Time())
219232
pCtx := ottlprofile.NewTransformContext(profile, ld.Dictionary(), scopeProfile.Scope(), resourceProfile.Resource(), scopeProfile, resourceProfile)
220233
attributes := pprofile.FromAttributeIndices(ld.Dictionary().AttributeTable(), profile, ld.Dictionary())
221234
multiError = errors.Join(multiError, counter.update(ctx, attributes, pCtx))

connector/countconnector/connector_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,6 @@ func TestTracesToMetrics(t *testing.T) {
272272
expected, err := golden.ReadMetrics(filepath.Join("testdata", "traces", tc.name+".yaml"))
273273
assert.NoError(t, err)
274274
assert.NoError(t, pmetrictest.CompareMetrics(expected, allMetrics[0],
275-
pmetrictest.IgnoreTimestamp(),
276275
pmetrictest.IgnoreResourceMetricsOrder(),
277276
pmetrictest.IgnoreMetricsOrder(),
278277
pmetrictest.IgnoreMetricDataPointsOrder()))
@@ -514,6 +513,7 @@ func TestMetricsToMetrics(t *testing.T) {
514513
expected, err := golden.ReadMetrics(filepath.Join("testdata", "metrics", tc.name+".yaml"))
515514
assert.NoError(t, err)
516515
assert.NoError(t, pmetrictest.CompareMetrics(expected, allMetrics[0],
516+
pmetrictest.IgnoreStartTimestamp(),
517517
pmetrictest.IgnoreTimestamp(),
518518
pmetrictest.IgnoreResourceMetricsOrder(),
519519
pmetrictest.IgnoreMetricsOrder(),
@@ -687,6 +687,7 @@ func TestLogsToMetrics(t *testing.T) {
687687
assert.NoError(t, err)
688688
assert.NoError(t, pmetrictest.CompareMetrics(expected, allMetrics[0],
689689
pmetrictest.IgnoreTimestamp(),
690+
pmetrictest.IgnoreStartTimestamp(),
690691
pmetrictest.IgnoreResourceMetricsOrder(),
691692
pmetrictest.IgnoreMetricsOrder(),
692693
pmetrictest.IgnoreMetricDataPointsOrder()))
@@ -858,6 +859,7 @@ func TestProfilesToMetrics(t *testing.T) {
858859
expected, err := golden.ReadMetrics(filepath.Join("testdata", "profiles", tc.name+".yaml"))
859860
assert.NoError(t, err)
860861
assert.NoError(t, pmetrictest.CompareMetrics(expected, allMetrics[0],
862+
pmetrictest.IgnoreStartTimestamp(),
861863
pmetrictest.IgnoreTimestamp(),
862864
pmetrictest.IgnoreResourceMetricsOrder(),
863865
pmetrictest.IgnoreMetricsOrder(),

connector/countconnector/counter.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ func newCounter[K any](metricDefs map[string]metricDef[K]) *counter[K] {
2020
return &counter[K]{
2121
metricDefs: metricDefs,
2222
counts: make(map[string]map[[16]byte]*attrCounter, len(metricDefs)),
23-
timestamp: time.Now(),
2423
}
2524
}
2625

2726
type counter[K any] struct {
2827
metricDefs map[string]metricDef[K]
2928
counts map[string]map[[16]byte]*attrCounter
30-
timestamp time.Time
29+
startTime pcommon.Timestamp
30+
endTime pcommon.Timestamp
3131
}
3232

3333
type attrCounter struct {
@@ -87,6 +87,32 @@ func (c *counter[K]) update(ctx context.Context, attrs pcommon.Map, tCtx K) erro
8787
return multiError
8888
}
8989

90+
// updateTimestamp updates the start and end timestamps based on the provided timestamp
91+
func (c *counter[K]) updateTimestamp(timestamp pcommon.Timestamp) {
92+
if timestamp != 0 {
93+
if c.startTime == 0 {
94+
c.endTime = timestamp
95+
c.startTime = timestamp
96+
} else {
97+
if timestamp < c.startTime {
98+
c.startTime = timestamp
99+
}
100+
if timestamp > c.endTime {
101+
c.endTime = timestamp
102+
}
103+
}
104+
}
105+
}
106+
107+
// getTimestamps either gets the valid start and end timestamps or returns the current time
108+
func (c *counter[K]) getTimestamps() (pcommon.Timestamp, pcommon.Timestamp) {
109+
if c.startTime != 0 {
110+
return c.startTime, c.endTime
111+
}
112+
now := pcommon.NewTimestampFromTime(time.Now())
113+
return now, now
114+
}
115+
90116
func (c *counter[K]) increment(metricName string, attrs pcommon.Map) error {
91117
if _, ok := c.counts[metricName]; !ok {
92118
c.counts[metricName] = make(map[[16]byte]*attrCounter)
@@ -121,8 +147,9 @@ func (c *counter[K]) appendMetricsTo(metricSlice pmetric.MetricSlice) {
121147
dp := sum.DataPoints().AppendEmpty()
122148
dpCount.attrs.CopyTo(dp.Attributes())
123149
dp.SetIntValue(int64(dpCount.count))
124-
// TODO determine appropriate start time
125-
dp.SetTimestamp(pcommon.NewTimestampFromTime(c.timestamp))
150+
startTime, endTime := c.getTimestamps()
151+
dp.SetStartTimestamp(startTime)
152+
dp.SetTimestamp(endTime)
126153
}
127154
}
128155
}

connector/countconnector/testdata/traces/condition_and_attribute.yaml

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ resourceMetrics:
1919
- key: span.required
2020
value:
2121
stringValue: foo
22-
timeUnixNano: "1678392127929005000"
22+
startTimeUnixNano: "1581452772000000321"
23+
timeUnixNano: "1581452773000000789"
2324
- asInt: "1"
2425
attributes:
2526
- key: span.required
2627
value:
2728
stringValue: notfoo
28-
timeUnixNano: "1678392127929005000"
29+
startTimeUnixNano: "1581452772000000321"
30+
timeUnixNano: "1581452773000000789"
2931
isMonotonic: true
3032
- description: Span event count by attribute if ...
3133
name: spanevent.count.if.by_attr
@@ -37,13 +39,15 @@ resourceMetrics:
3739
- key: event.required
3840
value:
3941
stringValue: foo
40-
timeUnixNano: "1678392127929006000"
42+
startTimeUnixNano: "1581452773000000123"
43+
timeUnixNano: "1581452773000000123"
4144
- asInt: "4"
4245
attributes:
4346
- key: event.required
4447
value:
4548
stringValue: notfoo
46-
timeUnixNano: "1678392127929006000"
49+
startTimeUnixNano: "1581452773000000123"
50+
timeUnixNano: "1581452773000000123"
4751
isMonotonic: true
4852
scope:
4953
name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector
@@ -67,13 +71,15 @@ resourceMetrics:
6771
- key: span.required
6872
value:
6973
stringValue: foo
70-
timeUnixNano: "1678392127929018000"
74+
startTimeUnixNano: "1581452772000000321"
75+
timeUnixNano: "1581452773000000789"
7176
- asInt: "1"
7277
attributes:
7378
- key: span.required
7479
value:
7580
stringValue: notfoo
76-
timeUnixNano: "1678392127929018000"
81+
startTimeUnixNano: "1581452772000000321"
82+
timeUnixNano: "1581452773000000789"
7783
isMonotonic: true
7884
- description: Span event count by attribute if ...
7985
name: spanevent.count.if.by_attr
@@ -85,13 +91,15 @@ resourceMetrics:
8591
- key: event.required
8692
value:
8793
stringValue: foo
88-
timeUnixNano: "1678392127929018000"
94+
startTimeUnixNano: "1581452773000000123"
95+
timeUnixNano: "1581452773000000123"
8996
- asInt: "4"
9097
attributes:
9198
- key: event.required
9299
value:
93100
stringValue: notfoo
94-
timeUnixNano: "1678392127929018000"
101+
startTimeUnixNano: "1581452773000000123"
102+
timeUnixNano: "1581452773000000123"
95103
isMonotonic: true
96104
scope:
97105
name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector

0 commit comments

Comments
 (0)