Skip to content

Commit 932f982

Browse files
Simple thread local store metric capture
A simple thread local store metric capture to allow exposing this data to queries ultimately at the coordinating level. Exemplified here through bytes read but we will likely have more for stateless too.
1 parent 4202c1d commit 932f982

File tree

12 files changed

+324
-14
lines changed

12 files changed

+324
-14
lines changed

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.elasticsearch.index.shard.ShardPath;
5858
import org.elasticsearch.index.similarity.SimilarityService;
5959
import org.elasticsearch.index.store.FsDirectoryFactory;
60+
import org.elasticsearch.index.store.MetricHolder;
61+
import org.elasticsearch.index.store.StoreMetrics;
6062
import org.elasticsearch.indices.IndicesQueryCache;
6163
import org.elasticsearch.indices.breaker.CircuitBreakerService;
6264
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -182,6 +184,7 @@ public interface DirectoryWrapper {
182184
private final IndexingStatsSettings indexingStatsSettings;
183185
private final SearchStatsSettings searchStatsSettings;
184186
private final MergeMetrics mergeMetrics;
187+
private MetricHolder<StoreMetrics> metricHolder;
185188

186189
/**
187190
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@@ -206,7 +209,8 @@ public IndexModule(
206209
final List<SearchOperationListener> searchOperationListeners,
207210
final IndexingStatsSettings indexingStatsSettings,
208211
final SearchStatsSettings searchStatsSettings,
209-
final MergeMetrics mergeMetrics
212+
final MergeMetrics mergeMetrics,
213+
MetricHolder<StoreMetrics> metricHolder
210214
) {
211215
this.indexSettings = indexSettings;
212216
this.analysisRegistry = analysisRegistry;
@@ -224,6 +228,7 @@ public IndexModule(
224228
this.indexingStatsSettings = indexingStatsSettings;
225229
this.searchStatsSettings = searchStatsSettings;
226230
this.mergeMetrics = mergeMetrics;
231+
this.metricHolder = metricHolder;
227232
}
228233

229234
/**
@@ -560,7 +565,8 @@ public IndexService newIndexService(
560565
mapperMetrics,
561566
indexingStatsSettings,
562567
searchStatsSettings,
563-
mergeMetrics
568+
mergeMetrics,
569+
metricHolder
564570
);
565571
success = true;
566572
return indexService;

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@
8282
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
8383
import org.elasticsearch.index.shard.ShardPath;
8484
import org.elasticsearch.index.similarity.SimilarityService;
85+
import org.elasticsearch.index.store.MetricHolder;
8586
import org.elasticsearch.index.store.Store;
87+
import org.elasticsearch.index.store.StoreMetrics;
8688
import org.elasticsearch.index.translog.Translog;
8789
import org.elasticsearch.indices.breaker.CircuitBreakerService;
8890
import org.elasticsearch.indices.cluster.IndexRemovalReason;
@@ -172,6 +174,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
172174
private final IndexingStatsSettings indexingStatsSettings;
173175
private final SearchStatsSettings searchStatsSettings;
174176
private final MergeMetrics mergeMetrics;
177+
private final MetricHolder<StoreMetrics> metricHolder;
175178

176179
@SuppressWarnings("this-escape")
177180
public IndexService(
@@ -210,7 +213,8 @@ public IndexService(
210213
MapperMetrics mapperMetrics,
211214
IndexingStatsSettings indexingStatsSettings,
212215
SearchStatsSettings searchStatsSettings,
213-
MergeMetrics mergeMetrics
216+
MergeMetrics mergeMetrics,
217+
MetricHolder<StoreMetrics> metricHolder
214218
) {
215219
super(indexSettings);
216220
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
@@ -297,6 +301,7 @@ public IndexService(
297301
this.indexingStatsSettings = indexingStatsSettings;
298302
this.searchStatsSettings = searchStatsSettings;
299303
this.mergeMetrics = mergeMetrics;
304+
this.metricHolder = metricHolder;
300305
updateFsyncTaskIfNecessary();
301306
}
302307

@@ -559,7 +564,8 @@ public synchronized IndexShard createShard(
559564
directory,
560565
lock,
561566
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)),
562-
this.indexSettings.getIndexSortConfig().hasIndexSort()
567+
this.indexSettings.getIndexSortConfig().hasIndexSort(),
568+
metricHolder
563569
);
564570
eventListener.onStoreCreated(shardId);
565571
indexShard = new IndexShard(
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
public interface MetricHolder<M> {
13+
static <M> MetricHolder<M> noop(M noopData) {
14+
return new MetricHolder<>() {
15+
16+
@Override
17+
public MetricHolder<M> singleThreaded() {
18+
return this;
19+
}
20+
21+
@Override
22+
public M instance() {
23+
return noopData;
24+
}
25+
};
26+
}
27+
28+
M instance();
29+
30+
MetricHolder<M> singleThreaded();
31+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
public class SingleThreadMetricHolder<M> implements MetricHolder<M> {
13+
private final ThreadLocalMetricHolder<M> delegate;
14+
private Thread owner;
15+
private M current;
16+
17+
public SingleThreadMetricHolder(ThreadLocalMetricHolder<M> delegate) {
18+
this.delegate = delegate;
19+
}
20+
21+
public SingleThreadMetricHolder(SingleThreadMetricHolder<M> source) {
22+
this.delegate = source.delegate;
23+
this.owner = source.owner;
24+
this.current = source.current;
25+
}
26+
27+
@Override
28+
public MetricHolder<M> singleThreaded() {
29+
return new SingleThreadMetricHolder<>(this);
30+
}
31+
32+
public M instance() {
33+
Thread thread = Thread.currentThread();
34+
if (owner == thread) {
35+
return current;
36+
} else {
37+
current = delegate.instance();
38+
owner = thread;
39+
return current;
40+
}
41+
}
42+
}

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,22 @@ public Store(
183183
ShardLock shardLock,
184184
OnClose onClose,
185185
boolean hasIndexSort
186+
) {
187+
this(shardId, indexSettings, directory, shardLock, onClose, hasIndexSort, new ThreadLocalMetricHolder<>(StoreMetrics::new));
188+
}
189+
190+
public Store(
191+
ShardId shardId,
192+
IndexSettings indexSettings,
193+
Directory directory,
194+
ShardLock shardLock,
195+
OnClose onClose,
196+
boolean hasIndexSort,
197+
MetricHolder<StoreMetrics> metricHolder
186198
) {
187199
super(shardId, indexSettings);
188200
this.directory = new StoreDirectory(
189-
byteSizeDirectory(directory, indexSettings, logger),
201+
byteSizeDirectory(new StoreMetricsDirectory(directory, metricHolder), indexSettings, logger),
190202
Loggers.getLogger("index.store.deletes", shardId)
191203
);
192204
this.shardLock = shardLock;
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
import org.apache.lucene.store.FilterIndexInput;
13+
import org.apache.lucene.store.IOContext;
14+
import org.apache.lucene.store.IndexInput;
15+
import org.apache.lucene.store.RandomAccessInput;
16+
17+
import java.io.IOException;
18+
19+
public class StoreMetricIndexInput extends FilterIndexInput {
20+
private final MetricHolder<StoreMetrics> metricHolder;
21+
22+
public StoreMetricIndexInput(String resourceDescription, IndexInput in, MetricHolder<StoreMetrics> metricHolder) {
23+
super(resourceDescription, in);
24+
this.metricHolder = metricHolder;
25+
}
26+
27+
@Override
28+
public byte readByte() throws IOException {
29+
byte result = super.readByte();
30+
metricHolder.instance().addBytesRead(1);
31+
return result;
32+
}
33+
34+
@Override
35+
public void readBytes(byte[] b, int offset, int len) throws IOException {
36+
super.readBytes(b, offset, len);
37+
metricHolder.instance().addBytesRead(len);
38+
}
39+
40+
@Override
41+
public IndexInput clone() {
42+
return new StoreMetricIndexInput(toString(), in.clone(), metricHolder.singleThreaded());
43+
}
44+
45+
@Override
46+
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
47+
return new StoreMetricIndexInput(sliceDescription, super.slice(sliceDescription, offset, length), metricHolder.singleThreaded());
48+
}
49+
50+
@Override
51+
public IndexInput slice(String sliceDescription, long offset, long length, IOContext context) throws IOException {
52+
return new StoreMetricIndexInput(
53+
sliceDescription,
54+
super.slice(sliceDescription, offset, length, context),
55+
metricHolder.singleThreaded()
56+
);
57+
}
58+
59+
@Override
60+
public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
61+
RandomAccessInput delegate = in.randomAccessSlice(offset, length);
62+
63+
return new RandomAccessInput() {
64+
private final MetricHolder<StoreMetrics> metricHolder = StoreMetricIndexInput.this.metricHolder;
65+
66+
@Override
67+
public long length() {
68+
return delegate.length();
69+
}
70+
71+
@Override
72+
public byte readByte(long pos) throws IOException {
73+
byte result = delegate.readByte(pos);
74+
metricHolder.instance().addBytesRead(1);
75+
return result;
76+
}
77+
78+
@Override
79+
public short readShort(long pos) throws IOException {
80+
short result = delegate.readShort(pos);
81+
metricHolder.instance().addBytesRead(2);
82+
return result;
83+
}
84+
85+
@Override
86+
public int readInt(long pos) throws IOException {
87+
int result = delegate.readInt(pos);
88+
metricHolder.instance().addBytesRead(4);
89+
return result;
90+
}
91+
92+
@Override
93+
public long readLong(long pos) throws IOException {
94+
long result = delegate.readLong(pos);
95+
metricHolder.instance().addBytesRead(8);
96+
return result;
97+
}
98+
};
99+
}
100+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
public class StoreMetrics {
13+
public static final MetricHolder<StoreMetrics> NOOP_HOLDER = MetricHolder.noop(new StoreMetrics() {
14+
@Override
15+
public void addBytesRead(long amount) {
16+
}
17+
});
18+
19+
private long bytesRead;
20+
21+
public long getBytesRead() {
22+
return bytesRead;
23+
}
24+
25+
public StoreMetrics snapshot() {
26+
StoreMetrics metrics = new StoreMetrics();
27+
metrics.bytesRead = bytesRead;
28+
return metrics;
29+
}
30+
31+
public void addBytesRead(long amount) {
32+
bytesRead += amount;
33+
}
34+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
import org.apache.lucene.store.Directory;
13+
import org.apache.lucene.store.FilterDirectory;
14+
import org.apache.lucene.store.IOContext;
15+
import org.apache.lucene.store.IndexInput;
16+
17+
import java.io.IOException;
18+
19+
public class StoreMetricsDirectory extends FilterDirectory {
20+
private final MetricHolder<StoreMetrics> metricHandler;
21+
22+
public StoreMetricsDirectory(Directory in, MetricHolder<StoreMetrics> metricHandler) {
23+
super(in);
24+
this.metricHandler = metricHandler;
25+
}
26+
27+
@Override
28+
public IndexInput openInput(String name, IOContext context) throws IOException {
29+
return new StoreMetricIndexInput(name, super.openInput(name, context), metricHandler.singleThreaded());
30+
}
31+
32+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
import java.util.function.Supplier;
13+
14+
public class ThreadLocalMetricHolder<M> implements MetricHolder<M> {
15+
private final Supplier<? extends M> metricsSupplier;
16+
private final ThreadLocal<M> threadLocal = new ThreadLocal<>() {
17+
@Override
18+
protected M initialValue() {
19+
return metricsSupplier.get();
20+
}
21+
};
22+
23+
public ThreadLocalMetricHolder(Supplier<? extends M> metricsSupplier) {
24+
this.metricsSupplier = metricsSupplier;
25+
}
26+
27+
@Override
28+
public MetricHolder<M> singleThreaded() {
29+
return new SingleThreadMetricHolder<>(this);
30+
}
31+
32+
public M instance() {
33+
return threadLocal.get();
34+
}
35+
}

0 commit comments

Comments
 (0)