
Commit b0fcdbe

peteralfonsi and Peter Alfonsi authored
Reapply "Switch percentiles implementation to MergingDigest (#18124)" (#19648)
Signed-off-by: Peter Alfonsi <[email protected]>
Co-authored-by: Peter Alfonsi <[email protected]>
1 parent 71f4671 commit b0fcdbe

8 files changed: +72, -91 lines

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add a mapper for context aware segments grouping criteria ([#19233](https://github.com/opensearch-project/OpenSearch/pull/19233))
 - Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568))
 - Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005))
-
 - Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))

 ### Changed
@@ -19,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Omit maxScoreCollector in SimpleTopDocsCollectorContext when concurrent segment search enabled ([#19584](https://github.com/opensearch-project/OpenSearch/pull/19584))
 - Onboarding new maven snapshots publishing to s3 ([#19619](https://github.com/opensearch-project/OpenSearch/pull/19619))
 - Remove MultiCollectorWrapper and use MultiCollector in Lucene instead ([#19595](https://github.com/opensearch-project/OpenSearch/pull/19595))
+- Change implementation for `percentiles` aggregation for latency improvement ([#19648](https://github.com/opensearch-project/OpenSearch/pull/19648))

 ### Fixed
 - Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ protobuf = "3.25.8"
 jakarta_annotation = "1.3.5"
 google_http_client = "1.44.1"
 google_auth = "1.29.0"
-tdigest = "3.3"
+tdigest = "3.3" # Warning: Before updating tdigest, ensure its serialization code for MergingDigest hasn't changed
 hdrhistogram = "2.2.2"
 grpc = "1.75.0"
 json_smart = "2.5.2"

server/src/main/java/org/opensearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java

Lines changed: 9 additions & 4 deletions
@@ -43,6 +43,8 @@
 import java.util.Map;
 import java.util.Objects;

+import com.tdunning.math.stats.Centroid;
+
 /**
  * Implementation of median absolute deviation agg
  *
@@ -57,11 +59,14 @@ static double computeMedianAbsoluteDeviation(TDigestState valuesSketch) {
         } else {
             final double approximateMedian = valuesSketch.quantile(0.5);
             final TDigestState approximatedDeviationsSketch = new TDigestState(valuesSketch.compression());
-            valuesSketch.centroids().forEach(centroid -> {
+            for (Centroid centroid : valuesSketch.centroids()) {
                 final double deviation = Math.abs(approximateMedian - centroid.mean());
-                approximatedDeviationsSketch.add(deviation, centroid.count());
-            });
-
+                // Weighted add() isn't supported for faster MergingDigest implementation, so add iteratively instead. see
+                // https://github.com/tdunning/t-digest/issues/167
+                for (int i = 0; i < centroid.count(); i++) {
+                    approximatedDeviationsSketch.add(deviation);
+                }
+            }
             return approximatedDeviationsSketch.quantile(0.5);
         }
     }
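
For context, the workaround above can be tried outside OpenSearch. The sketch below is a standalone illustration, not part of the commit: the class name and sample data are invented, and only t-digest 3.3 calls that already appear in the diff are assumed. It rebuilds a deviations sketch by expanding each centroid into count() single-point adds, since MergingDigest's weighted add is unreliable (https://github.com/tdunning/t-digest/issues/167).

import com.tdunning.math.stats.Centroid;
import com.tdunning.math.stats.MergingDigest;

// Hypothetical demo class, not part of the commit.
public class IterativeAddSketch {
    public static void main(String[] args) {
        MergingDigest values = new MergingDigest(100.0);
        for (int i = 0; i < 10_000; i++) {
            values.add(Math.random() * 100);
        }
        double median = values.quantile(0.5);

        // Mirror computeMedianAbsoluteDeviation: expand each centroid into
        // count() single-point adds instead of one weighted add.
        MergingDigest deviations = new MergingDigest(values.compression());
        for (Centroid centroid : values.centroids()) {
            double deviation = Math.abs(median - centroid.mean());
            for (int i = 0; i < centroid.count(); i++) {
                deviations.add(deviation);
            }
        }
        System.out.println("approximate MAD: " + deviations.quantile(0.5));
    }
}

One consequence worth noting: the rebuild becomes linear in the total sample count rather than in the number of centroids, a cost the change accepts in exchange for correct results.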

server/src/main/java/org/opensearch/search/aggregations/metrics/TDigestState.java

Lines changed: 54 additions & 12 deletions
@@ -31,21 +31,25 @@

 package org.opensearch.search.aggregations.metrics;

+import org.opensearch.Version;
 import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.core.common.io.stream.StreamOutput;

 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.List;

 import com.tdunning.math.stats.AVLTreeDigest;
 import com.tdunning.math.stats.Centroid;
+import com.tdunning.math.stats.MergingDigest;

 /**
  * Extension of {@link com.tdunning.math.stats.TDigest} with custom serialization.
  *
  * @opensearch.internal
  */
-public class TDigestState extends AVLTreeDigest {
+public class TDigestState extends MergingDigest {

     private final double compression;

@@ -54,28 +58,66 @@ public TDigestState(double compression) {
         this.compression = compression;
     }

+    private TDigestState(double compression, MergingDigest in) {
+        super(compression);
+        this.compression = compression;
+        this.add(List.of(in));
+    }
+
     @Override
     public double compression() {
         return compression;
     }

     public static void write(TDigestState state, StreamOutput out) throws IOException {
-        out.writeDouble(state.compression);
-        out.writeVInt(state.centroidCount());
-        for (Centroid centroid : state.centroids()) {
-            out.writeDouble(centroid.mean());
-            out.writeVLong(centroid.count());
+        if (out.getVersion().before(Version.V_3_4_0)) {
+            out.writeDouble(state.compression);
+            out.writeVInt(state.centroidCount());
+            for (Centroid centroid : state.centroids()) {
+                out.writeDouble(centroid.mean());
+                out.writeVLong(centroid.count());
+            }
+        } else {
+            int byteSize = state.byteSize();
+            out.writeVInt(byteSize);
+            ByteBuffer buf = ByteBuffer.allocate(byteSize);
+            state.asBytes(buf);
+            out.writeBytes(buf.array());
         }
     }

     public static TDigestState read(StreamInput in) throws IOException {
-        double compression = in.readDouble();
-        TDigestState state = new TDigestState(compression);
-        int n = in.readVInt();
-        for (int i = 0; i < n; i++) {
-            state.add(in.readDouble(), in.readVInt());
+        if (in.getVersion().before(Version.V_3_4_0)) {
+            // In older versions TDigestState was based on AVLTreeDigest. Load centroids into this class, then add it to MergingDigest.
+            double compression = in.readDouble();
+
+            int n = in.readVInt();
+            if (n <= 0) {
+                return new TDigestState(compression);
+            }
+            AVLTreeDigest treeDigest = new AVLTreeDigest(compression);
+            for (int i = 0; i < n; i++) {
+                treeDigest.add(in.readDouble(), in.readVInt());
+            }
+            TDigestState state = new TDigestState(compression);
+            state.add(List.of(treeDigest));
+            return state;
+
+        } else {
+            // For MergingDigest, adding the original centroids in ascending order to a new, empty MergingDigest isn't guaranteed
+            // to produce a MergingDigest whose centroids are exactly equal to the originals.
+            // So, use the library's serialization code to ensure we get the exact same centroids, allowing us to compare with equals().
+            // The AVLTreeDigest had the same limitation for equals() where it was only guaranteed to return true if the other object was
+            // produced by de/serializing the object, so this should be fine.
+            int byteSize = in.readVInt();
+            byte[] bytes = new byte[byteSize];
+            in.readBytes(bytes, 0, byteSize);
+            MergingDigest mergingDigest = MergingDigest.fromBytes(ByteBuffer.wrap(bytes));
+            if (mergingDigest.centroids().isEmpty()) {
+                return new TDigestState(mergingDigest.compression());
+            }
+            return new TDigestState(mergingDigest.compression(), mergingDigest);
         }
-        return state;
     }

     @Override
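
For readers unfamiliar with the library codec that the post-V_3_4_0 branches delegate to, here is a minimal round-trip sketch. It is a hypothetical demo, not part of the commit: a plain ByteBuffer stands in for OpenSearch's StreamOutput/StreamInput framing, and only the MergingDigest methods already used in the diff (byteSize, asBytes, fromBytes) are assumed.

import java.nio.ByteBuffer;

import com.tdunning.math.stats.MergingDigest;

// Hypothetical demo class, not part of the commit.
public class DigestRoundTrip {
    public static void main(String[] args) {
        MergingDigest original = new MergingDigest(100.0);
        for (int i = 0; i < 1_000; i++) {
            original.add(i);
        }

        // Serialize with the library's own codec, as the new write() branch does.
        ByteBuffer buf = ByteBuffer.allocate(original.byteSize());
        original.asBytes(buf);
        buf.flip(); // rewind so fromBytes reads from the start

        // Per the commit comment, deserializing reproduces the exact same
        // centroids, which is what lets TDigestState keep a meaningful equals().
        MergingDigest restored = MergingDigest.fromBytes(buf);
        System.out.println(original.quantile(0.99) + " == " + restored.quantile(0.99));
    }
}

The pre-V_3_4_0 read branch cannot take this path: it replays AVLTreeDigest-era centroids into an AVLTreeDigest and folds that in via add(List.of(treeDigest)), because adding the old centroids one by one to an empty MergingDigest is not guaranteed to reproduce them exactly.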

server/src/test/java/org/opensearch/search/aggregations/metrics/InternalTDigestPercentilesRanksTests.java

Lines changed: 2 additions & 2 deletions
@@ -54,7 +54,7 @@ protected InternalTDigestPercentileRanks createTestInstance(
         Arrays.stream(values).forEach(state::add);

         // the number of centroids is defined as <= the number of samples inserted
-        assertTrue(state.centroidCount() <= values.length);
+        assertTrue(state.centroids().size() <= values.length);
         return new InternalTDigestPercentileRanks(name, percents, state, keyed, format, metadata);
     }

@@ -66,7 +66,7 @@ protected void assertReduced(InternalTDigestPercentileRanks reduced, List<Intern
         double max = Double.NEGATIVE_INFINITY;
         long totalCount = 0;
         for (InternalTDigestPercentileRanks ranks : inputs) {
-            if (ranks.state.centroidCount() == 0) {
+            if (ranks.state.centroids().isEmpty()) {
                 // quantiles would return NaN
                 continue;
             }
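
The test updates in this and the two files that follow are mechanical, and appear to be forced by the class hierarchy: in the pinned t-digest 3.3, centroidCount() is a convenience of the old AVLTreeDigest base class, while centroids() is declared on the shared TDigest base, so it works for MergingDigest too. A small hypothetical illustration:

import com.tdunning.math.stats.MergingDigest;
import com.tdunning.math.stats.TDigest;

// Hypothetical demo class, not part of the commit.
public class CentroidCountDemo {
    public static void main(String[] args) {
        TDigest digest = new MergingDigest(100.0);
        digest.add(42.0);
        // centroids() is available on every TDigest implementation, which is
        // why the tests now count centroids through it.
        System.out.println(digest.centroids().size());
    }
}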

server/src/test/java/org/opensearch/search/aggregations/metrics/InternalTDigestPercentilesTests.java

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ protected InternalTDigestPercentiles createTestInstance(
         Arrays.stream(values).forEach(state::add);

         // the number of centroids is defined as <= the number of samples inserted
-        assertTrue(state.centroidCount() <= values.length);
+        assertTrue(state.centroids().size() <= values.length);
         return new InternalTDigestPercentiles(name, percents, state, keyed, format, metadata);
     }

server/src/test/java/org/opensearch/search/aggregations/metrics/TDigestPercentilesAggregatorTests.java

Lines changed: 4 additions & 4 deletions
@@ -104,7 +104,7 @@ public void testSomeMatchesSortedNumericDocValues() throws IOException {
             iw.addDocument(singleton(new SortedNumericDocValuesField("number", 0)));
         }, tdigest -> {
             assertEquals(7L, tdigest.state.size());
-            assertEquals(7L, tdigest.state.centroidCount());
+            assertEquals(7L, tdigest.state.centroids().size());
             assertEquals(5.0d, tdigest.percentile(75), 0.0d);
             assertEquals("5.0", tdigest.percentileAsString(75));
             assertEquals(3.0d, tdigest.percentile(71), 0.0d);
@@ -128,7 +128,7 @@ public void testSomeMatchesNumericDocValues() throws IOException {
             iw.addDocument(singleton(new NumericDocValuesField("number", 0)));
         }, tdigest -> {
             assertEquals(tdigest.state.size(), 7L);
-            assertTrue(tdigest.state.centroidCount() <= 7L);
+            assertTrue(tdigest.state.centroids().size() <= 7L);
             assertEquals(8.0d, tdigest.percentile(100), 0.0d);
             assertEquals("8.0", tdigest.percentileAsString(100));
             assertEquals(8.0d, tdigest.percentile(88), 0.0d);
@@ -156,7 +156,7 @@ public void testQueryFiltering() throws IOException {

         testCase(LongPoint.newRangeQuery("row", 1, 4), docs, tdigest -> {
             assertEquals(4L, tdigest.state.size());
-            assertEquals(4L, tdigest.state.centroidCount());
+            assertEquals(4L, tdigest.state.centroids().size());
             assertEquals(2.0d, tdigest.percentile(100), 0.0d);
             assertEquals(1.0d, tdigest.percentile(50), 0.0d);
             assertEquals(1.0d, tdigest.percentile(25), 0.0d);
@@ -165,7 +165,7 @@ public void testQueryFiltering() throws IOException {

         testCase(LongPoint.newRangeQuery("row", 100, 110), docs, tdigest -> {
             assertEquals(0L, tdigest.state.size());
-            assertEquals(0L, tdigest.state.centroidCount());
+            assertEquals(0L, tdigest.state.centroids().size());
             assertFalse(AggregationInspectionHelper.hasValue(tdigest));
         });
     }

server/src/test/java/org/opensearch/search/aggregations/metrics/TDigestStateTests.java

Lines changed: 0 additions & 66 deletions
This file was deleted.
