-
Notifications
You must be signed in to change notification settings - Fork 501
[METRICS SDK] Fix hash collision in MetricAttributes #3322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e98e336
adf6126
3196544
938b241
274ed36
7f2aba2
1cad020
59fa221
5ebc039
e88c7ae
1d0ebea
3dcb0ad
6b899ef
d836c10
44a467a
a7698d0
72ef04e
d9d0e82
f17399c
809a70a
2c017e3
a5735f3
95a9f7f
eef46df
7cc78be
4a8b77f
adcd477
47409a5
ce7dc77
7bb5652
8d5fe18
5e2778c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -125,6 +125,7 @@ class ReservoirCell | |
| res.erase(it); | ||
| } | ||
| } | ||
| res.UpdateHash(); | ||
| return res; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| #include "opentelemetry/sdk/common/attribute_utils.h" | ||
| #include "opentelemetry/sdk/common/attributemap_hash.h" | ||
| #include "opentelemetry/sdk/metrics/aggregation/aggregation.h" | ||
| #include "opentelemetry/sdk/metrics/state/filtered_ordered_attribute_map.h" | ||
| #include "opentelemetry/sdk/metrics/view/attributes_processor.h" | ||
| #include "opentelemetry/version.h" | ||
|
|
||
|
|
@@ -29,9 +30,9 @@ using opentelemetry::sdk::common::OrderedAttributeMap; | |
| constexpr size_t kAggregationCardinalityLimit = 2000; | ||
| const std::string kAttributesLimitOverflowKey = "otel.metrics.overflow"; | ||
| const bool kAttributesLimitOverflowValue = true; | ||
| const size_t kOverflowAttributesHash = opentelemetry::sdk::common::GetHashForAttributeMap( | ||
| {{kAttributesLimitOverflowKey, | ||
| kAttributesLimitOverflowValue}}); // precalculated for optimization | ||
| const MetricAttributes kOverflowAttributes = { | ||
| {kAttributesLimitOverflowKey, | ||
| kAttributesLimitOverflowValue}}; // precalculated for optimization | ||
|
|
||
| class AttributeHashGenerator | ||
| { | ||
|
|
@@ -42,18 +43,19 @@ class AttributeHashGenerator | |
| } | ||
| }; | ||
|
|
||
| class AttributesHashMap | ||
| template <typename CustomHash = MetricAttributesHash> | ||
| class AttributesHashMapWithCustomHash | ||
| { | ||
| public: | ||
| AttributesHashMap(size_t attributes_limit = kAggregationCardinalityLimit) | ||
| AttributesHashMapWithCustomHash(size_t attributes_limit = kAggregationCardinalityLimit) | ||
| : attributes_limit_(attributes_limit) | ||
| {} | ||
| Aggregation *Get(size_t hash) const | ||
| Aggregation *Get(const MetricAttributes &attributes) const | ||
| { | ||
| auto it = hash_map_.find(hash); | ||
| auto it = hash_map_.find(attributes); | ||
| if (it != hash_map_.end()) | ||
| { | ||
| return it->second.second.get(); | ||
| return it->second.get(); | ||
| } | ||
| return nullptr; | ||
| } | ||
|
|
@@ -62,7 +64,10 @@ class AttributesHashMap | |
| * @return check if key is present in hash | ||
| * | ||
| */ | ||
| bool Has(size_t hash) const { return hash_map_.find(hash) != hash_map_.end(); } | ||
| bool Has(const MetricAttributes &attributes) const | ||
| { | ||
| return hash_map_.find(attributes) != hash_map_.end(); | ||
| } | ||
|
|
||
| /** | ||
| * @return the pointer to value for given key if present. | ||
|
|
@@ -71,108 +76,103 @@ class AttributesHashMap | |
| */ | ||
| Aggregation *GetOrSetDefault(const opentelemetry::common::KeyValueIterable &attributes, | ||
| const AttributesProcessor *attributes_processor, | ||
| std::function<std::unique_ptr<Aggregation>()> aggregation_callback, | ||
| size_t hash) | ||
| std::function<std::unique_ptr<Aggregation>()> aggregation_callback) | ||
| { | ||
| auto it = hash_map_.find(hash); | ||
| // TODO: avoid constructing MetricAttributes from KeyValueIterable for | ||
ThomsonTan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // hash_map_.find which is a heavy operation | ||
| MetricAttributes attr{attributes, attributes_processor}; | ||
ThomsonTan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| auto it = hash_map_.find(attr); | ||
| if (it != hash_map_.end()) | ||
| { | ||
| return it->second.second.get(); | ||
| return it->second.get(); | ||
| } | ||
|
|
||
| if (IsOverflowAttributes()) | ||
| { | ||
| return GetOrSetOveflowAttributes(aggregation_callback); | ||
| } | ||
|
|
||
| MetricAttributes attr{attributes, attributes_processor}; | ||
|
|
||
| hash_map_[hash] = {attr, aggregation_callback()}; | ||
| return hash_map_[hash].second.get(); | ||
| auto result = hash_map_.emplace(std::move(attr), aggregation_callback()); | ||
| return result.first->second.get(); | ||
| } | ||
|
|
||
| Aggregation *GetOrSetDefault(std::function<std::unique_ptr<Aggregation>()> aggregation_callback, | ||
| size_t hash) | ||
| Aggregation *GetOrSetDefault(const MetricAttributes &attributes, | ||
| std::function<std::unique_ptr<Aggregation>()> aggregation_callback) | ||
| { | ||
| auto it = hash_map_.find(hash); | ||
| auto it = hash_map_.find(attributes); | ||
ThomsonTan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (it != hash_map_.end()) | ||
| { | ||
| return it->second.second.get(); | ||
| return it->second.get(); | ||
| } | ||
|
|
||
| if (IsOverflowAttributes()) | ||
| { | ||
| return GetOrSetOveflowAttributes(aggregation_callback); | ||
| } | ||
|
|
||
| MetricAttributes attr{}; | ||
| hash_map_[hash] = {attr, aggregation_callback()}; | ||
| return hash_map_[hash].second.get(); | ||
| hash_map_[attributes] = aggregation_callback(); | ||
| return hash_map_[attributes].get(); | ||
| } | ||
|
|
||
| Aggregation *GetOrSetDefault(const MetricAttributes &attributes, | ||
| std::function<std::unique_ptr<Aggregation>()> aggregation_callback, | ||
| size_t hash) | ||
| Aggregation *GetOrSetDefault(MetricAttributes &&attributes, | ||
| std::function<std::unique_ptr<Aggregation>()> aggregation_callback) | ||
| { | ||
| auto it = hash_map_.find(hash); | ||
| auto it = hash_map_.find(attributes); | ||
| if (it != hash_map_.end()) | ||
| { | ||
| return it->second.second.get(); | ||
| return it->second.get(); | ||
| } | ||
|
|
||
| if (IsOverflowAttributes()) | ||
| { | ||
| return GetOrSetOveflowAttributes(aggregation_callback); | ||
| } | ||
|
|
||
| MetricAttributes attr{attributes}; | ||
|
|
||
| hash_map_[hash] = {attr, aggregation_callback()}; | ||
| return hash_map_[hash].second.get(); | ||
| auto result = hash_map_.emplace(std::move(attributes), aggregation_callback()); | ||
| return result.first->second.get(); | ||
| } | ||
|
|
||
| /** | ||
| * Set the value for given key, overwriting the value if already present | ||
| */ | ||
| void Set(const opentelemetry::common::KeyValueIterable &attributes, | ||
| const AttributesProcessor *attributes_processor, | ||
| std::unique_ptr<Aggregation> aggr, | ||
| size_t hash) | ||
| std::unique_ptr<Aggregation> aggr) | ||
| { | ||
| Set(MetricAttributes{attributes, attributes_processor}, std::move(aggr)); | ||
| } | ||
|
|
||
| void Set(const MetricAttributes &attributes, std::unique_ptr<Aggregation> aggr) | ||
| { | ||
| auto it = hash_map_.find(hash); | ||
| auto it = hash_map_.find(attributes); | ||
lalitb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (it != hash_map_.end()) | ||
| { | ||
| it->second.second = std::move(aggr); | ||
| it->second = std::move(aggr); | ||
| } | ||
| else if (IsOverflowAttributes()) | ||
| { | ||
| hash_map_[kOverflowAttributesHash] = { | ||
| MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}, | ||
| std::move(aggr)}; | ||
| hash_map_[kOverflowAttributes] = std::move(aggr); | ||
| } | ||
| else | ||
| { | ||
| MetricAttributes attr{attributes, attributes_processor}; | ||
| hash_map_[hash] = {attr, std::move(aggr)}; | ||
| hash_map_[attributes] = std::move(aggr); | ||
| } | ||
| } | ||
|
|
||
| void Set(const MetricAttributes &attributes, std::unique_ptr<Aggregation> aggr, size_t hash) | ||
| void Set(MetricAttributes &&attributes, std::unique_ptr<Aggregation> aggr) | ||
| { | ||
| auto it = hash_map_.find(hash); | ||
| auto it = hash_map_.find(attributes); | ||
| if (it != hash_map_.end()) | ||
| { | ||
| it->second.second = std::move(aggr); | ||
| it->second = std::move(aggr); | ||
| } | ||
| else if (IsOverflowAttributes()) | ||
| { | ||
| hash_map_[kOverflowAttributesHash] = { | ||
| MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}, | ||
| std::move(aggr)}; | ||
| hash_map_[kOverflowAttributes] = std::move(aggr); | ||
| } | ||
| else | ||
| { | ||
| hash_map_[hash] = {attributes, std::move(aggr)}; | ||
| hash_map_[std::move(attributes)] = std::move(aggr); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -184,7 +184,7 @@ class AttributesHashMap | |
| { | ||
| for (auto &kv : hash_map_) | ||
| { | ||
| if (!callback(kv.second.first, *(kv.second.second.get()))) | ||
| if (!callback(kv.first, *(kv.second.get()))) | ||
| { | ||
| return false; // callback is not prepared to consume data | ||
| } | ||
|
|
@@ -197,8 +197,13 @@ class AttributesHashMap | |
| */ | ||
| size_t Size() { return hash_map_.size(); } | ||
|
|
||
| #ifdef UNIT_TESTING | ||
| size_t BucketCount() { return hash_map_.bucket_count(); } | ||
| size_t BucketSize(size_t n) { return hash_map_.bucket_size(n); } | ||
| #endif | ||
|
|
||
| private: | ||
| std::unordered_map<size_t, std::pair<MetricAttributes, std::unique_ptr<Aggregation>>> hash_map_; | ||
| std::unordered_map<MetricAttributes, std::unique_ptr<Aggregation>, CustomHash> hash_map_; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we store MetricAttributes as a pointer? it's going to be a lot copying on rehashing... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean make the key as I remember the the exporting process does make a copy of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe we are already avoiding deep copy with std::move while inserting the MetricsAttributes, also during rehashing, the hashmp will move(not copy) the MetricAttributes objects to their new locations. In general, I do see options for optimization at other places, but we can visit it separate to the PR. |
||
| size_t attributes_limit_; | ||
|
|
||
| Aggregation *GetOrSetOveflowAttributes( | ||
|
|
@@ -210,19 +215,21 @@ class AttributesHashMap | |
|
|
||
| Aggregation *GetOrSetOveflowAttributes(std::unique_ptr<Aggregation> agg) | ||
| { | ||
| auto it = hash_map_.find(kOverflowAttributesHash); | ||
| auto it = hash_map_.find(kOverflowAttributes); | ||
| if (it != hash_map_.end()) | ||
| { | ||
| return it->second.second.get(); | ||
| return it->second.get(); | ||
| } | ||
|
|
||
| MetricAttributes attr{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}; | ||
| hash_map_[kOverflowAttributesHash] = {attr, std::move(agg)}; | ||
| return hash_map_[kOverflowAttributesHash].second.get(); | ||
| auto result = hash_map_.emplace(kOverflowAttributes, std::move(agg)); | ||
| return result.first->second.get(); | ||
| } | ||
|
|
||
| bool IsOverflowAttributes() const { return (hash_map_.size() + 1 >= attributes_limit_); } | ||
| }; | ||
|
|
||
| using AttributesHashMap = AttributesHashMapWithCustomHash<>; | ||
|
|
||
| } // namespace metrics | ||
|
|
||
| } // namespace sdk | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not expected to calculate hash on
KeyValueIterableby traversing it, because the order of the elements should not affect the hashing.MetricAttributesneed to be constructed from it before hash. So remove this function.