Skip to content

Commit 020ee1d

Browse files
committed
commit
1 parent 8cb4493 commit 020ee1d

File tree

3 files changed

+27
-0
lines changed

3 files changed

+27
-0
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3235,6 +3235,16 @@ public void noopUpdate() {
32353235
internalIndexingStats.noopUpdate();
32363236
}
32373237

3238+
/**
3239+
* Increment relevant stats when indexing buffers are written to disk using indexing threads,
3240+
* in order to apply back-pressure on indexing.
3241+
* @param took time it took to write the index buffers for this shard
3242+
* @see org.elasticsearch.indices.IndexingMemoryController
3243+
*/
3244+
public void writeIndexBuffersOnIndexThreads(long took) {
3245+
internalIndexingStats.writeIndexBuffers(took);
3246+
}
3247+
32383248
public void maybeCheckIndex() {
32393249
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
32403250
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {

server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,17 @@ void noopUpdate() {
153153
totalStats.noopUpdates.inc();
154154
}
155155

156+
/**
157+
* Increment relevant stats when indexing buffers are written to disk using indexing threads,
158+
* in order to apply back-pressure on indexing.
159+
* @param took time taken to write buffers
160+
* @see org.elasticsearch.indices.IndexingMemoryController
161+
*/
162+
void writeIndexBuffers(long took) {
163+
totalStats.indexMetric.inc(took);
164+
totalStats.recentIndexMetric.addIncrement(took, relativeTimeInNanosSupplier.getAsLong());
165+
}
166+
156167
static class StatsHolder {
157168
private final MeanMetric indexMetric = new MeanMetric(); // Used for the count and total 'took' time (in ns) of index operations
158169
private final ExponentiallyWeightedMovingRate recentIndexMetric; // An EWMR of the total 'took' time of index operations (in ns)

server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,16 @@ protected void enqueueWriteIndexingBuffer(IndexShard shard) {
205205
*/
206206
private boolean writePendingIndexingBuffers() {
207207
boolean wrotePendingIndexingBuffer = false;
208+
long startTime;
209+
long took;
208210
for (IndexShard shard = pendingWriteIndexingBufferQueue.pollFirst(); shard != null; shard = pendingWriteIndexingBufferQueue
209211
.pollFirst()) {
210212
// Remove the shard from the set first, so that multiple threads can run writeIndexingBuffer concurrently on the same shard.
211213
pendingWriteIndexingBufferSet.remove(shard);
214+
startTime = System.nanoTime();
212215
shard.writeIndexingBuffer();
216+
took = System.nanoTime() - startTime;
217+
shard.writeIndexBuffersOnIndexThreads(took);
213218
wrotePendingIndexingBuffer = true;
214219
}
215220
return wrotePendingIndexingBuffer;
@@ -258,6 +263,7 @@ private void postOperation(ShardId shardId, Engine.Operation operation, Engine.R
258263
// be reclaimed rapidly. This has the downside of increasing the latency of _bulk requests though. Lucene does the same thing in
259264
// DocumentsWriter#postUpdate, flushing a segment because the size limit on the RAM buffer was reached happens on the call to
260265
// IndexWriter#addDocument.
266+
261267
while (writePendingIndexingBuffers()) {
262268
// If we just wrote segments, then run the checker again if not already running to check if we released enough memory.
263269
if (statusChecker.tryRun() == false) {

0 commit comments

Comments
 (0)