diff --git a/jstests/aggregation/sources/group/accumulator_percentile.js b/jstests/aggregation/sources/group/accumulator_percentile.js
new file mode 100644
index 0000000000000..dc4006fb10c1a
--- /dev/null
+++ b/jstests/aggregation/sources/group/accumulator_percentile.js
@@ -0,0 +1,63 @@
+/**
+ * Tests the $percentile accumulator: numerically equivalent group keys of different types
+ * (int, long, double) must be grouped together, and non-numeric values must be ignored.
+ */
+(function() {
+    "use strict";
+    const coll = db.coll;
+
+    coll.drop();
+
+    assert.writeOK(coll.insert({key: new NumberInt(24), value: new NumberInt(75)}));
+    assert.writeOK(coll.insert({key: new NumberLong(24), value: new NumberLong(100)}));
+    assert.writeOK(coll.insert({key: 24, value: 36}));
+
+    assert.writeOK(coll.insert({key: new NumberInt(42), value: new NumberInt(75)}));
+    assert.writeOK(coll.insert({key: new NumberLong(42), value: new NumberLong(100)}));
+    assert.writeOK(coll.insert({key: 42, value: 36}));
+
+    const result1 =
+        coll.aggregate({
+                $group: {
+                    _id: "$key",
+                    perc_result: {$percentile: {"value": "$value", "percentile": 0.20}}
+                }
+            })
+            .toArray();
+
+    // Each of the two groups holds the values {36, 75, 100}; the 0.20 quantile of that set
+    // interpolates to 39.9.
+    assert.eq(result1.length, 2, tojson(result1));
+
+    assert.eq(result1[0].perc_result, 39.900000000000006, tojson(result1));
+    assert.eq(result1[1].perc_result, 39.900000000000006, tojson(result1));
+    coll.drop();
+
+    assert.writeOK(coll.insert({temperature: 18, switch: 1}));
+    assert.writeOK(coll.insert({temperature: 10, switch: 0}));
+    assert.writeOK(coll.insert({temperature: 10, switch: 0}));
+    assert.writeOK(coll.insert({temperature: 10, switch: 0}));
+    assert.writeOK(coll.insert({temperature: 10, switch: 1}));
+    assert.writeOK(coll.insert({temperature: 20, switch: 1}));
+    assert.writeOK(coll.insert({temperature: 25, switch: 1}));
+    assert.writeOK(coll.insert({temperature: 30, switch: 1}));
+    assert.writeOK(coll.insert({temperature: 35, switch: 1}));
+
+    // Documents with switch == 0 are projected to null and must be ignored by $percentile.
+    const result2 =
+        db.coll
+            .aggregate([
+                {
+                    '$project': {
+                        'valid_temp': {
+                            '$cond':
+                                {if: {'$eq': ['$switch', 1]}, then: '$temperature', else: null}
+                        },
+                    }
+                },
+                {
+                    "$group": {
+                        _id: null,
+                        perc_result: {$percentile: {"value": "$valid_temp", "percentile": 0.70}}
+                    }
+                }
+            ])
+            .toArray();
+
+    assert.eq(result2[0]['perc_result'], 28.499999999999996, tojson(result2));
+}());
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index d8cd37fe5a3e6..89216aa17702b 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -160,7 +160,9 @@ env.Library(
         'accumulator_push.cpp',
         'accumulator_std_dev.cpp',
         'accumulator_sum.cpp',
-        'accumulator_merge_objects.cpp'
+        'accumulator_merge_objects.cpp',
+        'accumulator_percentile.cpp',
+        '$BUILD_DIR/third_party/folly/TDigest.cpp'
     ],
     LIBDEPS=[
         'document_value',
diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h
index 801c356020f2b..851d21ccca5f8 100644
--- a/src/mongo/db/pipeline/accumulator.h
+++ b/src/mongo/db/pipeline/accumulator.h
@@ -44,6 +44,7 @@
 #include "mongo/stdx/functional.h"
 #include "mongo/stdx/unordered_set.h"
 #include "mongo/util/summation.h"
+#include "third_party/folly/TDigest.h"
 
 namespace mongo {
 
@@ -303,6 +304,33 @@ class AccumulatorAvg final : public Accumulator {
     long long _count;
 };
 
+// The new '$percentile' accumulator: estimates a quantile of the input values
+// using a t-digest sketch.
+class AccumulatorPercentile final : public Accumulator {
+public:
+    explicit AccumulatorPercentile(const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+    void processInternal(const Value& input, bool merging) final;
+    Value getValue(bool toBeMerged) final;
+    const char* getOpName() const final;
+    void reset() final;
+
+    static boost::intrusive_ptr<Accumulator> create(
+        const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+private:
+    double percentile;
+    double digest_size = 0;
+    double chunk_size = 100;
+    mongo::TDigest digest;
+
+    // Values buffered until a full chunk is ready to be digested by the TDigest algorithm.
+    std::vector<double> values;
+
+    // Sorts the buffered values, merges them into the t-digest, and clears the buffer.
+    void _add_to_tdigest();
+
+    bool any_input = false;
+};
+
 class AccumulatorStdDev : public Accumulator {
 public:
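
The accumulator above buffers incoming values and folds them into the digest one chunk at a time, so the relatively expensive sort-and-compress step is amortized over `chunk_size` inputs. A minimal standalone sketch of that chunk-and-flush pattern, written against only the TDigest API added by this patch (the class name, sizes, and flushing helper are illustrative, not part of the patch):

    #include <algorithm>
    #include <vector>

    #include "third_party/folly/TDigest.h"

    // Buffers values and flushes them into the digest in sorted chunks,
    // mirroring AccumulatorPercentile::processInternal/_add_to_tdigest.
    class ChunkedDigester {
    public:
        explicit ChunkedDigester(size_t digestSize, size_t chunkSize)
            : _digest(digestSize), _chunkSize(chunkSize) {}

        void add(double v) {
            _buffer.push_back(v);
            if (_buffer.size() == _chunkSize) {
                _flush();
            }
        }

        double quantile(double q) {
            _flush();  // account for a partially filled final chunk
            return _digest.estimateQuantile(q);
        }

    private:
        void _flush() {
            if (_buffer.empty())
                return;
            std::sort(_buffer.begin(), _buffer.end());  // merge() expects sorted input
            _digest = _digest.merge(_buffer);
            _buffer.clear();
        }

        mongo::TDigest _digest;
        size_t _chunkSize;
        std::vector<double> _buffer;
    };

The same flush-before-read step appears in AccumulatorPercentile::getValue() below, which drains a partially filled final chunk before answering.
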
diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp
new file mode 100644
index 0000000000000..cb1155f66d52d
--- /dev/null
+++ b/src/mongo/db/pipeline/accumulator_percentile.cpp
@@ -0,0 +1,213 @@
+/*
+ * Copyright (c) 2011 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/accumulation_statement.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/value.h"
+#include "mongo/platform/decimal128.h"
+
+#include "third_party/folly/TDigest.h"
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+
+REGISTER_ACCUMULATOR(percentile, AccumulatorPercentile::create);
+REGISTER_EXPRESSION(percentile, ExpressionFromAccumulator<AccumulatorPercentile>::parse);
+
+const char* AccumulatorPercentile::getOpName() const {
+    return "$percentile";
+}
+
+namespace {
+
+const char sumName[] = "sum";
+const char countName[] = "count";
+const char maxName[] = "max";
+const char minName[] = "min";
+const char percentileName[] = "percentile";
+const char digestSizeName[] = "digest_size";
+const char centroidsName[] = "centroids";
+const char meanName[] = "mean";
+const char weightName[] = "weight";
+}  // namespace
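+
+/*
+ * When 'merging' is true, each input is a partial aggregate produced by
+ * getValue(true) on another node. Its shape (values illustrative) is:
+ *
+ *   {centroids: [{mean: 25.0, weight: 3.0}, ...], sum: 150.0, count: 6.0,
+ *    max: 35.0, min: 10.0, percentile: 0.7, digest_size: 1000.0}
+ *
+ * The centroid list is rebuilt into a TDigest and merged into the local one.
+ */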
+void AccumulatorPercentile::processInternal(const Value& input, bool merging) {
+
+    if (merging) {
+        verify(input.getType() == Object);
+
+        Value digest_centroids = input[centroidsName];
+        double digest_sum = input[sumName].getDouble();
+        double digest_count = input[countName].getDouble();
+        double digest_max = input[maxName].getDouble();
+        double digest_min = input[minName].getDouble();
+        // Assign the member (not a shadowing local), so that reset() and a later
+        // getValue(true) see the negotiated digest size.
+        digest_size = input[digestSizeName].getDouble();
+
+        if (any_input == false) {
+            digest = mongo::TDigest(digest_size);
+            _memUsageBytes += sizeof(mongo::TDigest::Centroid) * digest_size;
+            any_input = true;
+        }
+
+        std::vector<mongo::TDigest::Centroid> centroids;
+        for (const auto& centroid : digest_centroids.getArray()) {
+            centroids.push_back(mongo::TDigest::Centroid(centroid[meanName].getDouble(),
+                                                         centroid[weightName].getDouble()));
+        }
+
+        digest = digest.merge(
+            {mongo::TDigest(
+                 centroids, digest_sum, digest_count, digest_max, digest_min, digest_size),
+             digest});
+
+        this->percentile = input[percentileName].getDouble();
+        return;
+    }
+
+    // Determine 'digest_size' from the first input document, defaulting to 1000.
+    if (this->digest_size == 0) {
+        if (input.getDocument()["digest_size"].missing()) {
+            this->digest_size = 1000;
+        } else {
+            this->digest_size = input.getDocument()["digest_size"].getDouble();
+        }
+    }
+
+    uassert(51300,
+            "The 'percentile' should be present in the input document.",
+            !input.getDocument()["percentile"].missing());
+
+    uassert(51301,
+            "The 'value' should be present in the input document.",
+            !input.getDocument()["value"].missing());
+
+    this->percentile = input.getDocument()["percentile"].getDouble();
+
+    Value input_value = input.getDocument()["value"];
+
+    switch (input_value.getType()) {
+        case NumberDecimal:
+        case NumberLong:
+        case NumberInt:
+        case NumberDouble:
+            values.push_back(input_value.getDouble());
+            break;
+        default:
+            // Non-numeric values are ignored.
+            dassert(!input_value.numeric());
+            return;
+    }
+
+    if (any_input == false) {
+        digest = mongo::TDigest(digest_size);
+        any_input = true;
+
+        // Account for the memory used by the 'values' buffer...
+        _memUsageBytes += sizeof(double) * chunk_size;
+
+        // ...and by a digest of the requested size.
+        _memUsageBytes += sizeof(mongo::TDigest::Centroid) * digest_size;
+    }
+
+    if (values.size() == chunk_size) {
+        _add_to_tdigest();
+    }
+}
+
+intrusive_ptr<Accumulator> AccumulatorPercentile::create(
+    const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+    return new AccumulatorPercentile(expCtx);
+}
+
+Value AccumulatorPercentile::getValue(bool toBeMerged) {
+
+    // Flush any values still buffered in a partially filled chunk.
+    if (!values.empty()) {
+        _add_to_tdigest();
+    }
+
+    if (toBeMerged) {
+        std::vector<Document> centroids;
+
+        for (const auto& centroid : digest.getCentroids()) {
+            centroids.push_back(
+                Document{{"mean", centroid.mean()}, {"weight", centroid.weight()}});
+        }
+
+        return Value(Document{{"centroids", Value(centroids)},
+                              {"sum", digest.sum()},
+                              {"count", digest.count()},
+                              {"max", digest.max()},
+                              {"min", digest.min()},
+                              {"percentile", percentile},
+                              {"digest_size", digest_size}});
+    }
+
+    if (digest.empty()) {
+        return Value(BSONNULL);
+    }
+
+    return Value(digest.estimateQuantile(percentile));
+}
+
+AccumulatorPercentile::AccumulatorPercentile(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+    : Accumulator(expCtx) {
+    _memUsageBytes = sizeof(*this);
+}
+
+void AccumulatorPercentile::_add_to_tdigest() {
+    // Sort the buffered chunk, merge it into the digest, and clear the buffer.
+    std::sort(values.begin(), values.end());
+    digest = digest.merge(values);
+    values.clear();
+}
+
+void AccumulatorPercentile::reset() {
+    digest_size = 0;
+    values.clear();
+    digest = mongo::TDigest(digest_size);
+    any_input = false;
+    _memUsageBytes = sizeof(*this);
+}
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp
index af727e21df7ea..3d037d26903ef 100644
--- a/src/mongo/db/pipeline/accumulator_test.cpp
+++ b/src/mongo/db/pipeline/accumulator_test.cpp
@@ -99,6 +99,103 @@ static void assertExpectedResults(
     }
 }
 
+TEST(Accumulators, Percentile) {
+    intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+    assertExpectedResults(
+        "$percentile",
+        expCtx,
+        {
+            {{}, Value(BSONNULL)},
+
+            {{
+                 Value(Document(
+                     {{"percentile", 0.20}, {"digest_size", 110}, {"value", Value(BSONNULL)}})),
+             },
+             Value(BSONNULL)},
+
+            {{
+                 Value(Document({{"percentile", 0.20}, {"digest_size", 120}, {"value", 10}})),
+                 Value(Document({{"percentile", 0.20}, {"digest_size", 120}, {"value", 20}})),
+                 Value(Document({{"percentile", 0.20}, {"digest_size", 120}, {"value", 30}})),
+                 Value(Document(
+                     {{"percentile", 0.20}, {"digest_size", 120}, {"value", Value(BSONNULL)}})),
+             },
+             Value(11.00)},
+
+            {{
+                 Value(Document({{"percentile", 0.20}, {"digest_size", 130}, {"value", 10}})),
+                 Value(Document({{"percentile", 0.20}, {"digest_size", 130}, {"value", 20}})),
+                 Value(Document({{"percentile", 0.20}, {"digest_size", 130}, {"value", 30}})),
+                 Value(Document({{"percentile", 0.20}, {"digest_size", 130}, {"value", 40}})),
+             },
+             Value(13.00)},
+
+            {{
+                 Value(Document({{"percentile", 0.50}, {"digest_size", 140}, {"value", 10}})),
+                 Value(Document({{"percentile", 0.50}, {"digest_size", 140}, {"value", 20}})),
+                 Value(Document({{"percentile", 0.50}, {"digest_size", 140}, {"value", 30}})),
+                 Value(Document({{"percentile", 0.50}, {"digest_size", 140}, {"value", 40}})),
+                 Value(Document({{"percentile", 0.50}, {"digest_size", 140}, {"value", 42}})),
+                 Value(Document({{"percentile", 0.50}, {"digest_size", 140}, {"value", 43}})),
+                 Value(Document({{"percentile", 0.50}, {"digest_size", 140}, {"value", 44}})),
+                 Value(Document({{"percentile", 0.50}, {"digest_size", 140}, {"value", 45}})),
+             },
+             Value(41.25)},
+
+            {{
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 10}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 20}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 30}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 40}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 50}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 60}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 70}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 80}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 90}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 91}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 92}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 93}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 94}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 95}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 96}})),
+                 Value(Document({{"percentile", 0.05}, {"digest_size", 300}, {"value", 97}})),
+             },
+             Value(13.00)},
+
+            {{
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 10}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 20}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 30}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 40}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 50}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 60}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 70}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 80}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 90}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 91}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 92}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 93}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 94}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 95}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 96}})),
+                 Value(Document({{"percentile", 0.5}, {"digest_size", 300}, {"value", 97}})),
+             },
+             Value(87.25)},
+
+            {{
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 10}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 20}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 30}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 40}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 50}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 60}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 70}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 80}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 90}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 91}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 92}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 93}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 94}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 95}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 96}})),
+                 Value(Document({{"percentile", 0.95}, {"digest_size", 300}, {"value", 97}})),
+             },
+             Value(96.7)},
+
+        });
+}
+
 TEST(Accumulators, Avg) {
     intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
     assertExpectedResults(
diff --git a/src/third_party/folly/TDigest.cpp b/src/third_party/folly/TDigest.cpp
new file mode 100644
index 0000000000000..da25790b506a5
--- /dev/null
+++ b/src/third_party/folly/TDigest.cpp
@@ -0,0 +1,356 @@
+/*
+ * Copyright 2012-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TDigest.h"
+
+#include <algorithm>
+#include <cmath>
+#include <limits>
+#include <vector>
+
+namespace mongo {
+
+/*
+ * A good biased scaling function has the following properties:
+ * - The value of the function k(0, delta) = 0, and k(1, delta) = delta.
+ *   This is a requirement for any t-digest function.
+ * - The limit of the derivative of the function dk/dq at 0 is inf, and at
+ *   1 is inf. This provides bias to improve accuracy at the tails.
+ * - For any q <= 0.5, dk/dq(q) = dk/dq(1-q). This ensures that the accuracy
+ *   of upper and lower quantiles are equivalent.
+ *
+ * The scaling function used here is...
+ * k(q, d) = (IF q >= 0.5, d - d * sqrt(2 - 2q) / 2, d * sqrt(2q) / 2)
+ *
+ * k(0, d) = 0
+ * k(1, d) = d
+ *
+ * dk/dq = (IF q >= 0.5, d / sqrt(2-2q), d / sqrt(2q))
+ * limit q->1 dk/dq = inf
+ * limit q->0 dk/dq = inf
+ *
+ * When plotted, the derivative function is symmetric, centered at q=0.5.
+ *
+ * Note that FMA has been tested here, but benchmarks have not shown it to be a
+ * performance improvement.
+ */
+
+/*
+ * q_to_k is unused but left here as a comment for completeness.
+ * double q_to_k(double q, double d) {
+ *   if (q >= 0.5) {
+ *     return d - d * std::sqrt(0.5 - 0.5 * q);
+ *   }
+ *   return d * std::sqrt(0.5 * q);
+ * }
+ */
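+
+/*
+ * k_to_q below is the inverse of k(q, d) above. Solving the q >= 0.5 branch,
+ * k = d - d * sqrt(0.5 - 0.5 * q), for q gives q = 1 - 2 * (1 - k/d)^2, and
+ * solving the other branch, k = d * sqrt(0.5 * q), gives q = 2 * (k/d)^2.
+ * These are exactly the two branches computed below; the branch condition
+ * k/d >= 0.5 corresponds to q >= 0.5 because k(0.5, d) = d/2.
+ */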
+static double k_to_q(double k, double d) {
+  double k_div_d = k / d;
+  if (k_div_d >= 0.5) {
+    double base = 1 - k_div_d;
+    return 1 - 2 * base * base;
+  } else {
+    return 2 * k_div_d * k_div_d;
+  }
+}
+
+static double clamp(double v, double lo, double hi) {
+  if (v > hi) {
+    return hi;
+  } else if (v < lo) {
+    return lo;
+  }
+  return v;
+}
+
+TDigest::TDigest(
+    std::vector<Centroid> centroids,
+    double sum,
+    double count,
+    double max_val,
+    double min_val,
+    size_t maxSize)
+    : maxSize_(maxSize),
+      sum_(sum),
+      count_(count),
+      max_(max_val),
+      min_(min_val) {
+  if (centroids.size() <= maxSize_) {
+    centroids_ = std::move(centroids);
+  } else {
+    // The number of centroids is greater than maxSize, so they must be
+    // compressed. When merging, the resulting digest takes the maxSize of the
+    // first digest.
+    auto sz = centroids.size();
+    std::vector<TDigest> digests{{
+        TDigest(maxSize_),
+        TDigest(std::move(centroids), sum_, count_, max_, min_, sz),
+    }};
+    *this = this->merge(digests);
+  }
+}
+
+TDigest TDigest::merge(const std::vector<double>& sortedValues) const {
+  if (sortedValues.empty()) {
+    return *this;
+  }
+
+  TDigest result(maxSize_);
+
+  result.count_ = count_ + sortedValues.size();
+
+  double maybeMin = *sortedValues.begin();
+  double maybeMax = *(sortedValues.end() - 1);
+  if (count_ > 0) {
+    // We know that min_ and max_ are numbers.
+    result.min_ = std::min(min_, maybeMin);
+    result.max_ = std::max(max_, maybeMax);
+  } else {
+    // We know that min_ and max_ are NaN.
+    result.min_ = maybeMin;
+    result.max_ = maybeMax;
+  }
+
+  std::vector<Centroid> compressed;
+  compressed.reserve(maxSize_);
+
+  double k_limit = 1;
+  double q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_;
+
+  auto it_centroids = centroids_.begin();
+  auto it_sortedValues = sortedValues.begin();
+
+  Centroid cur;
+  if (it_centroids != centroids_.end() &&
+      it_centroids->mean() < *it_sortedValues) {
+    cur = *it_centroids++;
+  } else {
+    cur = Centroid(*it_sortedValues++, 1.0);
+  }
+
+  double weightSoFar = cur.weight();
+
+  // Keep track of sums along the way to reduce expensive floating point math.
+  double sumsToMerge = 0;
+  double weightsToMerge = 0;
+
+  while (it_centroids != centroids_.end() ||
+         it_sortedValues != sortedValues.end()) {
+    Centroid next;
+
+    if (it_centroids != centroids_.end() &&
+        (it_sortedValues == sortedValues.end() ||
+         it_centroids->mean() < *it_sortedValues)) {
+      next = *it_centroids++;
+    } else {
+      next = Centroid(*it_sortedValues++, 1.0);
+    }
+
+    double nextSum = next.mean() * next.weight();
+    weightSoFar += next.weight();
+
+    if (weightSoFar <= q_limit_times_count) {
+      sumsToMerge += nextSum;
+      weightsToMerge += next.weight();
+    } else {
+      result.sum_ += cur.add(sumsToMerge, weightsToMerge);
+      sumsToMerge = 0;
+      weightsToMerge = 0;
+      compressed.push_back(cur);
+      q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_;
+      cur = next;
+    }
+  }
+  result.sum_ += cur.add(sumsToMerge, weightsToMerge);
+  compressed.push_back(cur);
+  compressed.shrink_to_fit();
+
+  // Deal with floating point precision.
+  std::sort(compressed.begin(), compressed.end());
+
+  result.centroids_ = std::move(compressed);
+  return result;
+}
+
+TDigest TDigest::merge(const std::vector<TDigest>& digests) {
+  size_t nCentroids = 0;
+  for (auto it = digests.begin(); it != digests.end(); it++) {
+    nCentroids += it->centroids_.size();
+  }
+
+  if (nCentroids == 0) {
+    return TDigest();
+  }
+
+  std::vector<Centroid> centroids;
+  centroids.reserve(nCentroids);
+
+  std::vector<std::vector<Centroid>::iterator> starts;
+  starts.reserve(digests.size());
+
+  double count = 0;
+
+  // We can safely use these limits to avoid isnan checks below because we know
+  // nCentroids > 0, so at least one TDigest has a min and max.
+  double min = std::numeric_limits<double>::infinity();
+  double max = -std::numeric_limits<double>::infinity();
+
+  for (auto it = digests.begin(); it != digests.end(); it++) {
+    starts.push_back(centroids.end());
+    double curCount = it->count();
+    if (curCount > 0) {
+      // DCHECK(!std::isnan(it->min_));
+      // DCHECK(!std::isnan(it->max_));
+      min = std::min(min, it->min_);
+      max = std::max(max, it->max_);
+      count += curCount;
+      for (const auto& centroid : it->centroids_) {
+        centroids.push_back(centroid);
+      }
+    }
+  }
+
+  for (size_t digestsPerBlock = 1; digestsPerBlock < starts.size();
+       digestsPerBlock *= 2) {
+    // Each sorted block is digestsPerBlock digests big. For each step, try to
+    // merge two blocks together.
+    for (size_t i = 0; i < starts.size(); i += (digestsPerBlock * 2)) {
+      // It is possible that this block is incomplete (less than
+      // digestsPerBlock big). In that case, the rest of the block is already
+      // sorted; leave it alone.
+      if (i + digestsPerBlock < starts.size()) {
+        auto first = starts[i];
+        auto middle = starts[i + digestsPerBlock];
+
+        // It is possible that the next block is incomplete (less than
+        // digestsPerBlock big). In that case, merge to the end. Otherwise,
+        // merge to the end of that block.
+        std::vector<Centroid>::iterator last =
+            (i + (digestsPerBlock * 2) < starts.size())
+            ? *(starts.begin() + i + 2 * digestsPerBlock)
+            : centroids.end();
+        std::inplace_merge(first, middle, last);
+      }
+    }
+  }
+
+  // DCHECK(std::is_sorted(centroids.begin(), centroids.end()));
+
+  size_t maxSize = digests.begin()->maxSize_;
+  TDigest result(maxSize);
+
+  std::vector<Centroid> compressed;
+  compressed.reserve(maxSize);
+
+  double k_limit = 1;
+  double q_limit_times_count = k_to_q(k_limit++, maxSize) * count;
+
+  Centroid cur = centroids.front();
+  double weightSoFar = cur.weight();
+  double sumsToMerge = 0;
+  double weightsToMerge = 0;
+  for (auto it = centroids.begin() + 1; it != centroids.end(); ++it) {
+    weightSoFar += it->weight();
+    if (weightSoFar <= q_limit_times_count) {
+      sumsToMerge += it->mean() * it->weight();
+      weightsToMerge += it->weight();
+    } else {
+      result.sum_ += cur.add(sumsToMerge, weightsToMerge);
+      sumsToMerge = 0;
+      weightsToMerge = 0;
+      compressed.push_back(cur);
+      q_limit_times_count = k_to_q(k_limit++, maxSize) * count;
+      cur = *it;
+    }
+  }
+  result.sum_ += cur.add(sumsToMerge, weightsToMerge);
+  compressed.push_back(cur);
+  compressed.shrink_to_fit();
+
+  // Deal with floating point precision.
+  std::sort(compressed.begin(), compressed.end());
+
+  result.count_ = count;
+  result.min_ = min;
+  result.max_ = max;
+  result.centroids_ = std::move(compressed);
+  return result;
+}
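+
+/*
+ * Worked example (illustrative): for unit-weight centroids at {10, 20, 30}
+ * and q = 0.2, rank = 0.6 lands in the first centroid (pos = 0, t = 0),
+ * delta = 20 - 10 = 10, so the estimate is 10 + (0.6 / 1 - 0.5) * 10 = 11,
+ * matching the {10, 20, 30} p = 0.20 case in accumulator_test.cpp.
+ */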
+double TDigest::estimateQuantile(double q) const {
+  if (centroids_.empty()) {
+    return 0.0;
+  }
+  double rank = q * count_;
+
+  size_t pos;
+  double t;
+  if (q > 0.5) {
+    if (q >= 1.0) {
+      return max_;
+    }
+    pos = 0;
+    t = count_;
+    for (auto rit = centroids_.rbegin(); rit != centroids_.rend(); ++rit) {
+      t -= rit->weight();
+      if (rank >= t) {
+        pos = std::distance(rit, centroids_.rend()) - 1;
+        break;
+      }
+    }
+  } else {
+    if (q <= 0.0) {
+      return min_;
+    }
+    pos = centroids_.size() - 1;
+    t = 0;
+    for (auto it = centroids_.begin(); it != centroids_.end(); ++it) {
+      if (rank < t + it->weight()) {
+        pos = std::distance(centroids_.begin(), it);
+        break;
+      }
+      t += it->weight();
+    }
+  }
+
+  double delta = 0;
+  double min = min_;
+  double max = max_;
+  if (centroids_.size() > 1) {
+    if (pos == 0) {
+      delta = centroids_[pos + 1].mean() - centroids_[pos].mean();
+      max = centroids_[pos + 1].mean();
+    } else if (pos == centroids_.size() - 1) {
+      delta = centroids_[pos].mean() - centroids_[pos - 1].mean();
+      min = centroids_[pos - 1].mean();
+    } else {
+      delta = (centroids_[pos + 1].mean() - centroids_[pos - 1].mean()) / 2;
+      min = centroids_[pos - 1].mean();
+      max = centroids_[pos + 1].mean();
+    }
+  }
+  auto value = centroids_[pos].mean() +
+      ((rank - t) / centroids_[pos].weight() - 0.5) * delta;
+  return clamp(value, min, max);
+}
+
+double TDigest::Centroid::add(double sum, double weight) {
+  sum += (mean_ * weight_);
+  weight_ += weight;
+  mean_ = sum / weight_;
+  return sum;
+}
+
+}  // namespace mongo (originally folly; merged into the existing mongo namespace)
diff --git a/src/third_party/folly/TDigest.h b/src/third_party/folly/TDigest.h
new file mode 100644
index 0000000000000..b4baaa40a7c9d
--- /dev/null
+++ b/src/third_party/folly/TDigest.h
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2012-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cmath>
+#include <vector>
+
+namespace mongo {
+
+/*
+ * TDigests are a biased quantile estimator designed to estimate the values of
+ * the quantiles of streaming data with high accuracy and low memory,
+ * particularly for quantiles at the tails (p0.1, p1, p99, p99.9). See
+ * https://github.com/tdunning/t-digest/blob/master/docs/t-digest-paper/histo.pdf
+ * for an explanation of what the purpose of TDigests is, and how they work.
+ *
+ * There is a notable difference between the implementation here and the
+ * implementation in the paper. In the paper, the recommended scaling function
+ * for bucketing centroids is an arcsin function. The arcsin function provides
+ * high accuracy for low memory, but comes at a relatively high compute cost.
+ * A good scaling function has the following properties:
+ * - The value of the function k(0, delta) = 0, and k(1, delta) = delta.
+ *   This is a requirement for any t-digest function.
+ * - The limit of the derivative of the function dk/dq at 0 is inf, and at
+ *   1 is inf. This provides bias to improve accuracy at the tails.
+ * - For any q <= 0.5, dk/dq(q) = dk/dq(1-q). This ensures that the accuracy
+ *   of upper and lower quantiles are equivalent.
+ * As such, TDigest uses a sqrt function with these properties, which is faster
+ * than arcsin. There is a small, but relatively negligible impact to accuracy
+ * at the tail. In empirical tests, accuracy of the sqrt approach has been
+ * adequate.
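+ *
+ * An illustrative sketch of typical usage, using only the API declared below:
+ *
+ *   TDigest digest(100);                          // sketch with <= 100 centroids
+ *   std::vector<double> chunk = {1.0, 2.5, 9.0};  // must already be sorted
+ *   digest = digest.merge(chunk);
+ *   double p99 = digest.estimateQuantile(0.99);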
+ */
+class TDigest {
+ public:
+  class Centroid {
+   public:
+    explicit Centroid(double mean = 0.0, double weight = 1.0)
+        : mean_(mean), weight_(weight) {
+      // DCHECK_GT(weight, 0);
+    }
+
+    inline double mean() const {
+      return mean_;
+    }
+
+    inline double weight() const {
+      return weight_;
+    }
+
+    /*
+     * Adds the sum/weight to this centroid, and returns the new sum.
+     */
+    inline double add(double sum, double weight);
+
+    inline bool operator<(const Centroid& other) const {
+      return mean() < other.mean();
+    }
+
+   private:
+    double mean_;
+    double weight_;
+  };
+
+  explicit TDigest(size_t maxSize = 100)
+      : maxSize_(maxSize), sum_(0.0), count_(0.0), max_(NAN), min_(NAN) {}
+
+  explicit TDigest(
+      std::vector<Centroid> centroids,
+      double sum,
+      double count,
+      double max_val,
+      double min_val,
+      size_t maxSize = 100);
+
+  /*
+   * Returns a new TDigest constructed with values merged from the current
+   * digest and the given sortedValues.
+   */
+  TDigest merge(const std::vector<double>& sortedValues) const;
+
+  /*
+   * Returns a new TDigest constructed with values merged from the given
+   * digests.
+   */
+  static TDigest merge(const std::vector<TDigest>& digests);
+
+  /*
+   * Estimates the value of the given quantile.
+   */
+  double estimateQuantile(double q) const;
+
+  double mean() const {
+    return count_ ? sum_ / count_ : 0;
+  }
+
+  double sum() const {
+    return sum_;
+  }
+
+  double count() const {
+    return count_;
+  }
+
+  double min() const {
+    return min_;
+  }
+
+  double max() const {
+    return max_;
+  }
+
+  bool empty() const {
+    return centroids_.empty();
+  }
+
+  const std::vector<Centroid>& getCentroids() const {
+    return centroids_;
+  }
+
+  size_t maxSize() const {
+    return maxSize_;
+  }
+
+ private:
+  std::vector<Centroid> centroids_;
+  size_t maxSize_;
+  double sum_;
+  double count_;
+  double max_;
+  double min_;
+};
+
+}  // namespace mongo
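
Taken together, the patch uses both TDigest merge paths: raw values are bulk-merged into a per-node digest, and the partial digests are then merged digest-with-digest on the merging node. A minimal standalone sketch of those two paths, again using only the API declared in TDigest.h (the shard names, sizes, and values are illustrative, not part of the patch):

    #include <vector>

    #include "third_party/folly/TDigest.h"

    int main() {
        // Per-shard: bulk-merge pre-sorted values into a local digest.
        mongo::TDigest shardA(100);
        mongo::TDigest shardB(100);
        std::vector<double> a = {1, 2, 3, 4, 5};    // merge() requires sorted input
        std::vector<double> b = {6, 7, 8, 9, 10};
        shardA = shardA.merge(a);
        shardB = shardB.merge(b);

        // Merge side: combine the shards' partial digests, then query a quantile.
        mongo::TDigest combined = mongo::TDigest::merge({shardA, shardB});
        double median = combined.estimateQuantile(0.5);
        return (median >= 5.0 && median <= 6.0) ? 0 : 1;
    }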