Skip to content

Commit 9f39f0b

Browse files
The advanced version - too advanced really.
1 parent 4202c1d commit 9f39f0b

File tree

14 files changed

+308
-13
lines changed

14 files changed

+308
-13
lines changed

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@
5757
import org.elasticsearch.index.shard.ShardPath;
5858
import org.elasticsearch.index.similarity.SimilarityService;
5959
import org.elasticsearch.index.store.FsDirectoryFactory;
60+
import org.elasticsearch.index.store.MetricHandler;
61+
import org.elasticsearch.index.store.StoreMetrics;
62+
import org.elasticsearch.index.store.ThreadLocalMetricHandler;
6063
import org.elasticsearch.indices.IndicesQueryCache;
6164
import org.elasticsearch.indices.breaker.CircuitBreakerService;
6265
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -182,6 +185,7 @@ public interface DirectoryWrapper {
182185
private final IndexingStatsSettings indexingStatsSettings;
183186
private final SearchStatsSettings searchStatsSettings;
184187
private final MergeMetrics mergeMetrics;
188+
private MetricHandler<StoreMetrics> metricHandler;
185189

186190
/**
187191
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@@ -206,7 +210,7 @@ public IndexModule(
206210
final List<SearchOperationListener> searchOperationListeners,
207211
final IndexingStatsSettings indexingStatsSettings,
208212
final SearchStatsSettings searchStatsSettings,
209-
final MergeMetrics mergeMetrics
213+
final MergeMetrics mergeMetrics, MetricHandler<StoreMetrics> metricHandler
210214
) {
211215
this.indexSettings = indexSettings;
212216
this.analysisRegistry = analysisRegistry;
@@ -224,6 +228,7 @@ public IndexModule(
224228
this.indexingStatsSettings = indexingStatsSettings;
225229
this.searchStatsSettings = searchStatsSettings;
226230
this.mergeMetrics = mergeMetrics;
231+
this.metricHandler = metricHandler;
227232
}
228233

229234
/**
@@ -560,7 +565,8 @@ public IndexService newIndexService(
560565
mapperMetrics,
561566
indexingStatsSettings,
562567
searchStatsSettings,
563-
mergeMetrics
568+
mergeMetrics,
569+
metricHandler
564570
);
565571
success = true;
566572
return indexService;

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@
8282
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
8383
import org.elasticsearch.index.shard.ShardPath;
8484
import org.elasticsearch.index.similarity.SimilarityService;
85+
import org.elasticsearch.index.store.MetricHandler;
8586
import org.elasticsearch.index.store.Store;
87+
import org.elasticsearch.index.store.StoreMetrics;
8688
import org.elasticsearch.index.translog.Translog;
8789
import org.elasticsearch.indices.breaker.CircuitBreakerService;
8890
import org.elasticsearch.indices.cluster.IndexRemovalReason;
@@ -172,6 +174,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
172174
private final IndexingStatsSettings indexingStatsSettings;
173175
private final SearchStatsSettings searchStatsSettings;
174176
private final MergeMetrics mergeMetrics;
177+
private MetricHandler<StoreMetrics> metricHandler;
175178

176179
@SuppressWarnings("this-escape")
177180
public IndexService(
@@ -210,7 +213,8 @@ public IndexService(
210213
MapperMetrics mapperMetrics,
211214
IndexingStatsSettings indexingStatsSettings,
212215
SearchStatsSettings searchStatsSettings,
213-
MergeMetrics mergeMetrics
216+
MergeMetrics mergeMetrics,
217+
MetricHandler<StoreMetrics> metricHandler
214218
) {
215219
super(indexSettings);
216220
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
@@ -297,6 +301,7 @@ public IndexService(
297301
this.indexingStatsSettings = indexingStatsSettings;
298302
this.searchStatsSettings = searchStatsSettings;
299303
this.mergeMetrics = mergeMetrics;
304+
this.metricHandler = metricHandler;
300305
updateFsyncTaskIfNecessary();
301306
}
302307

@@ -559,7 +564,8 @@ public synchronized IndexShard createShard(
559564
directory,
560565
lock,
561566
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)),
562-
this.indexSettings.getIndexSortConfig().hasIndexSort()
567+
this.indexSettings.getIndexSortConfig().hasIndexSort(),
568+
metricHandler
563569
);
564570
eventListener.onStoreCreated(shardId);
565571
indexShard = new IndexShard(
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
public class Metric {
13+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
import java.util.function.Function;
13+
14+
public interface MetricHandler<M extends MetricHandler.Metrics> {
15+
static <M extends MetricHandler.Metrics> MetricHandler<M> noop() {
16+
return new MetricHandler<>() {
17+
@Override
18+
public void add(long amount, Function<M, LongMetricBase> metric) {
19+
20+
}
21+
22+
@Override
23+
public MetricHandler<M> singleThreaded() {
24+
return this;
25+
}
26+
};
27+
}
28+
29+
interface LongMetricBase {
30+
void add(long amount);
31+
}
32+
interface Metrics {
33+
34+
}
35+
void add(long amount, Function<M, LongMetricBase> metric);
36+
37+
38+
MetricHandler<M> singleThreaded();
39+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
import java.util.function.Function;
13+
14+
public class SingleThreadMetricHandler<M extends MetricHandler.Metrics> implements MetricHandler<M> {
15+
private final ThreadLocalMetricHandler<M> delegate;
16+
private Thread owner;
17+
private M current;
18+
19+
public SingleThreadMetricHandler(ThreadLocalMetricHandler<M> delegate) {
20+
this.delegate = delegate;
21+
}
22+
23+
@Override
24+
public void add(long amount, Function<M, LongMetricBase> metric) {
25+
metric.apply(instance()).add(amount);
26+
}
27+
28+
@Override
29+
public MetricHandler<M> singleThreaded() {
30+
return this;
31+
}
32+
33+
private M instance() {
34+
Thread thread = Thread.currentThread();
35+
if (owner == thread) {
36+
return current;
37+
} else {
38+
current = delegate.instance();
39+
owner = thread;
40+
return current;
41+
}
42+
}
43+
}

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,22 @@ public Store(
183183
ShardLock shardLock,
184184
OnClose onClose,
185185
boolean hasIndexSort
186+
) {
187+
this(shardId, indexSettings, directory, shardLock, onClose, hasIndexSort, new ThreadLocalMetricHandler<>(StoreMetrics::new));
188+
}
189+
190+
public Store(
191+
ShardId shardId,
192+
IndexSettings indexSettings,
193+
Directory directory,
194+
ShardLock shardLock,
195+
OnClose onClose,
196+
boolean hasIndexSort,
197+
MetricHandler<StoreMetrics> metricHandler
186198
) {
187199
super(shardId, indexSettings);
188200
this.directory = new StoreDirectory(
189-
byteSizeDirectory(directory, indexSettings, logger),
201+
byteSizeDirectory(new StoreMetricsDirectory(directory, metricHandler), indexSettings, logger),
190202
Loggers.getLogger("index.store.deletes", shardId)
191203
);
192204
this.shardLock = shardLock;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
import org.apache.lucene.store.FilterIndexInput;
13+
import org.apache.lucene.store.IndexInput;
14+
15+
import java.io.IOException;
16+
17+
public class StoreMetricIndexInput extends FilterIndexInput {
18+
private final MetricHandler<? extends StoreMetrics> metricHandler;
19+
20+
public StoreMetricIndexInput(String resourceDescription, IndexInput in, MetricHandler<? extends StoreMetrics> metricHandler) {
21+
super(resourceDescription, in);
22+
this.metricHandler = metricHandler;
23+
}
24+
25+
@Override
26+
public byte readByte() throws IOException {
27+
byte result = super.readByte();
28+
metricHandler.add(1, StoreMetrics::getBytesReadMetric);
29+
return result;
30+
}
31+
@Override
32+
public void readBytes(byte[] b, int offset, int len) throws IOException {
33+
super.readBytes(b, offset, len);
34+
metricHandler.add(len, StoreMetrics::getBytesReadMetric);
35+
}
36+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
public class StoreMetrics implements MetricHandler.Metrics {
13+
private long bytesRead;
14+
private final MetricHandler.LongMetricBase bytesReadMetric = new MetricHandler.LongMetricBase() {
15+
@Override
16+
public void add(long amount) {
17+
bytesRead += amount;
18+
}
19+
};
20+
21+
public MetricHandler.LongMetricBase getBytesReadMetric() {
22+
return bytesReadMetric;
23+
}
24+
25+
public long getBytesRead() {
26+
return bytesRead;
27+
}
28+
29+
public StoreMetrics snapshot() {
30+
StoreMetrics metrics = new StoreMetrics();
31+
metrics.bytesRead = bytesRead;
32+
return metrics;
33+
}
34+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
import org.apache.lucene.store.Directory;
13+
import org.apache.lucene.store.FilterDirectory;
14+
import org.apache.lucene.store.IOContext;
15+
import org.apache.lucene.store.IndexInput;
16+
17+
import java.io.IOException;
18+
19+
public class StoreMetricsDirectory extends FilterDirectory {
20+
private final MetricHandler<? extends StoreMetrics> metricHandler;
21+
22+
public StoreMetricsDirectory(Directory in, MetricHandler<? extends StoreMetrics> metricHandler) {
23+
super(in);
24+
this.metricHandler = metricHandler;
25+
}
26+
27+
@Override
28+
public IndexInput openInput(String name, IOContext context) throws IOException {
29+
return new StoreMetricIndexInput(name, super.openInput(name, context), metricHandler.singleThreaded());
30+
}
31+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.store;
11+
12+
import java.util.function.Function;
13+
import java.util.function.Supplier;
14+
15+
public class ThreadLocalMetricHandler<M extends MetricHandler.Metrics> implements MetricHandler<M> {
16+
private final Supplier<? extends M> metricsSupplier;
17+
private final ThreadLocal<M> threadLocal = new ThreadLocal<>() {
18+
@Override
19+
protected M initialValue() {
20+
return metricsSupplier.get();
21+
}
22+
};
23+
24+
public ThreadLocalMetricHandler(Supplier<? extends M> metricsSupplier) {
25+
this.metricsSupplier = metricsSupplier;
26+
}
27+
28+
@Override
29+
public void add(long amount, Function<M, LongMetricBase> metric) {
30+
metric.apply(instance()).add(amount);
31+
}
32+
33+
@Override
34+
public MetricHandler<M> singleThreaded() {
35+
return new SingleThreadMetricHandler<>(this);
36+
}
37+
38+
M instance() {
39+
return threadLocal.get();
40+
}
41+
}

0 commit comments

Comments
 (0)