Skip to content

Commit 084542a

Browse files
authored
Account for time taken to write index buffers in IndexingMemoryController (#126786)
This PR adds to the indexing write load the time taken to write indexing buffers to disk on the indexing threads (buffers are written on those threads in order to push back on indexing). This changes the semantics of InternalIndexingStats#recentIndexMetric and InternalIndexingStats#peakIndexMetric to more accurately account for load on the indexing thread. Addresses ES-11356.
1 parent 4d907ce commit 084542a

File tree

16 files changed

+128
-10
lines changed

16 files changed

+128
-10
lines changed

docs/changelog/126786.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126786
2+
summary: Account for time taken to write index buffers in `IndexingMemoryController`
3+
area: Distributed
4+
type: enhancement
5+
issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,9 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex,
527527
CommonStats stats = new CommonStats();
528528
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
529529
stats.store = new StoreStats();
530-
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123, 0.234));
530+
stats.indexing = new IndexingStats(
531+
new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, targetWriteLoad, 1, 0.123, 0.234)
532+
);
531533
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
532534
}
533535

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Optional;
3030

3131
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
32+
import static org.hamcrest.Matchers.greaterThan;
3233
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3334

3435
public class IndexingMemoryControllerIT extends ESSingleNodeTestCase {
@@ -37,7 +38,9 @@ public class IndexingMemoryControllerIT extends ESSingleNodeTestCase {
3738
protected Settings nodeSettings() {
3839
return Settings.builder()
3940
.put(super.nodeSettings())
40-
// small indexing buffer so that we can trigger refresh after buffering 100 deletes
41+
// small indexing buffer so that
42+
// 1. We can trigger refresh after buffering 100 deletes
43+
// 2. IndexingMemoryController writes indexing buffers in sync with indexing on the indexing thread
4144
.put("indices.memory.index_buffer_size", "1kb")
4245
.build();
4346
}
@@ -111,4 +114,22 @@ public void testDeletesAloneCanTriggerRefresh() throws Exception {
111114
}
112115
assertThat(shard.getEngineOrNull().getIndexBufferRAMBytesUsed(), lessThanOrEqualTo(ByteSizeUnit.KB.toBytes(1)));
113116
}
117+
118+
/* When there is memory pressure, we write indexing buffers to disk on the same thread as the indexing thread,
119+
* @see org.elasticsearch.indices.IndexingMemoryController.
120+
* This test verifies that we update the stats that capture the combined time for indexing + writing the
121+
* indexing buffers.
122+
* Note that the small indices.memory.index_buffer_size setting is required for this test to work.
123+
*/
124+
public void testIndexingUpdatesRelevantStats() throws Exception {
125+
IndexService indexService = createIndex("index", indexSettings(1, 0).put("index.refresh_interval", -1).build());
126+
IndexShard shard = indexService.getShard(0);
127+
prepareIndex("index").setSource("field", randomUnicodeOfCodepointLengthBetween(10, 25)).get();
128+
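// Check that the indexing execution time is recorded and that, because it also covers writing the indexing buffers,
// it is greater than the plain indexing time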
129+
assertThat(shard.indexingStats().getTotal().getTotalIndexingExecutionTimeInMillis(), greaterThan(0L));
130+
assertThat(
131+
shard.indexingStats().getTotal().getTotalIndexingExecutionTimeInMillis(),
132+
greaterThan(shard.indexingStats().getTotal().getIndexTime().getMillis())
133+
);
134+
}
114135
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ static TransportVersion def(int id) {
234234
public static final TransportVersion AGGREGATE_METRIC_DOUBLE_BLOCK = def(9_067_00_0);
235235
public static final TransportVersion PINNED_RETRIEVER = def(9_068_0_00);
236236
public static final TransportVersion ML_INFERENCE_SAGEMAKER = def(9_069_0_00);
237+
public static final TransportVersion WRITE_LOAD_INCLUDES_BUFFER_WRITES = def(9_070_00_0);
237238

238239
/*
239240
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
308308
private final LongSupplier relativeTimeInNanosSupplier;
309309
private volatile long startedRelativeTimeInNanos = -1L; // use -1 to indicate this has not yet been set to its true value
310310
private volatile long indexingTimeBeforeShardStartedInNanos;
311+
private volatile long indexingTaskExecutionTimeBeforeShardStartedInNanos;
311312
private volatile double recentIndexingLoadAtShardStarted;
312313
private final SubscribableListener<Void> waitForEngineOrClosedShardListeners = new SubscribableListener<>();
313314

@@ -569,6 +570,7 @@ public void updateShardState(
569570
// unlikely case that getRelativeTimeInNanos() returns exactly -1, we advance by 1ns to avoid that special value.
570571
startedRelativeTimeInNanos = (relativeTimeInNanos != -1L) ? relativeTimeInNanos : 0L;
571572
indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos();
573+
indexingTaskExecutionTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingExecutionTimeInNanos();
572574
recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos);
573575
} else if (currentRouting.primary()
574576
&& currentRouting.relocating()
@@ -1401,6 +1403,7 @@ public IndexingStats indexingStats() {
14011403
throttled,
14021404
throttleTimeInMillis,
14031405
indexingTimeBeforeShardStartedInNanos,
1406+
indexingTaskExecutionTimeBeforeShardStartedInNanos,
14041407
timeSinceShardStartedInNanos,
14051408
currentTimeInNanos,
14061409
recentIndexingLoadAtShardStarted
@@ -3235,6 +3238,16 @@ public void noopUpdate() {
32353238
internalIndexingStats.noopUpdate();
32363239
}
32373240

3241+
/**
3242+
* Increment relevant stats when indexing buffers are written to disk using indexing threads,
3243+
* in order to apply back-pressure on indexing.
3244+
* @param tookInNanos time it took to write the indexing buffers for this shard (in ns)
3245+
* @see IndexingMemoryController#writePendingIndexingBuffers()
3246+
*/
3247+
public void addWriteIndexBuffersToIndexThreadsTime(long tookInNanos) {
3248+
internalIndexingStats.writeIndexingBuffersTime(tookInNanos);
3249+
}
3250+
32383251
public void maybeCheckIndex() {
32393252
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
32403253
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {

server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import static org.elasticsearch.TransportVersions.INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD;
2929
import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD;
30+
import static org.elasticsearch.TransportVersions.WRITE_LOAD_INCLUDES_BUFFER_WRITES;
3031

3132
public class IndexingStats implements Writeable, ToXContentFragment {
3233

@@ -45,6 +46,10 @@ public static class Stats implements Writeable, ToXContentFragment {
4546
private long throttleTimeInMillis;
4647
private boolean isThrottled;
4748
private long totalIndexingTimeSinceShardStartedInNanos;
49+
// This is different from totalIndexingTimeSinceShardStartedInNanos, as it also includes the time taken to write indexing buffers
50+
// to disk on the same thread as the indexing thread. This happens when we are running low on memory and want to push
51+
// back on indexing, see IndexingMemoryController#writePendingIndexingBuffers()
52+
private long totalIndexingExecutionTimeSinceShardStartedInNanos;
4853
private long totalActiveTimeInNanos;
4954
private double recentIndexingLoad;
5055
private double peakIndexingLoad;
@@ -87,6 +92,15 @@ public Stats(StreamInput in) throws IOException {
8792
? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos
8893
: 0;
8994
}
95+
if (in.getTransportVersion().onOrAfter(WRITE_LOAD_INCLUDES_BUFFER_WRITES)) {
96+
totalIndexingExecutionTimeSinceShardStartedInNanos = in.readLong();
97+
} else {
98+
// When getting stats from an older version which doesn't have the more accurate indexing execution time,
99+
// better to fall back to the indexing time, rather than assuming zero load:
100+
totalIndexingExecutionTimeSinceShardStartedInNanos = totalActiveTimeInNanos > 0
101+
? totalIndexingTimeSinceShardStartedInNanos
102+
: 0;
103+
}
90104
}
91105

92106
public Stats(
@@ -102,6 +116,7 @@ public Stats(
102116
boolean isThrottled,
103117
long throttleTimeInMillis,
104118
long totalIndexingTimeSinceShardStartedInNanos,
119+
long totalIndexingExecutionTimeSinceShardStartedInNanos,
105120
long totalActiveTimeInNanos,
106121
double recentIndexingLoad,
107122
double peakIndexingLoad
@@ -119,6 +134,7 @@ public Stats(
119134
this.throttleTimeInMillis = throttleTimeInMillis;
120135
// We store the raw unweighted write load values in order to avoid losing precision when we combine the shard stats
121136
this.totalIndexingTimeSinceShardStartedInNanos = totalIndexingTimeSinceShardStartedInNanos;
137+
this.totalIndexingExecutionTimeSinceShardStartedInNanos = totalIndexingExecutionTimeSinceShardStartedInNanos;
122138
this.totalActiveTimeInNanos = totalActiveTimeInNanos;
123139
// We store the weighted write load as a double because the calculation is inherently floating point
124140
this.recentIndexingLoad = recentIndexingLoad;
@@ -141,8 +157,9 @@ public void add(Stats stats) {
141157
if (isThrottled != stats.isThrottled) {
142158
isThrottled = true; // When combining if one is throttled set result to throttled.
143159
}
144-
// N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time:
145160
totalIndexingTimeSinceShardStartedInNanos += stats.totalIndexingTimeSinceShardStartedInNanos;
161+
// N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time:
162+
totalIndexingExecutionTimeSinceShardStartedInNanos += stats.totalIndexingExecutionTimeSinceShardStartedInNanos;
146163
totalActiveTimeInNanos += stats.totalActiveTimeInNanos;
147164
// We want getRecentWriteLoad() and getPeakWriteLoad() for the aggregated stats to also be the average weighted by active time,
148165
// so we use the updating formula for a weighted mean:
@@ -237,7 +254,7 @@ public long getNoopUpdateCount() {
237254
* the elapsed time for each shard.
238255
*/
239256
public double getWriteLoad() {
240-
return totalActiveTimeInNanos > 0 ? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos : 0;
257+
return totalActiveTimeInNanos > 0 ? (double) totalIndexingExecutionTimeSinceShardStartedInNanos / totalActiveTimeInNanos : 0;
241258
}
242259

243260
/**
@@ -271,6 +288,13 @@ public long getTotalActiveTimeInMillis() {
271288
return TimeUnit.NANOSECONDS.toMillis(totalActiveTimeInNanos);
272289
}
273290

291+
/**
292+
* The total amount of time spent on indexing plus writing indexing buffers.
293+
*/
294+
public long getTotalIndexingExecutionTimeInMillis() {
295+
return TimeUnit.NANOSECONDS.toMillis(totalIndexingExecutionTimeSinceShardStartedInNanos);
296+
}
297+
274298
@Override
275299
public void writeTo(StreamOutput out) throws IOException {
276300
out.writeVLong(indexCount);
@@ -296,6 +320,9 @@ public void writeTo(StreamOutput out) throws IOException {
296320
if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
297321
out.writeDouble(peakIndexingLoad);
298322
}
323+
if (out.getTransportVersion().onOrAfter(WRITE_LOAD_INCLUDES_BUFFER_WRITES)) {
324+
out.writeLong(totalIndexingExecutionTimeSinceShardStartedInNanos);
325+
}
299326
}
300327

301328
@Override
@@ -338,6 +365,7 @@ public boolean equals(Object o) {
338365
&& isThrottled == that.isThrottled
339366
&& throttleTimeInMillis == that.throttleTimeInMillis
340367
&& totalIndexingTimeSinceShardStartedInNanos == that.totalIndexingTimeSinceShardStartedInNanos
368+
&& totalIndexingExecutionTimeSinceShardStartedInNanos == that.totalIndexingExecutionTimeSinceShardStartedInNanos
341369
&& totalActiveTimeInNanos == that.totalActiveTimeInNanos
342370
&& recentIndexingLoad == that.recentIndexingLoad
343371
&& peakIndexingLoad == that.peakIndexingLoad;
@@ -358,6 +386,7 @@ public int hashCode() {
358386
isThrottled,
359387
throttleTimeInMillis,
360388
totalIndexingTimeSinceShardStartedInNanos,
389+
totalIndexingExecutionTimeSinceShardStartedInNanos,
361390
totalActiveTimeInNanos
362391
);
363392
}

server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.AtomicReference;
25+
import java.util.concurrent.atomic.LongAdder;
2526
import java.util.function.LongSupplier;
2627

2728
import static org.elasticsearch.core.TimeValue.timeValueNanos;
@@ -51,6 +52,7 @@ IndexingStats stats(
5152
boolean isThrottled,
5253
long currentThrottleInMillis,
5354
long indexingTimeBeforeShardStartedInNanos,
55+
long indexingLoadBeforeShardStartedInNanos,
5456
long timeSinceShardStartedInNanos,
5557
long currentTimeInNanos,
5658
double recentIndexingLoadAtShardStarted
@@ -59,6 +61,7 @@ IndexingStats stats(
5961
isThrottled,
6062
currentThrottleInMillis,
6163
indexingTimeBeforeShardStartedInNanos,
64+
indexingLoadBeforeShardStartedInNanos,
6265
timeSinceShardStartedInNanos,
6366
currentTimeInNanos,
6467
recentIndexingLoadAtShardStarted
@@ -70,6 +73,10 @@ long totalIndexingTimeInNanos() {
7073
return totalStats.indexMetric.sum();
7174
}
7275

76+
long totalIndexingExecutionTimeInNanos() {
77+
return totalStats.indexMetric.sum() + totalStats.writeIndexingBufferTime.sum();
78+
}
79+
7380
/**
7481
* Returns an exponentially-weighted moving rate which measures the indexing load, favoring more recent load.
7582
*/
@@ -153,10 +160,26 @@ void noopUpdate() {
153160
totalStats.noopUpdates.inc();
154161
}
155162

163+
/**
164+
* Increment relevant stats when indexing buffers are written to disk using indexing threads,
165+
* in order to apply back-pressure on indexing.
166+
* @param took time taken to write buffers (in ns)
167+
* @see org.elasticsearch.indices.IndexingMemoryController
168+
*/
169+
void writeIndexingBuffersTime(long took) {
170+
totalStats.writeIndexingBufferTime.add(took);
171+
totalStats.recentIndexMetric.addIncrement(took, relativeTimeInNanosSupplier.getAsLong());
172+
}
173+
156174
static class StatsHolder {
157-
private final MeanMetric indexMetric = new MeanMetric(); // Used for the count and total 'took' time (in ns) of index operations
158-
private final ExponentiallyWeightedMovingRate recentIndexMetric; // An EWMR of the total 'took' time of index operations (in ns)
159-
private final AtomicReference<Double> peakIndexMetric; // The peak value of the EWMR observed in any stats() call
175+
// Used for the count and total 'took' time (in ns) of index operations
176+
private final MeanMetric indexMetric = new MeanMetric();
177+
// Used for the total time taken to flush indexing buffers to disk (on indexing threads) (in ns)
178+
private final LongAdder writeIndexingBufferTime = new LongAdder();
179+
// An EWMR of the total 'took' time of index operations (indexMetric) plus the writeIndexingBufferTime (in ns)
180+
private final ExponentiallyWeightedMovingRate recentIndexMetric;
181+
// The peak value of the EWMR (recentIndexMetric) observed in any stats() call
182+
private final AtomicReference<Double> peakIndexMetric;
160183
private final MeanMetric deleteMetric = new MeanMetric();
161184
private final CounterMetric indexCurrent = new CounterMetric();
162185
private final CounterMetric indexFailed = new CounterMetric();
@@ -179,12 +202,19 @@ IndexingStats.Stats stats(
179202
boolean isThrottled,
180203
long currentThrottleMillis,
181204
long indexingTimeBeforeShardStartedInNanos,
205+
long indexingLoadBeforeShardStartedInNanos,
182206
long timeSinceShardStartedInNanos,
183207
long currentTimeInNanos,
184208
double recentIndexingLoadAtShardStarted
185209
) {
186210
final long totalIndexingTimeInNanos = indexMetric.sum();
187211
final long totalIndexingTimeSinceShardStartedInNanos = totalIndexingTimeInNanos - indexingTimeBeforeShardStartedInNanos;
212+
// This is different from indexing time as it also includes the time taken to write indexing buffers to disk
213+
// on the same thread as the indexing thread. This happens when we are running low on memory and want to push
214+
// back on indexing, see IndexingMemoryController#writePendingIndexingBuffers()
215+
final long totalIndexingExecutionTimeInNanos = totalIndexingTimeInNanos + writeIndexingBufferTime.sum();
216+
final long totalIndexingExecutionTimeSinceShardStartedInNanos = totalIndexingExecutionTimeInNanos
217+
- indexingLoadBeforeShardStartedInNanos;
188218
final double recentIndexingLoadSinceShardStarted = recentIndexMetric.calculateRateSince(
189219
currentTimeInNanos,
190220
recentIndexMetric.getRate(currentTimeInNanos),
@@ -218,6 +248,7 @@ IndexingStats.Stats stats(
218248
isThrottled,
219249
TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis),
220250
totalIndexingTimeSinceShardStartedInNanos,
251+
totalIndexingExecutionTimeSinceShardStartedInNanos,
221252
timeSinceShardStartedInNanos,
222253
recentIndexingLoadSinceShardStarted,
223254
peakIndexingLoad

server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,11 @@ private boolean writePendingIndexingBuffers() {
209209
.pollFirst()) {
210210
// Remove the shard from the set first, so that multiple threads can run writeIndexingBuffer concurrently on the same shard.
211211
pendingWriteIndexingBufferSet.remove(shard);
212+
// Calculate the time taken to write the indexing buffers so it can be accounted for in the index write load
213+
long startTime = System.nanoTime();
212214
shard.writeIndexingBuffer();
215+
long took = System.nanoTime() - startTime;
216+
shard.addWriteIndexBuffersToIndexThreadsTime(took);
213217
wrotePendingIndexingBuffer = true;
214218
}
215219
return wrotePendingIndexingBuffer;
@@ -258,6 +262,7 @@ private void postOperation(ShardId shardId, Engine.Operation operation, Engine.R
258262
// be reclaimed rapidly. This has the downside of increasing the latency of _bulk requests though. Lucene does the same thing in
259263
// DocumentsWriter#postUpdate, flushing a segment because the size limit on the RAM buffer was reached happens on the call to
260264
// IndexWriter#addDocument.
265+
261266
while (writePendingIndexingBuffers()) {
262267
// If we just wrote segments, then run the checker again if not already running to check if we released enough memory.
263268
if (statusChecker.tryRun() == false) {

server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,7 @@ private static CommonStats createShardLevelCommonStats() {
593593
++iota,
594594
++iota,
595595
++iota,
596+
++iota,
596597
++iota
597598
);
598599
indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats));

server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,6 +1066,7 @@ private static IndexingStats createIndexingStats(double indexingLoad, double rec
10661066
false,
10671067
0,
10681068
totalIndexingTimeSinceShardStartedInNanos,
1069+
totalIndexingTimeSinceShardStartedInNanos,
10691070
totalActiveTimeInNanos,
10701071
recentIndexingLoad,
10711072
peakIndexingLoad

0 commit comments

Comments
 (0)