Skip to content

Commit d34cd64

Browse files
authored
Merge branch 'main' into exp-histo-accounting
2 parents a5773e9 + 216753a commit d34cd64

File tree

6 files changed

+128
-8
lines changed

6 files changed

+128
-8
lines changed

docs/changelog/132547.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 132547
2+
summary: Add epoch blob-cache metric
3+
area: Searchable Snapshots
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/index/codec/vectors/cluster/HierarchicalKMeans.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,29 +106,57 @@ KMeansIntermediate clusterAndSplit(final FloatVectorValues vectors, final int ta
106106
// TODO: consider adding cluster size counts to the kmeans algo
107107
// handle assignment here so we can track distance and cluster size
108108
int[] centroidVectorCount = new int[centroids.length];
109+
int effectiveCluster = -1;
109110
int effectiveK = 0;
110111
for (int assigment : assignments) {
111112
centroidVectorCount[assigment]++;
112113
// this cluster has received an assignment; it's now effective, but only count it once
113114
if (centroidVectorCount[assigment] == 1) {
114115
effectiveK++;
116+
effectiveCluster = assigment;
115117
}
116118
}
117119

118120
if (effectiveK == 1) {
121+
final float[][] singleClusterCentroid = new float[1][];
122+
singleClusterCentroid[0] = centroids[effectiveCluster];
123+
kMeansIntermediate.setCentroids(singleClusterCentroid);
124+
Arrays.fill(kMeansIntermediate.assignments(), 0);
119125
return kMeansIntermediate;
120126
}
121127

128+
int removedElements = 0;
122129
for (int c = 0; c < centroidVectorCount.length; c++) {
123130
// Recurse for each cluster which is larger than targetSize
124131
// Give ourselves 30% margin for the target size
125-
if (100 * centroidVectorCount[c] > 134 * targetSize) {
126-
FloatVectorValues sample = createClusterSlice(centroidVectorCount[c], c, vectors, assignments);
127-
132+
final int count = centroidVectorCount[c];
133+
final int adjustedCentroid = c - removedElements;
134+
if (100 * count > 134 * targetSize) {
135+
final FloatVectorValues sample = createClusterSlice(count, adjustedCentroid, vectors, assignments);
128136
// TODO: consider iterative here instead of recursive
129137
// recursive call to build out the sub partitions around this centroid c
130138
// subsequently reconcile and flatten the space of all centroids and assignments into one structure we can return
131-
updateAssignmentsWithRecursiveSplit(kMeansIntermediate, c, clusterAndSplit(sample, targetSize));
139+
updateAssignmentsWithRecursiveSplit(kMeansIntermediate, adjustedCentroid, clusterAndSplit(sample, targetSize));
140+
} else if (count == 0) {
141+
// remove empty clusters
142+
final int newSize = kMeansIntermediate.centroids().length - 1;
143+
final float[][] newCentroids = new float[newSize][];
144+
System.arraycopy(kMeansIntermediate.centroids(), 0, newCentroids, 0, adjustedCentroid);
145+
System.arraycopy(
146+
kMeansIntermediate.centroids(),
147+
adjustedCentroid + 1,
148+
newCentroids,
149+
adjustedCentroid,
150+
newSize - adjustedCentroid
151+
);
152+
// we need to update the assignments to reflect the new centroid ordinals
153+
for (int i = 0; i < kMeansIntermediate.assignments().length; i++) {
154+
if (kMeansIntermediate.assignments()[i] > adjustedCentroid) {
155+
kMeansIntermediate.assignments()[i]--;
156+
}
157+
}
158+
kMeansIntermediate.setCentroids(newCentroids);
159+
removedElements++;
132160
}
133161
}
134162

server/src/test/java/org/elasticsearch/index/codec/vectors/cluster/HierarchicalKMeansTests.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,63 @@ private static FloatVectorValues generateData(int nSamples, int nDims, int nClus
7474
}
7575
return FloatVectorValues.fromFloats(vectors, nDims);
7676
}
77+
78+
public void testFewDifferentValues() throws IOException {
79+
int nVectors = random().nextInt(100, 1000);
80+
int targetSize = random().nextInt(4, 64);
81+
int dims = random().nextInt(2, 20);
82+
int diffValues = randomIntBetween(1, 5);
83+
float[][] values = new float[diffValues][dims];
84+
for (int i = 0; i < diffValues; i++) {
85+
for (int j = 0; j < dims; j++) {
86+
values[i][j] = random().nextFloat();
87+
}
88+
}
89+
List<float[]> vectorList = new ArrayList<>(nVectors);
90+
for (int i = 0; i < nVectors; i++) {
91+
vectorList.add(values[random().nextInt(diffValues)]);
92+
}
93+
FloatVectorValues vectors = FloatVectorValues.fromFloats(vectorList, dims);
94+
95+
HierarchicalKMeans hkmeans = new HierarchicalKMeans(
96+
dims,
97+
random().nextInt(1, 100),
98+
random().nextInt(Math.min(nVectors, 100), nVectors + 1),
99+
random().nextInt(2, 512),
100+
random().nextFloat(0.5f, 1.5f)
101+
);
102+
103+
KMeansResult result = hkmeans.cluster(vectors, targetSize);
104+
105+
float[][] centroids = result.centroids();
106+
int[] assignments = result.assignments();
107+
int[] soarAssignments = result.soarAssignments();
108+
109+
int[] counts = new int[centroids.length];
110+
for (int i = 0; i < assignments.length; i++) {
111+
counts[assignments[i]]++;
112+
}
113+
int totalCount = 0;
114+
for (int count : counts) {
115+
totalCount += count;
116+
assertTrue(count > 0);
117+
}
118+
assertEquals(nVectors, totalCount);
119+
120+
assertEquals(nVectors, assignments.length);
121+
122+
for (int assignment : assignments) {
123+
assertTrue(assignment >= 0 && assignment < centroids.length);
124+
}
125+
if (centroids.length > 1 && centroids.length < nVectors) {
126+
assertEquals(nVectors, soarAssignments.length);
127+
// verify no duplicates exist
128+
for (int i = 0; i < assignments.length; i++) {
129+
assertTrue(soarAssignments[i] >= 0 && soarAssignments[i] < centroids.length);
130+
assertNotEquals(assignments[i], soarAssignments[i]);
131+
}
132+
} else {
133+
assertEquals(0, soarAssignments.length);
134+
}
135+
}
77136
}

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheMetrics.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class BlobCacheMetrics {
4242

4343
private final LongAdder missCount = new LongAdder();
4444
private final LongAdder readCount = new LongAdder();
45+
private final LongCounter epochChanges;
4546

4647
public enum CachePopulationReason {
4748
/**
@@ -98,7 +99,8 @@ public BlobCacheMetrics(MeterRegistry meterRegistry) {
9899
"es.blob_cache.population.time.total",
99100
"The time spent copying data into the cache",
100101
"milliseconds"
101-
)
102+
),
103+
meterRegistry.registerLongCounter("es.blob_cache.epoch.total", "The epoch changes of the LFU cache", "count")
102104
);
103105

104106
meterRegistry.registerLongGauge(
@@ -134,7 +136,8 @@ public BlobCacheMetrics(MeterRegistry meterRegistry) {
134136
LongHistogram cacheMissLoadTimes,
135137
DoubleHistogram cachePopulationThroughput,
136138
LongCounter cachePopulationBytes,
137-
LongCounter cachePopulationTime
139+
LongCounter cachePopulationTime,
140+
LongCounter epochChanges
138141
) {
139142
this.cacheMissCounter = cacheMissCounter;
140143
this.evictedCountNonZeroFrequency = evictedCountNonZeroFrequency;
@@ -143,6 +146,7 @@ public BlobCacheMetrics(MeterRegistry meterRegistry) {
143146
this.cachePopulationThroughput = cachePopulationThroughput;
144147
this.cachePopulationBytes = cachePopulationBytes;
145148
this.cachePopulationTime = cachePopulationTime;
149+
this.epochChanges = epochChanges;
146150
}
147151

148152
public static final BlobCacheMetrics NOOP = new BlobCacheMetrics(TelemetryProvider.NOOP.getMeterRegistry());
@@ -201,6 +205,10 @@ public void recordCachePopulationMetrics(
201205
}
202206
}
203207

208+
public void recordEpochChange() {
209+
epochChanges.increment();
210+
}
211+
204212
public void recordRead() {
205213
readCount.increment();
206214
}

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2049,6 +2049,7 @@ public void onFailure(Exception e) {
20492049
public void onAfter() {
20502050
assert pendingEpoch.get() == epoch.get() + 1;
20512051
epoch.incrementAndGet();
2052+
blobCacheMetrics.recordEpochChange();
20522053
}
20532054

20542055
@Override

x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.env.TestEnvironment;
3636
import org.elasticsearch.node.NodeRoleSettings;
3737
import org.elasticsearch.telemetry.InstrumentType;
38+
import org.elasticsearch.telemetry.Measurement;
3839
import org.elasticsearch.telemetry.RecordingMeterRegistry;
3940
import org.elasticsearch.test.ESTestCase;
4041
import org.elasticsearch.threadpool.TestThreadPool;
@@ -332,6 +333,8 @@ public void testAsynchronousEviction() throws Exception {
332333
}
333334

334335
public void testDecay() throws IOException {
336+
RecordingMeterRegistry recordingMeterRegistry = new RecordingMeterRegistry();
337+
BlobCacheMetrics metrics = new BlobCacheMetrics(recordingMeterRegistry);
335338
// we have 8 regions
336339
Settings settings = Settings.builder()
337340
.put(NODE_NAME_SETTING.getKey(), "node")
@@ -347,7 +350,7 @@ public void testDecay() throws IOException {
347350
settings,
348351
taskQueue.getThreadPool(),
349352
taskQueue.getThreadPool().executor(ThreadPool.Names.GENERIC),
350-
BlobCacheMetrics.NOOP
353+
metrics
351354
)
352355
) {
353356
assertEquals(4, cacheService.freeRegionCount());
@@ -375,6 +378,8 @@ public void testDecay() throws IOException {
375378
assertThat(taskQueue.hasRunnableTasks(), is(true));
376379
taskQueue.runAllRunnableTasks();
377380
assertThat(cacheService.epoch(), equalTo(expectedEpoch.incrementAndGet()));
381+
long epochs = recordedEpochs(recordingMeterRegistry);
382+
assertEquals(cacheService.epoch(), epochs);
378383
};
379384

380385
triggerDecay.run();
@@ -435,11 +440,22 @@ public void testDecay() throws IOException {
435440
}
436441
}
437442

443+
private static long recordedEpochs(RecordingMeterRegistry recordingMeterRegistry) {
444+
long epochs = recordingMeterRegistry.getRecorder()
445+
.getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.epoch.total")
446+
.stream()
447+
.mapToLong(Measurement::getLong)
448+
.sum();
449+
return epochs;
450+
}
451+
438452
/**
439453
* Test when many objects need to decay, in particular useful to measure how long the decay task takes.
440454
* For 1M objects (with no assertions) it took 26ms locally.
441455
*/
442456
public void testMassiveDecay() throws IOException {
457+
RecordingMeterRegistry recordingMeterRegistry = new RecordingMeterRegistry();
458+
BlobCacheMetrics metrics = new BlobCacheMetrics(recordingMeterRegistry);
443459
int regions = 1024; // to measure decay time, increase to 1024*1024 and disable assertions.
444460
Settings settings = Settings.builder()
445461
.put(NODE_NAME_SETTING.getKey(), "node")
@@ -455,7 +471,7 @@ public void testMassiveDecay() throws IOException {
455471
settings,
456472
taskQueue.getThreadPool(),
457473
taskQueue.getThreadPool().executor(ThreadPool.Names.GENERIC),
458-
BlobCacheMetrics.NOOP
474+
metrics
459475
)
460476
) {
461477
Runnable decay = () -> {
@@ -496,6 +512,9 @@ public void testMassiveDecay() throws IOException {
496512
}
497513
}
498514
assertThat(freqs.get(4), equalTo(regions - maxRounds + 1));
515+
516+
long epochs = recordedEpochs(recordingMeterRegistry);
517+
assertEquals(cacheService.epoch(), epochs);
499518
}
500519
}
501520

0 commit comments

Comments
 (0)