Skip to content

Commit ec6742e

Browse files
A simple metric capturing mechanism to expose metrics to queries
1 parent 9f39f0b commit ec6742e

File tree

14 files changed

+137
-127
lines changed

14 files changed

+137
-127
lines changed

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,8 @@
5757
import org.elasticsearch.index.shard.ShardPath;
5858
import org.elasticsearch.index.similarity.SimilarityService;
5959
import org.elasticsearch.index.store.FsDirectoryFactory;
60-
import org.elasticsearch.index.store.MetricHandler;
60+
import org.elasticsearch.index.store.MetricHolder;
6161
import org.elasticsearch.index.store.StoreMetrics;
62-
import org.elasticsearch.index.store.ThreadLocalMetricHandler;
6362
import org.elasticsearch.indices.IndicesQueryCache;
6463
import org.elasticsearch.indices.breaker.CircuitBreakerService;
6564
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -185,7 +184,7 @@ public interface DirectoryWrapper {
185184
private final IndexingStatsSettings indexingStatsSettings;
186185
private final SearchStatsSettings searchStatsSettings;
187186
private final MergeMetrics mergeMetrics;
188-
private MetricHandler<StoreMetrics> metricHandler;
187+
private MetricHolder<StoreMetrics> metricHolder;
189188

190189
/**
191190
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@@ -210,7 +209,8 @@ public IndexModule(
210209
final List<SearchOperationListener> searchOperationListeners,
211210
final IndexingStatsSettings indexingStatsSettings,
212211
final SearchStatsSettings searchStatsSettings,
213-
final MergeMetrics mergeMetrics, MetricHandler<StoreMetrics> metricHandler
212+
final MergeMetrics mergeMetrics,
213+
MetricHolder<StoreMetrics> metricHolder
214214
) {
215215
this.indexSettings = indexSettings;
216216
this.analysisRegistry = analysisRegistry;
@@ -228,7 +228,7 @@ public IndexModule(
228228
this.indexingStatsSettings = indexingStatsSettings;
229229
this.searchStatsSettings = searchStatsSettings;
230230
this.mergeMetrics = mergeMetrics;
231-
this.metricHandler = metricHandler;
231+
this.metricHolder = metricHolder;
232232
}
233233

234234
/**
@@ -566,7 +566,7 @@ public IndexService newIndexService(
566566
indexingStatsSettings,
567567
searchStatsSettings,
568568
mergeMetrics,
569-
metricHandler
569+
metricHolder
570570
);
571571
success = true;
572572
return indexService;

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
8383
import org.elasticsearch.index.shard.ShardPath;
8484
import org.elasticsearch.index.similarity.SimilarityService;
85-
import org.elasticsearch.index.store.MetricHandler;
85+
import org.elasticsearch.index.store.MetricHolder;
8686
import org.elasticsearch.index.store.Store;
8787
import org.elasticsearch.index.store.StoreMetrics;
8888
import org.elasticsearch.index.translog.Translog;
@@ -174,7 +174,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
174174
private final IndexingStatsSettings indexingStatsSettings;
175175
private final SearchStatsSettings searchStatsSettings;
176176
private final MergeMetrics mergeMetrics;
177-
private MetricHandler<StoreMetrics> metricHandler;
177+
private MetricHolder<StoreMetrics> metricHandler;
178178

179179
@SuppressWarnings("this-escape")
180180
public IndexService(
@@ -214,7 +214,7 @@ public IndexService(
214214
IndexingStatsSettings indexingStatsSettings,
215215
SearchStatsSettings searchStatsSettings,
216216
MergeMetrics mergeMetrics,
217-
MetricHandler<StoreMetrics> metricHandler
217+
MetricHolder<StoreMetrics> metricHandler
218218
) {
219219
super(indexSettings);
220220
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS

server/src/main/java/org/elasticsearch/index/store/MetricHandler.java

Lines changed: 0 additions & 39 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/index/store/Metric.java renamed to server/src/main/java/org/elasticsearch/index/store/MetricHolder.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,24 @@
99

1010
package org.elasticsearch.index.store;
1111

12-
public class Metric {
12+
public interface MetricHolder<M> {
13+
static <M> MetricHolder<M> noop() {
14+
return new MetricHolder<>() {
15+
16+
@Override
17+
public MetricHolder<M> singleThreaded() {
18+
return this;
19+
}
20+
21+
@Override
22+
public M instance() {
23+
// todo fix this
24+
return null;
25+
}
26+
};
27+
}
28+
29+
M instance();
30+
31+
MetricHolder<M> singleThreaded();
1332
}

server/src/main/java/org/elasticsearch/index/store/SingleThreadMetricHandler.java renamed to server/src/main/java/org/elasticsearch/index/store/SingleThreadMetricHolder.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,27 @@
99

1010
package org.elasticsearch.index.store;
1111

12-
import java.util.function.Function;
13-
14-
public class SingleThreadMetricHandler<M extends MetricHandler.Metrics> implements MetricHandler<M> {
15-
private final ThreadLocalMetricHandler<M> delegate;
12+
public class SingleThreadMetricHolder<M> implements MetricHolder<M> {
13+
private final ThreadLocalMetricHolder<M> delegate;
1614
private Thread owner;
1715
private M current;
1816

19-
public SingleThreadMetricHandler(ThreadLocalMetricHandler<M> delegate) {
17+
public SingleThreadMetricHolder(ThreadLocalMetricHolder<M> delegate) {
2018
this.delegate = delegate;
2119
}
2220

23-
@Override
24-
public void add(long amount, Function<M, LongMetricBase> metric) {
25-
metric.apply(instance()).add(amount);
21+
public SingleThreadMetricHolder(SingleThreadMetricHolder<M> source) {
22+
this.delegate = source.delegate;
23+
this.owner = source.owner;
24+
this.current = source.current;
2625
}
2726

2827
@Override
29-
public MetricHandler<M> singleThreaded() {
30-
return this;
28+
public MetricHolder<M> singleThreaded() {
29+
return new SingleThreadMetricHolder<>(this);
3130
}
3231

33-
private M instance() {
32+
public M instance() {
3433
Thread thread = Thread.currentThread();
3534
if (owner == thread) {
3635
return current;

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public Store(
184184
OnClose onClose,
185185
boolean hasIndexSort
186186
) {
187-
this(shardId, indexSettings, directory, shardLock, onClose, hasIndexSort, new ThreadLocalMetricHandler<>(StoreMetrics::new));
187+
this(shardId, indexSettings, directory, shardLock, onClose, hasIndexSort, new ThreadLocalMetricHolder<>(StoreMetrics::new));
188188
}
189189

190190
public Store(
@@ -194,11 +194,11 @@ public Store(
194194
ShardLock shardLock,
195195
OnClose onClose,
196196
boolean hasIndexSort,
197-
MetricHandler<StoreMetrics> metricHandler
197+
MetricHolder<StoreMetrics> metricHolder
198198
) {
199199
super(shardId, indexSettings);
200200
this.directory = new StoreDirectory(
201-
byteSizeDirectory(new StoreMetricsDirectory(directory, metricHandler), indexSettings, logger),
201+
byteSizeDirectory(new StoreMetricsDirectory(directory, metricHolder), indexSettings, logger),
202202
Loggers.getLogger("index.store.deletes", shardId)
203203
);
204204
this.shardLock = shardLock;

server/src/main/java/org/elasticsearch/index/store/StoreMetricIndexInput.java

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,27 +10,91 @@
1010
package org.elasticsearch.index.store;
1111

1212
import org.apache.lucene.store.FilterIndexInput;
13+
import org.apache.lucene.store.IOContext;
1314
import org.apache.lucene.store.IndexInput;
15+
import org.apache.lucene.store.RandomAccessInput;
1416

1517
import java.io.IOException;
1618

1719
public class StoreMetricIndexInput extends FilterIndexInput {
18-
private final MetricHandler<? extends StoreMetrics> metricHandler;
20+
private final MetricHolder<StoreMetrics> metricHolder;
1921

20-
public StoreMetricIndexInput(String resourceDescription, IndexInput in, MetricHandler<? extends StoreMetrics> metricHandler) {
22+
public StoreMetricIndexInput(String resourceDescription, IndexInput in, MetricHolder<StoreMetrics> metricHolder) {
2123
super(resourceDescription, in);
22-
this.metricHandler = metricHandler;
24+
this.metricHolder = metricHolder;
2325
}
2426

2527
@Override
2628
public byte readByte() throws IOException {
2729
byte result = super.readByte();
28-
metricHandler.add(1, StoreMetrics::getBytesReadMetric);
30+
metricHolder.instance().addBytesRead(1);
2931
return result;
3032
}
33+
3134
@Override
3235
public void readBytes(byte[] b, int offset, int len) throws IOException {
3336
super.readBytes(b, offset, len);
34-
metricHandler.add(len, StoreMetrics::getBytesReadMetric);
37+
metricHolder.instance().addBytesRead(len);
38+
}
39+
40+
@Override
41+
public IndexInput clone() {
42+
return new StoreMetricIndexInput(toString(), in.clone(), metricHolder.singleThreaded());
43+
}
44+
45+
@Override
46+
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
47+
return new StoreMetricIndexInput(sliceDescription, super.slice(sliceDescription, offset, length), metricHolder.singleThreaded());
48+
}
49+
50+
@Override
51+
public IndexInput slice(String sliceDescription, long offset, long length, IOContext context) throws IOException {
52+
return new StoreMetricIndexInput(
53+
sliceDescription,
54+
super.slice(sliceDescription, offset, length, context),
55+
metricHolder.singleThreaded()
56+
);
57+
}
58+
59+
@Override
60+
public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
61+
RandomAccessInput delegate = in.randomAccessSlice(offset, length);
62+
63+
return new RandomAccessInput() {
64+
private final MetricHolder<StoreMetrics> metricHolder = StoreMetricIndexInput.this.metricHolder;
65+
66+
@Override
67+
public long length() {
68+
return delegate.length();
69+
}
70+
71+
@Override
72+
public byte readByte(long pos) throws IOException {
73+
byte result = delegate.readByte(pos);
74+
metricHolder.instance().addBytesRead(1);
75+
return result;
76+
}
77+
78+
@Override
79+
public short readShort(long pos) throws IOException {
80+
short result = delegate.readShort(pos);
81+
metricHolder.instance().addBytesRead(2);
82+
return result;
83+
}
84+
85+
@Override
86+
public int readInt(long pos) throws IOException {
87+
int result = delegate.readInt(pos);
88+
metricHolder.instance().addBytesRead(4);
89+
return result;
90+
}
91+
92+
@Override
93+
public long readLong(long pos) throws IOException {
94+
long result = delegate.readLong(pos);
95+
metricHolder.instance().addBytesRead(8);
96+
return result;
97+
}
98+
};
3599
}
36100
}

server/src/main/java/org/elasticsearch/index/store/StoreMetrics.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,8 @@
99

1010
package org.elasticsearch.index.store;
1111

12-
public class StoreMetrics implements MetricHandler.Metrics {
12+
public class StoreMetrics {
1313
private long bytesRead;
14-
private final MetricHandler.LongMetricBase bytesReadMetric = new MetricHandler.LongMetricBase() {
15-
@Override
16-
public void add(long amount) {
17-
bytesRead += amount;
18-
}
19-
};
20-
21-
public MetricHandler.LongMetricBase getBytesReadMetric() {
22-
return bytesReadMetric;
23-
}
2414

2515
public long getBytesRead() {
2616
return bytesRead;
@@ -31,4 +21,8 @@ public StoreMetrics snapshot() {
3121
metrics.bytesRead = bytesRead;
3222
return metrics;
3323
}
24+
25+
public void addBytesRead(long amount) {
26+
bytesRead += amount;
27+
}
3428
}

server/src/main/java/org/elasticsearch/index/store/StoreMetricsDirectory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
import java.io.IOException;
1818

1919
public class StoreMetricsDirectory extends FilterDirectory {
20-
private final MetricHandler<? extends StoreMetrics> metricHandler;
20+
private final MetricHolder<StoreMetrics> metricHandler;
2121

22-
public StoreMetricsDirectory(Directory in, MetricHandler<? extends StoreMetrics> metricHandler) {
22+
public StoreMetricsDirectory(Directory in, MetricHolder<StoreMetrics> metricHandler) {
2323
super(in);
2424
this.metricHandler = metricHandler;
2525
}
@@ -28,4 +28,5 @@ public StoreMetricsDirectory(Directory in, MetricHandler<? extends StoreMetrics>
2828
public IndexInput openInput(String name, IOContext context) throws IOException {
2929
return new StoreMetricIndexInput(name, super.openInput(name, context), metricHandler.singleThreaded());
3030
}
31+
3132
}

server/src/main/java/org/elasticsearch/index/store/ThreadLocalMetricHandler.java renamed to server/src/main/java/org/elasticsearch/index/store/ThreadLocalMetricHolder.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,27 @@
99

1010
package org.elasticsearch.index.store;
1111

12-
import java.util.function.Function;
1312
import java.util.function.Supplier;
1413

15-
public class ThreadLocalMetricHandler<M extends MetricHandler.Metrics> implements MetricHandler<M> {
14+
public class ThreadLocalMetricHolder<M> implements MetricHolder<M> {
1615
private final Supplier<? extends M> metricsSupplier;
17-
private final ThreadLocal<M> threadLocal = new ThreadLocal<>() {
16+
private final ThreadLocal<M> threadLocal = new ThreadLocal<>() {
1817
@Override
1918
protected M initialValue() {
2019
return metricsSupplier.get();
2120
}
2221
};
2322

24-
public ThreadLocalMetricHandler(Supplier<? extends M> metricsSupplier) {
23+
public ThreadLocalMetricHolder(Supplier<? extends M> metricsSupplier) {
2524
this.metricsSupplier = metricsSupplier;
2625
}
2726

2827
@Override
29-
public void add(long amount, Function<M, LongMetricBase> metric) {
30-
metric.apply(instance()).add(amount);
28+
public MetricHolder<M> singleThreaded() {
29+
return new SingleThreadMetricHolder<>(this);
3130
}
3231

33-
@Override
34-
public MetricHandler<M> singleThreaded() {
35-
return new SingleThreadMetricHandler<>(this);
36-
}
37-
38-
M instance() {
32+
public M instance() {
3933
return threadLocal.get();
4034
}
4135
}

0 commit comments

Comments
 (0)