Skip to content

Commit 85bc849

Browse files
authored
Use long in Centroid count (#99491) (#99534)
* Use long in Centroid count
Centroids currently use integers to track how many samples their mean tracks. This can overflow when the digest tracks billions of samples or more. TDigestState already serializes the count as VLong, so it can be read as VLong without compatibility issues.
Fixes #80153
* Update docs/changelog/99491.yaml
* More test fixes
* Bump TransportVersion
* Revert TransportVersion change
1 parent a6dd6a9 commit 85bc849

File tree

13 files changed

+41
-35
lines changed

13 files changed

+41
-35
lines changed

docs/changelog/99491.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 99491
2+
summary: Use long in Centroid count
3+
area: Aggregations
4+
type: bug
5+
issues:
6+
- 80153

libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLGroupTree.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
final class AVLGroupTree extends AbstractCollection<Centroid> {
3232
/* For insertions into the tree */
3333
private double centroid;
34-
private int count;
34+
private long count;
3535
private double[] centroids;
36-
private int[] counts;
37-
private int[] aggregatedCounts;
36+
private long[] counts;
37+
private long[] aggregatedCounts;
3838
private final IntAVLTree tree;
3939

4040
AVLGroupTree() {
@@ -78,8 +78,8 @@ protected void fixAggregates(int node) {
7878

7979
};
8080
centroids = new double[tree.capacity()];
81-
counts = new int[tree.capacity()];
82-
aggregatedCounts = new int[tree.capacity()];
81+
counts = new long[tree.capacity()];
82+
aggregatedCounts = new long[tree.capacity()];
8383
}
8484

8585
/**
@@ -113,14 +113,14 @@ public double mean(int node) {
113113
/**
114114
* Return the count for the provided node.
115115
*/
116-
public int count(int node) {
116+
public long count(int node) {
117117
return counts[node];
118118
}
119119

120120
/**
121121
* Add the provided centroid to the tree.
122122
*/
123-
public void add(double centroid, int count) {
123+
public void add(double centroid, long count) {
124124
this.centroid = centroid;
125125
this.count = count;
126126
tree.add();
@@ -135,7 +135,7 @@ public boolean add(Centroid centroid) {
135135
/**
136136
* Update values associated with a node, readjusting the tree if necessary.
137137
*/
138-
public void update(int node, double centroid, int count) {
138+
public void update(int node, double centroid, long count) {
139139
// have to do full scale update
140140
this.centroid = centroid;
141141
this.count = count;
@@ -242,7 +242,7 @@ public void remove() {
242242
/**
243243
* Return the total count of points that have been added to the tree.
244244
*/
245-
public int sum() {
245+
public long sum() {
246246
return aggregatedCounts[tree.root()];
247247
}
248248

libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLTreeDigest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public int centroidCount() {
6868
}
6969

7070
@Override
71-
public void add(double x, int w) {
71+
public void add(double x, long w) {
7272
checkValue(x);
7373
needsCompression = true;
7474

@@ -84,7 +84,7 @@ public void add(double x, int w) {
8484
}
8585

8686
if (start == NIL) { // empty summary
87-
assert summary.size() == 0;
87+
assert summary.isEmpty();
8888
summary.add(x, w);
8989
count = w;
9090
} else {
@@ -127,7 +127,7 @@ public void add(double x, int w) {
127127
// if the nearest point was not unique, then we may not be modifying the first copy
128128
// which means that ordering can change
129129
double centroid = summary.mean(closest);
130-
int count = summary.count(closest);
130+
long count = summary.count(closest);
131131
centroid = weightedAverage(centroid, count, x, w);
132132
count += w;
133133
summary.update(closest, centroid, count);
@@ -189,7 +189,7 @@ public long size() {
189189
@Override
190190
public double cdf(double x) {
191191
AVLGroupTree values = summary;
192-
if (values.size() == 0) {
192+
if (values.isEmpty()) {
193193
return Double.NaN;
194194
}
195195
if (values.size() == 1) {
@@ -272,7 +272,7 @@ public double quantile(double q) {
272272
}
273273

274274
AVLGroupTree values = summary;
275-
if (values.size() == 0) {
275+
if (values.isEmpty()) {
276276
// no centroids means no data, no way to get a quantile
277277
return Double.NaN;
278278
} else if (values.size() == 1) {
@@ -293,7 +293,7 @@ public double quantile(double q) {
293293
}
294294

295295
int currentNode = values.first();
296-
int currentWeight = values.count(currentNode);
296+
long currentWeight = values.count(currentNode);
297297

298298
// Total mass to the left of the center of the current node.
299299
double weightSoFar = currentWeight / 2.0;
@@ -305,7 +305,7 @@ public double quantile(double q) {
305305

306306
for (int i = 0; i < values.size() - 1; i++) {
307307
int nextNode = values.next(currentNode);
308-
int nextWeight = values.count(nextNode);
308+
long nextWeight = values.count(nextNode);
309309
// this is the mass between current center and next center
310310
double dw = (currentWeight + nextWeight) / 2.0;
311311

libs/tdigest/src/main/java/org/elasticsearch/tdigest/Centroid.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class Centroid implements Comparable<Centroid> {
3030
private static final AtomicInteger uniqueCount = new AtomicInteger(1);
3131

3232
private double centroid = 0;
33-
private int count = 0;
33+
private long count = 0;
3434

3535
// The ID is transient because it must be unique within a given JVM. A new
3636
// ID should be generated from uniqueCount when a Centroid is deserialized.
@@ -45,22 +45,22 @@ public Centroid(double x) {
4545
start(x, 1, uniqueCount.getAndIncrement());
4646
}
4747

48-
public Centroid(double x, int w) {
48+
public Centroid(double x, long w) {
4949
this();
5050
start(x, w, uniqueCount.getAndIncrement());
5151
}
5252

53-
public Centroid(double x, int w, int id) {
53+
public Centroid(double x, long w, int id) {
5454
this();
5555
start(x, w, id);
5656
}
5757

58-
private void start(double x, int w, int id) {
58+
private void start(double x, long w, int id) {
5959
this.id = id;
6060
add(x, w);
6161
}
6262

63-
public void add(double x, int w) {
63+
public void add(double x, long w) {
6464
count += w;
6565
centroid += w * (x - centroid) / count;
6666
}
@@ -69,7 +69,7 @@ public double mean() {
6969
return centroid;
7070
}
7171

72-
public int count() {
72+
public long count() {
7373
return count;
7474
}
7575

libs/tdigest/src/main/java/org/elasticsearch/tdigest/HybridDigest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class HybridDigest extends AbstractTDigest {
7070
}
7171

7272
@Override
73-
public void add(double x, int w) {
73+
public void add(double x, long w) {
7474
reserve(w);
7575
if (mergingDigest != null) {
7676
mergingDigest.add(x, w);

libs/tdigest/src/main/java/org/elasticsearch/tdigest/MergingDigest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public MergingDigest(double compression, int bufferSize, int size) {
216216
}
217217

218218
@Override
219-
public void add(double x, int w) {
219+
public void add(double x, long w) {
220220
checkValue(x);
221221
if (tempUsed >= tempWeight.length - lastUsedCell - 1) {
222222
mergeNewValues();
@@ -514,7 +514,7 @@ public boolean hasNext() {
514514

515515
@Override
516516
public Centroid next() {
517-
Centroid rc = new Centroid(mean[i], (int) weight[i]);
517+
Centroid rc = new Centroid(mean[i], (long) weight[i]);
518518
i++;
519519
return rc;
520520
}

libs/tdigest/src/main/java/org/elasticsearch/tdigest/SortingDigest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class SortingDigest extends AbstractTDigest {
3939
private boolean isSorted = true;
4040

4141
@Override
42-
public void add(double x, int w) {
42+
public void add(double x, long w) {
4343
checkValue(x);
4444
isSorted = isSorted && (values.isEmpty() || values.get(values.size() - 1) <= x);
4545
for (int i = 0; i < w; i++) {

libs/tdigest/src/main/java/org/elasticsearch/tdigest/TDigest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public static TDigest createHybridDigest(double compression) {
9494
* @param x The value to add.
9595
* @param w The weight of this point.
9696
*/
97-
public abstract void add(double x, int w);
97+
public abstract void add(double x, long w);
9898

9999
/**
100100
* Add a single sample to this TDigest.

libs/tdigest/src/test/java/org/elasticsearch/tdigest/MergingDigestTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void testSingletonsAtEnds() {
118118
d.add(x);
119119
}
120120
}
121-
int last = 0;
121+
long last = 0;
122122
for (Centroid centroid : d.centroids()) {
123123
if (last == 0) {
124124
assertEquals(1, centroid.count());

server/src/main/java/org/elasticsearch/search/aggregations/metrics/EmptyTDigestState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public EmptyTDigestState() {
1515
}
1616

1717
@Override
18-
public void add(double x, int w) {
18+
public void add(double x, long w) {
1919
throw new UnsupportedOperationException("Immutable Empty TDigest");
2020
}
2121

0 commit comments

Comments
 (0)