Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e98e336
[METRICS SDK] Fix hash collision in MetricAttributes
ThomsonTan Mar 25, 2025
adf6126
Fix ccardinality_limit test
ThomsonTan Mar 25, 2025
3196544
Merge branch 'main' into fix_metrics_hashmap_collision
ThomsonTan Mar 25, 2025
938b241
Remove unused hash function
ThomsonTan Mar 25, 2025
274ed36
Remove unnecessary hash code
ThomsonTan Mar 25, 2025
7f2aba2
Merge branch 'main' into fix_metrics_hashmap_collision
ThomsonTan Mar 25, 2025
1cad020
Add new line
ThomsonTan Mar 25, 2025
59fa221
Merge branch 'main' into fix_metrics_hashmap_collision
ThomsonTan Mar 25, 2025
5ebc039
Removed unused variable
ThomsonTan Mar 25, 2025
e88c7ae
Avoid copying MetricAttributes
ThomsonTan Mar 25, 2025
1d0ebea
Update hash for remaining ctor
ThomsonTan Mar 25, 2025
3dcb0ad
Add copy/move ctor and assignment operator for FilteredOrderedAttribu…
ThomsonTan Mar 25, 2025
6b899ef
Add unit test on hash collision of MetricAttributes
ThomsonTan Mar 25, 2025
d836c10
Fix bazel build
ThomsonTan Mar 25, 2025
44a467a
Format
ThomsonTan Mar 25, 2025
a7698d0
More format
ThomsonTan Mar 25, 2025
72ef04e
Update changelog
ThomsonTan Mar 25, 2025
d9d0e82
Fix unused parameter
ThomsonTan Mar 25, 2025
f17399c
Add equity operator for MetricsAttributes
ThomsonTan Mar 26, 2025
809a70a
Update hash for empty MetricAttributes
ThomsonTan Mar 26, 2025
2c017e3
Fix reservior test
ThomsonTan Mar 26, 2025
a5735f3
Remove unnecessary set
ThomsonTan Mar 26, 2025
95a9f7f
Fix iwyu
ThomsonTan Mar 26, 2025
eef46df
More fix on iywu
ThomsonTan Mar 26, 2025
7cc78be
Merge branch 'main' into fix_metrics_hashmap_collision
ThomsonTan Mar 27, 2025
4a8b77f
Addressing feedback
ThomsonTan Mar 27, 2025
adcd477
Merge branch 'main' into fix_metrics_hashmap_collision
ThomsonTan Mar 27, 2025
47409a5
Merge branch 'main' into fix_metrics_hashmap_collision
marcalff Mar 28, 2025
ce7dc77
Merge branch 'main' into fix_metrics_hashmap_collision
marcalff Mar 30, 2025
7bb5652
Merge branch 'main' into fix_metrics_hashmap_collision
marcalff Apr 1, 2025
8d5fe18
Address feedback
ThomsonTan Apr 1, 2025
5e2778c
Merge branch 'main' into fix_metrics_hashmap_collision
ThomsonTan Apr 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ Increment the:

## [Unreleased]

* [METRICS SDK] Fix hash collision in MetricAttributes
[#3322](https://github.com/open-telemetry/opentelemetry-cpp/pull/3322)

* [BUILD] Fix misssing exported definition for OTLP file exporter and forceflush
[#3319](https://github.com/open-telemetry/opentelemetry-cpp/pull/3319)

Expand Down
25 changes: 0 additions & 25 deletions sdk/include/opentelemetry/sdk/common/attributemap_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
#include <utility>
#include <vector>

#include "opentelemetry/common/attribute_value.h"
#include "opentelemetry/common/key_value_iterable.h"
#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/common/attribute_utils.h"
#include "opentelemetry/version.h"
Expand Down Expand Up @@ -74,27 +70,6 @@ inline size_t GetHashForAttributeMap(const OrderedAttributeMap &attribute_map)
return seed;
}

// Calculate hash of keys and values of KeyValueIterable, filtered using callback.
inline size_t GetHashForAttributeMap(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not expected to calculate hash on KeyValueIterable by traversing it, because the order of the elements should not affect the hashing. MetricAttributes need to be constructed from it before hash. So remove this function.

const opentelemetry::common::KeyValueIterable &attributes,
nostd::function_ref<bool(nostd::string_view)> is_key_present_callback)
{
AttributeConverter converter;
size_t seed = 0UL;
attributes.ForEachKeyValue(
[&](nostd::string_view key, opentelemetry::common::AttributeValue value) noexcept {
if (!is_key_present_callback(key))
{
return true;
}
GetHash(seed, key);
auto attr_val = nostd::visit(converter, value);
nostd::visit(GetHashForAttributeValueVisitor(seed), attr_val);
return true;
});
return seed;
}

template <class T>
inline size_t GetHash(T value)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class ReservoirCell
res.erase(it);
}
}
res.UpdateHash();
return res;
}

Expand Down
1 change: 1 addition & 0 deletions sdk/include/opentelemetry/sdk/metrics/instruments.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct InstrumentDescriptor
};

using MetricAttributes = opentelemetry::sdk::metrics::FilteredOrderedAttributeMap;
using MetricAttributesHash = opentelemetry::sdk::metrics::FilteredOrderedAttributeMapHash;
using AggregationTemporalitySelector = std::function<AggregationTemporality(InstrumentType)>;

/*class InstrumentSelector {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,22 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora

auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
aggr->Aggregate(measurement.second);
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(measurement.first);
auto prev = cumulative_hash_map_->Get(hash);
auto prev = cumulative_hash_map_->Get(measurement.first);
if (prev)
{
auto delta = prev->Diff(*aggr);
// store received value in cumulative map, and the diff in delta map (to pass it to temporal
// storage)
cumulative_hash_map_->Set(measurement.first, std::move(aggr), hash);
delta_hash_map_->Set(measurement.first, std::move(delta), hash);
cumulative_hash_map_->Set(measurement.first, std::move(aggr));
delta_hash_map_->Set(measurement.first, std::move(delta));
}
else
{
// store received value in cumulative and delta map.
cumulative_hash_map_->Set(
measurement.first,
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr),
hash);
delta_hash_map_->Set(measurement.first, std::move(aggr), hash);
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr));
delta_hash_map_->Set(measurement.first, std::move(aggr));
}
}
}
Expand Down
133 changes: 76 additions & 57 deletions sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "opentelemetry/sdk/common/attribute_utils.h"
#include "opentelemetry/sdk/common/attributemap_hash.h"
#include "opentelemetry/sdk/metrics/aggregation/aggregation.h"
#include "opentelemetry/sdk/metrics/state/filtered_ordered_attribute_map.h"
#include "opentelemetry/sdk/metrics/view/attributes_processor.h"
#include "opentelemetry/version.h"

Expand All @@ -29,9 +30,9 @@ using opentelemetry::sdk::common::OrderedAttributeMap;
constexpr size_t kAggregationCardinalityLimit = 2000;
const std::string kAttributesLimitOverflowKey = "otel.metrics.overflow";
const bool kAttributesLimitOverflowValue = true;
const size_t kOverflowAttributesHash = opentelemetry::sdk::common::GetHashForAttributeMap(
{{kAttributesLimitOverflowKey,
kAttributesLimitOverflowValue}}); // precalculated for optimization
const MetricAttributes kOverflowAttributes = {
{kAttributesLimitOverflowKey,
kAttributesLimitOverflowValue}}; // precalculated for optimization

class AttributeHashGenerator
{
Expand All @@ -42,18 +43,19 @@ class AttributeHashGenerator
}
};

class AttributesHashMap
template <typename CustomHash = MetricAttributesHash>
class AttributesHashMapWithCustomHash
{
public:
AttributesHashMap(size_t attributes_limit = kAggregationCardinalityLimit)
AttributesHashMapWithCustomHash(size_t attributes_limit = kAggregationCardinalityLimit)
: attributes_limit_(attributes_limit)
{}
Aggregation *Get(size_t hash) const
Aggregation *Get(const MetricAttributes &attributes) const
{
auto it = hash_map_.find(hash);
auto it = hash_map_.find(attributes);
if (it != hash_map_.end())
{
return it->second.second.get();
return it->second.get();
}
return nullptr;
}
Expand All @@ -62,7 +64,10 @@ class AttributesHashMap
* @return check if key is present in hash
*
*/
bool Has(size_t hash) const { return hash_map_.find(hash) != hash_map_.end(); }
bool Has(const MetricAttributes &attributes) const
{
return hash_map_.find(attributes) != hash_map_.end();
}

/**
* @return the pointer to value for given key if present.
Expand All @@ -71,108 +76,115 @@ class AttributesHashMap
*/
Aggregation *GetOrSetDefault(const opentelemetry::common::KeyValueIterable &attributes,
const AttributesProcessor *attributes_processor,
std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
size_t hash)
std::function<std::unique_ptr<Aggregation>()> aggregation_callback)
{
auto it = hash_map_.find(hash);
MetricAttributes attr{attributes, attributes_processor};

auto it = hash_map_.find(attr);
if (it != hash_map_.end())
{
return it->second.second.get();
return it->second.get();
}

if (IsOverflowAttributes())
{
return GetOrSetOveflowAttributes(aggregation_callback);
}

MetricAttributes attr{attributes, attributes_processor};

hash_map_[hash] = {attr, aggregation_callback()};
return hash_map_[hash].second.get();
auto result = hash_map_.emplace(std::move(attr), aggregation_callback());
return result.first->second.get();
}

Aggregation *GetOrSetDefault(std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
size_t hash)
Aggregation *GetOrSetDefault(const MetricAttributes &attributes,
std::function<std::unique_ptr<Aggregation>()> aggregation_callback)
{
auto it = hash_map_.find(hash);
auto it = hash_map_.find(attributes);
if (it != hash_map_.end())
{
return it->second.second.get();
return it->second.get();
}

if (IsOverflowAttributes())
{
return GetOrSetOveflowAttributes(aggregation_callback);
}

MetricAttributes attr{};
hash_map_[hash] = {attr, aggregation_callback()};
return hash_map_[hash].second.get();
hash_map_[attributes] = aggregation_callback();
return hash_map_[attributes].get();
}

Aggregation *GetOrSetDefault(const MetricAttributes &attributes,
std::function<std::unique_ptr<Aggregation>()> aggregation_callback,
size_t hash)
Aggregation *GetOrSetDefault(MetricAttributes &&attributes,
std::function<std::unique_ptr<Aggregation>()> aggregation_callback)
{
auto it = hash_map_.find(hash);
auto it = hash_map_.find(attributes);
if (it != hash_map_.end())
{
return it->second.second.get();
return it->second.get();
}

if (IsOverflowAttributes())
{
return GetOrSetOveflowAttributes(aggregation_callback);
}

MetricAttributes attr{attributes};

hash_map_[hash] = {attr, aggregation_callback()};
return hash_map_[hash].second.get();
auto result = hash_map_.emplace(std::move(attributes), aggregation_callback());
return result.first->second.get();
}

/**
* Set the value for given key, overwriting the value if already present
*/
void Set(const opentelemetry::common::KeyValueIterable &attributes,
const AttributesProcessor *attributes_processor,
std::unique_ptr<Aggregation> aggr,
size_t hash)
std::unique_ptr<Aggregation> aggr)
{
auto it = hash_map_.find(hash);
MetricAttributes attr{attributes, attributes_processor};

auto it = hash_map_.find(attr);
if (it != hash_map_.end())
{
it->second.second = std::move(aggr);
it->second = std::move(aggr);
}
else if (IsOverflowAttributes())
{
hash_map_[kOverflowAttributesHash] = {
MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}},
std::move(aggr)};
hash_map_[kOverflowAttributes] = std::move(aggr);
}
else
{
MetricAttributes attr{attributes, attributes_processor};
hash_map_[hash] = {attr, std::move(aggr)};
hash_map_[std::move(attr)] = std::move(aggr);
}
}

void Set(const MetricAttributes &attributes, std::unique_ptr<Aggregation> aggr, size_t hash)
void Set(const MetricAttributes &attributes, std::unique_ptr<Aggregation> aggr)
{
auto it = hash_map_.find(hash);
auto it = hash_map_.find(attributes);
if (it != hash_map_.end())
{
it->second.second = std::move(aggr);
it->second = std::move(aggr);
}
else if (IsOverflowAttributes())
{
hash_map_[kOverflowAttributesHash] = {
MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}},
std::move(aggr)};
hash_map_[kOverflowAttributes] = std::move(aggr);
}
else
{
hash_map_[hash] = {attributes, std::move(aggr)};
hash_map_[attributes] = std::move(aggr);
}
}

void Set(MetricAttributes &&attributes, std::unique_ptr<Aggregation> aggr)
{
auto it = hash_map_.find(attributes);
if (it != hash_map_.end())
{
it->second = std::move(aggr);
}
else if (IsOverflowAttributes())
{
hash_map_[kOverflowAttributes] = std::move(aggr);
}
else
{
hash_map_[std::move(attributes)] = std::move(aggr);
}
}

Expand All @@ -184,7 +196,7 @@ class AttributesHashMap
{
for (auto &kv : hash_map_)
{
if (!callback(kv.second.first, *(kv.second.second.get())))
if (!callback(kv.first, *(kv.second.get())))
{
return false; // callback is not prepared to consume data
}
Expand All @@ -197,8 +209,13 @@ class AttributesHashMap
*/
size_t Size() { return hash_map_.size(); }

#ifdef UNIT_TESTING
size_t BucketCount() { return hash_map_.bucket_count(); }
size_t BucketSize(size_t n) { return hash_map_.bucket_size(n); }
#endif

private:
std::unordered_map<size_t, std::pair<MetricAttributes, std::unique_ptr<Aggregation>>> hash_map_;
std::unordered_map<MetricAttributes, std::unique_ptr<Aggregation>, CustomHash> hash_map_;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we store MetricAttributes as a pointer? it's going to be a lot copying on rehashing...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean make the key as std::shared_ptr<MetricAttributes>, it may work with a more complicated custom hash. But I think MetricsAttributes should be created and moved into the std::unordered_map, so copying and rehashing should be in rare.

I remember the the exporting process does make a copy of MetricsAttributes, but there is an optimization for the common scenario. @lalitb any thoughts on this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we are already avoiding deep copy with std::move while inserting the MetricsAttributes, also during rehashing, the hashmp will move(not copy) the MetricAttributes objects to their new locations. In general, I do see options for optimization at other places, but we can visit it separate to the PR.

size_t attributes_limit_;

Aggregation *GetOrSetOveflowAttributes(
Expand All @@ -210,19 +227,21 @@ class AttributesHashMap

Aggregation *GetOrSetOveflowAttributes(std::unique_ptr<Aggregation> agg)
{
auto it = hash_map_.find(kOverflowAttributesHash);
auto it = hash_map_.find(kOverflowAttributes);
if (it != hash_map_.end())
{
return it->second.second.get();
return it->second.get();
}

MetricAttributes attr{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}};
hash_map_[kOverflowAttributesHash] = {attr, std::move(agg)};
return hash_map_[kOverflowAttributesHash].second.get();
auto result = hash_map_.emplace(kOverflowAttributes, std::move(agg));
return result.first->second.get();
}

bool IsOverflowAttributes() const { return (hash_map_.size() + 1 >= attributes_limit_); }
};

using AttributesHashMap = AttributesHashMapWithCustomHash<>;

} // namespace metrics

} // namespace sdk
Expand Down
Loading
Loading