Skip to content

Commit 0563a71

Browse files
authored
Fix Observable Counters/UpDownCounters (#2298)
1 parent 049f7e8 commit 0563a71

File tree

2 files changed

+130
-12
lines changed

2 files changed

+130
-12
lines changed

sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,16 @@ class DefaultAggregation
2828
const opentelemetry::sdk::metrics::InstrumentDescriptor &instrument_descriptor,
2929
const AggregationConfig *aggregation_config)
3030
{
31-
switch (instrument_descriptor.type_)
31+
bool is_monotonic = true;
32+
auto aggr_type = GetDefaultAggregationType(instrument_descriptor.type_, is_monotonic);
33+
switch (aggr_type)
3234
{
33-
case InstrumentType::kCounter:
34-
case InstrumentType::kObservableCounter:
35+
case AggregationType::kSum:
3536
return (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
36-
? std::move(std::unique_ptr<Aggregation>(new LongSumAggregation(true)))
37+
? std::move(std::unique_ptr<Aggregation>(new LongSumAggregation(is_monotonic)))
3738
: std::move(std::unique_ptr<Aggregation>(new DoubleSumAggregation(true)));
38-
case InstrumentType::kUpDownCounter:
39-
case InstrumentType::kObservableUpDownCounter:
40-
return (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
41-
? std::move(std::unique_ptr<Aggregation>(new LongSumAggregation(false)))
42-
: std::move(std::unique_ptr<Aggregation>(new DoubleSumAggregation(false)));
4339
break;
44-
case InstrumentType::kHistogram: {
40+
case AggregationType::kHistogram: {
4541
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
4642
{
4743
return (std::unique_ptr<Aggregation>(new LongHistogramAggregation(aggregation_config)));
@@ -53,7 +49,7 @@ class DefaultAggregation
5349

5450
break;
5551
}
56-
case InstrumentType::kObservableGauge:
52+
case AggregationType::kLastValue:
5753
return (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
5854
? std::move(std::unique_ptr<Aggregation>(new LongLastValueAggregation()))
5955
: std::move(std::unique_ptr<Aggregation>(new DoubleLastValueAggregation()));
@@ -121,6 +117,11 @@ class DefaultAggregation
121117
const Aggregation &to_copy)
122118
{
123119
const PointType point_data = to_copy.ToPoint();
120+
bool is_monotonic = true;
121+
if (aggregation_type == AggregationType::kDefault)
122+
{
123+
aggregation_type = GetDefaultAggregationType(instrument_descriptor.type_, is_monotonic);
124+
}
124125
switch (aggregation_type)
125126
{
126127
case AggregationType::kDrop:
@@ -159,7 +160,29 @@ class DefaultAggregation
159160
new DoubleSumAggregation(nostd::get<SumPointData>(point_data)));
160161
}
161162
default:
162-
return DefaultAggregation::CreateAggregation(instrument_descriptor, nullptr);
163+
return nullptr; // won't reach here
164+
}
165+
}
166+
167+
static AggregationType GetDefaultAggregationType(InstrumentType instrument_type,
168+
bool &is_monotonic)
169+
{
170+
is_monotonic = false;
171+
switch (instrument_type)
172+
{
173+
case InstrumentType::kCounter:
174+
case InstrumentType::kObservableCounter:
175+
is_monotonic = true;
176+
return AggregationType::kSum;
177+
case InstrumentType::kUpDownCounter:
178+
case InstrumentType::kObservableUpDownCounter:
179+
return AggregationType::kSum;
180+
case InstrumentType::kHistogram:
181+
return AggregationType::kHistogram;
182+
case InstrumentType::kObservableGauge:
183+
return AggregationType::kLastValue;
184+
default:
185+
return AggregationType::kDrop;
163186
}
164187
}
165188
};

sdk/test/metrics/async_metric_storage_test.cc

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ using M = std::map<std::string, std::string>;
3030
class WritableMetricStorageTestFixture : public ::testing::TestWithParam<AggregationTemporality>
3131
{};
3232

33+
class WritableMetricStorageTestUpDownFixture
34+
: public ::testing::TestWithParam<AggregationTemporality>
35+
{};
36+
3337
class WritableMetricStorageTestObservableGaugeFixture
3438
: public ::testing::TestWithParam<AggregationTemporality>
3539
{};
@@ -124,6 +128,97 @@ INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,
124128
::testing::Values(AggregationTemporality::kCumulative,
125129
AggregationTemporality::kDelta));
126130

131+
TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation)
132+
{
133+
AggregationTemporality temporality = GetParam();
134+
135+
InstrumentDescriptor instr_desc = {"name", "desc", "1unit",
136+
InstrumentType::kObservableUpDownCounter,
137+
InstrumentValueType::kLong};
138+
139+
auto sdk_start_ts = std::chrono::system_clock::now();
140+
// Some computation here
141+
auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5);
142+
143+
std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
144+
std::vector<std::shared_ptr<CollectorHandle>> collectors;
145+
collectors.push_back(collector);
146+
147+
opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kDefault,
148+
nullptr);
149+
int64_t get_count1 = 20;
150+
int64_t put_count1 = 10;
151+
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
152+
{{{"RequestType", "GET"}}, get_count1}, {{{"RequestType", "PUT"}}, put_count1}};
153+
storage.RecordLong(measurements1,
154+
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
155+
156+
storage.Collect(
157+
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
158+
for (const auto &data_attr : metric_data.point_data_attr_)
159+
{
160+
const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
161+
if (opentelemetry::nostd::get<std::string>(
162+
data_attr.attributes.find("RequestType")->second) == "GET")
163+
{
164+
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count1);
165+
}
166+
else if (opentelemetry::nostd::get<std::string>(
167+
data_attr.attributes.find("RequestType")->second) == "PUT")
168+
{
169+
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count1);
170+
}
171+
}
172+
return true;
173+
});
174+
// subsequent recording after collection shouldn't fail
175+
// monotonic increasing values;
176+
int64_t get_count2 = -50;
177+
int64_t put_count2 = -70;
178+
179+
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements2 = {
180+
{{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}};
181+
storage.RecordLong(measurements2,
182+
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
183+
storage.Collect(
184+
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
185+
for (const auto &data_attr : metric_data.point_data_attr_)
186+
{
187+
const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
188+
if (opentelemetry::nostd::get<std::string>(
189+
data_attr.attributes.find("RequestType")->second) == "GET")
190+
{
191+
if (temporality == AggregationTemporality::kCumulative)
192+
{
193+
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count2);
194+
}
195+
else
196+
{
197+
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count2 - get_count1);
198+
}
199+
}
200+
else if (opentelemetry::nostd::get<std::string>(
201+
data_attr.attributes.find("RequestType")->second) == "PUT")
202+
{
203+
if (temporality == AggregationTemporality::kCumulative)
204+
{
205+
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count2);
206+
}
207+
else
208+
{
209+
EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count2 - put_count1);
210+
}
211+
}
212+
}
213+
return true;
214+
});
215+
}
216+
217+
INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestUpDownLong,
218+
WritableMetricStorageTestUpDownFixture,
219+
::testing::Values(AggregationTemporality::kCumulative,
220+
AggregationTemporality::kDelta));
221+
127222
TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation)
128223
{
129224
AggregationTemporality temporality = GetParam();

0 commit comments

Comments
 (0)