Skip to content

Commit 7272d27

Browse files
authored
Vectorize SpectatorHistogram (#18813)
Adds additional test coverage for both numeric and complex aggregations as well.
1 parent 9628f68 commit 7272d27

10 files changed

+1591
-48
lines changed

extensions-contrib/spectator-histogram/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,5 +146,10 @@
146146
<artifactId>easymock</artifactId>
147147
<scope>test</scope>
148148
</dependency>
149+
<dependency>
150+
<groupId>org.mockito</groupId>
151+
<artifactId>mockito-core</artifactId>
152+
<scope>test</scope>
153+
</dependency>
149154
</dependencies>
150155
</project>

extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogram.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public class SpectatorHistogram extends Number
143143
// These are accumulated when an entry is added, or when another histogram is merged into this one.
144144
private long sumOfCounts = 0;
145145

146-
static int getMaxIntermdiateHistogramSize()
146+
public static int getMaxIntermediateHistogramSize()
147147
{
148148
return PercentileBuckets.length() * MAX_ENTRY_BYTES;
149149
}
@@ -276,9 +276,14 @@ byte[] toBytes()
276276
return Arrays.copyOf(buffer.array(), buffer.position());
277277
}
278278

279-
void insert(Number num)
279+
public void insert(Number num)
280280
{
281-
this.add(PercentileBuckets.indexOf(num.longValue()), 1L);
281+
this.insert(num.longValue());
282+
}
283+
284+
public void insert(long num)
285+
{
286+
this.add(PercentileBuckets.indexOf(num), 1L);
282287
}
283288

284289
void merge(SpectatorHistogram source)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.spectator.histogram;
21+
22+
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
23+
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
24+
import org.apache.druid.java.util.common.IAE;
25+
26+
import java.nio.ByteBuffer;
27+
import java.util.IdentityHashMap;
28+
29+
/**
30+
* Helper class for Spectator histogram implementations of {@link org.apache.druid.query.aggregation.BufferAggregator} and {@link org.apache.druid.query.aggregation.VectorAggregator}.
31+
*/
32+
public class SpectatorHistogramAggregateHelper
33+
{
34+
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<SpectatorHistogram>> histogramCache = new IdentityHashMap<>();
35+
36+
public void init(ByteBuffer buffer, int position)
37+
{
38+
SpectatorHistogram emptyCounts = new SpectatorHistogram();
39+
addToCache(buffer, position, emptyCounts);
40+
}
41+
42+
/**
43+
* Merge obj ({@link SpectatorHistogram} or {@link Number}) into {@param current}.
44+
*/
45+
public void merge(SpectatorHistogram current, Object obj)
46+
{
47+
if (obj instanceof SpectatorHistogram) {
48+
SpectatorHistogram other = (SpectatorHistogram) obj;
49+
current.merge(other);
50+
} else if (obj instanceof Number) {
51+
current.insert((Number) obj);
52+
} else {
53+
throw new IAE(
54+
"Expected a Number, but received [%s] of type [%s]",
55+
obj,
56+
obj.getClass()
57+
);
58+
}
59+
}
60+
61+
/**
62+
* Merge {@param value} into {@param current}.
63+
*/
64+
public void merge(SpectatorHistogram current, long value)
65+
{
66+
current.insert(value);
67+
}
68+
69+
/**
70+
* Fetches the SpectatorHistogram at the given buffer/position pair in the cache
71+
*/
72+
public SpectatorHistogram get(final ByteBuffer buffer, final int position)
73+
{
74+
// histogramCache is an IdentityHashMap where the reference of buffer is used for equality checks.
75+
// So the returned object isn't impacted by the changes in the buffer object made by concurrent threads.
76+
final Int2ObjectMap<SpectatorHistogram> map = histogramCache.get(buffer);
77+
if (map == null) {
78+
return null;
79+
}
80+
return map.get(position);
81+
}
82+
83+
/**
84+
* Fetches the SpectatorHistogram cache for the given buffer
85+
*/
86+
public Int2ObjectMap<SpectatorHistogram> get(final ByteBuffer buffer)
87+
{
88+
return histogramCache.get(buffer);
89+
}
90+
91+
public float getFloat(final ByteBuffer buffer, final int position)
92+
{
93+
throw new UnsupportedOperationException("Not implemented");
94+
}
95+
96+
public long getLong(final ByteBuffer buffer, final int position)
97+
{
98+
throw new UnsupportedOperationException("Not implemented");
99+
}
100+
101+
/**
102+
* Resets the helper by clearing the buffer/histogram cache.
103+
*/
104+
public void close()
105+
{
106+
histogramCache.clear();
107+
}
108+
109+
/**
110+
* Move histogram located at {@param oldBuffer} in position {@param oldPosition} to {@param newBuffer} in position {@param newPosition}.
111+
*/
112+
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
113+
{
114+
final SpectatorHistogram histogram = histogramCache.get(oldBuffer).get(oldPosition);
115+
addToCache(newBuffer, newPosition, histogram);
116+
117+
final Int2ObjectMap<SpectatorHistogram> map = histogramCache.get(oldBuffer);
118+
map.remove(oldPosition);
119+
if (map.isEmpty()) {
120+
histogramCache.remove(oldBuffer);
121+
}
122+
}
123+
124+
private void addToCache(final ByteBuffer buffer, final int position, final SpectatorHistogram histogram)
125+
{
126+
Int2ObjectMap<SpectatorHistogram> map = histogramCache.computeIfAbsent(
127+
buffer,
128+
b -> new Int2ObjectOpenHashMap<>()
129+
);
130+
map.put(position, histogram);
131+
}
132+
}

extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorFactory.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,15 @@
2929
import org.apache.druid.query.aggregation.AggregatorUtil;
3030
import org.apache.druid.query.aggregation.BufferAggregator;
3131
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
32+
import org.apache.druid.query.aggregation.VectorAggregator;
3233
import org.apache.druid.query.cache.CacheKeyBuilder;
34+
import org.apache.druid.segment.ColumnInspector;
3335
import org.apache.druid.segment.ColumnSelectorFactory;
3436
import org.apache.druid.segment.ColumnValueSelector;
37+
import org.apache.druid.segment.column.ColumnCapabilities;
3538
import org.apache.druid.segment.column.ColumnType;
39+
import org.apache.druid.segment.column.ValueType;
40+
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
3641

3742
import javax.annotation.Nonnull;
3843
import javax.annotation.Nullable;
@@ -98,6 +103,27 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
98103
return new SpectatorHistogramBufferAggregator(metricFactory.makeColumnValueSelector(fieldName));
99104
}
100105

106+
@Override
107+
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
108+
{
109+
final ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
110+
// Allow both complex and numeric aggregation to support vectorization during ingestion as well as query time.
111+
// When ingesting data, the columnSelectorFactory returns null for column capabilities, so this doesn't
112+
// necessarily mean that the column doesn't exist. We need to be prepared to accept anything in this
113+
// case. As such, we pretend like the input is COMPLEX to get the logic to use the object-based aggregation.
114+
if (capabilities == null || capabilities.getType() == ValueType.COMPLEX) {
115+
return new SpectatorHistogramVectorizedAggregator(selectorFactory.makeObjectSelector(fieldName));
116+
} else {
117+
return new SpectatorHistogramNumericVectorizedAggregator(selectorFactory.makeValueSelector(fieldName));
118+
}
119+
}
120+
121+
@Override
122+
public boolean canVectorize(ColumnInspector columnInspector)
123+
{
124+
return true;
125+
}
126+
101127
// This is used when writing metrics to segment files to check whether the column is sorted.
102128
// Since there is no sensible way really to compare histograms, compareTo always returns 1.
103129
public static final Comparator<SpectatorHistogram> COMPARATOR = (o, o1) -> {
@@ -195,7 +221,7 @@ public ColumnType getResultType()
195221
@Override
196222
public int getMaxIntermediateSize()
197223
{
198-
return SpectatorHistogram.getMaxIntermdiateHistogramSize();
224+
return SpectatorHistogram.getMaxIntermediateHistogramSize();
199225
}
200226

201227
@Override

extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramBufferAggregator.java

Lines changed: 13 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,11 @@
2020
package org.apache.druid.spectator.histogram;
2121

2222
import com.google.common.base.Preconditions;
23-
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
24-
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
25-
import org.apache.druid.java.util.common.IAE;
2623
import org.apache.druid.query.aggregation.BufferAggregator;
2724
import org.apache.druid.segment.ColumnValueSelector;
2825

2926
import javax.annotation.Nonnull;
3027
import java.nio.ByteBuffer;
31-
import java.util.IdentityHashMap;
3228

3329
/**
3430
* Aggregator that builds Spectator Histograms over numeric values read from {@link ByteBuffer}
@@ -38,7 +34,7 @@ public class SpectatorHistogramBufferAggregator implements BufferAggregator
3834

3935
@Nonnull
4036
private final ColumnValueSelector selector;
41-
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<SpectatorHistogram>> histogramCache = new IdentityHashMap<>();
37+
private final SpectatorHistogramAggregateHelper innerAggregator = new SpectatorHistogramAggregateHelper();
4238

4339
public SpectatorHistogramBufferAggregator(
4440
final ColumnValueSelector valueSelector
@@ -51,8 +47,7 @@ public SpectatorHistogramBufferAggregator(
5147
@Override
5248
public void init(ByteBuffer buffer, int position)
5349
{
54-
SpectatorHistogram emptyCounts = new SpectatorHistogram();
55-
addToCache(buffer, position, emptyCounts);
50+
innerAggregator.init(buffer, position);
5651
}
5752

5853
@Override
@@ -62,70 +57,44 @@ public void aggregate(ByteBuffer buffer, int position)
6257
if (obj == null) {
6358
return;
6459
}
65-
SpectatorHistogram counts = histogramCache.get(buffer).get(position);
66-
if (obj instanceof SpectatorHistogram) {
67-
SpectatorHistogram other = (SpectatorHistogram) obj;
68-
counts.merge(other);
69-
} else if (obj instanceof Number) {
70-
counts.insert((Number) obj);
71-
} else {
72-
throw new IAE(
73-
"Expected a number or a long[], but received [%s] of type [%s]",
74-
obj,
75-
obj.getClass()
76-
);
60+
SpectatorHistogram counts = innerAggregator.get(buffer, position);
61+
if (counts == null) {
62+
return;
7763
}
64+
innerAggregator.merge(counts, obj);
7865
}
7966

8067
@Override
8168
public Object get(final ByteBuffer buffer, final int position)
8269
{
83-
// histogramCache is an IdentityHashMap where the reference of buffer is used for equality checks.
84-
// So the returned object isn't impacted by the changes in the buffer object made by concurrent threads.
85-
86-
SpectatorHistogram spectatorHistogram = histogramCache.get(buffer).get(position);
87-
if (spectatorHistogram.isEmpty()) {
70+
SpectatorHistogram histogram = innerAggregator.get(buffer, position);
71+
if (histogram == null || histogram.isEmpty()) {
8872
return null;
8973
}
90-
return spectatorHistogram;
74+
return histogram;
9175
}
9276

9377
@Override
9478
public float getFloat(final ByteBuffer buffer, final int position)
9579
{
96-
throw new UnsupportedOperationException("Not implemented");
80+
return innerAggregator.getFloat(buffer, position);
9781
}
9882

9983
@Override
10084
public long getLong(final ByteBuffer buffer, final int position)
10185
{
102-
throw new UnsupportedOperationException("Not implemented");
86+
return innerAggregator.getLong(buffer, position);
10387
}
10488

10589
@Override
10690
public void close()
10791
{
108-
histogramCache.clear();
92+
innerAggregator.close();
10993
}
11094

11195
@Override
11296
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
11397
{
114-
SpectatorHistogram histogram = histogramCache.get(oldBuffer).get(oldPosition);
115-
addToCache(newBuffer, newPosition, histogram);
116-
final Int2ObjectMap<SpectatorHistogram> map = histogramCache.get(oldBuffer);
117-
map.remove(oldPosition);
118-
if (map.isEmpty()) {
119-
histogramCache.remove(oldBuffer);
120-
}
121-
}
122-
123-
private void addToCache(final ByteBuffer buffer, final int position, final SpectatorHistogram histogram)
124-
{
125-
Int2ObjectMap<SpectatorHistogram> map = histogramCache.computeIfAbsent(
126-
buffer,
127-
b -> new Int2ObjectOpenHashMap<>()
128-
);
129-
map.put(position, histogram);
98+
innerAggregator.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
13099
}
131100
}

0 commit comments

Comments
 (0)