Skip to content

Commit 78f521f

Browse files
committed
Commit
1 parent 4d57998 commit 78f521f

File tree

12 files changed

+42
-8
lines changed

12 files changed

+42
-8
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,9 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex,
527527
CommonStats stats = new CommonStats();
528528
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
529529
stats.store = new StoreStats();
530-
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123, 0.234));
530+
stats.indexing = new IndexingStats(
531+
new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, targetWriteLoad, 1, 0.123, 0.234)
532+
);
531533
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
532534
}
533535

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
308308
private final LongSupplier relativeTimeInNanosSupplier;
309309
private volatile long startedRelativeTimeInNanos = -1L; // use -1 to indicate this has not yet been set to its true value
310310
private volatile long indexingTimeBeforeShardStartedInNanos;
311+
private volatile long indexingLoadBeforeShardStartedInNanos;
311312
private volatile double recentIndexingLoadAtShardStarted;
312313
private final SubscribableListener<Void> waitForEngineOrClosedShardListeners = new SubscribableListener<>();
313314

@@ -569,6 +570,7 @@ public void updateShardState(
569570
// unlikely case that getRelativeTimeInNanos() returns exactly -1, we advance by 1ns to avoid that special value.
570571
startedRelativeTimeInNanos = (relativeTimeInNanos != -1L) ? relativeTimeInNanos : 0L;
571572
indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos();
573+
indexingLoadBeforeShardStartedInNanos = internalIndexingStats.totalIndexingLoadInNanos();
572574
recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos);
573575
} else if (currentRouting.primary()
574576
&& currentRouting.relocating()
@@ -1401,6 +1403,7 @@ public IndexingStats indexingStats() {
14011403
throttled,
14021404
throttleTimeInMillis,
14031405
indexingTimeBeforeShardStartedInNanos,
1406+
indexingLoadBeforeShardStartedInNanos,
14041407
timeSinceShardStartedInNanos,
14051408
currentTimeInNanos,
14061409
recentIndexingLoadAtShardStarted

server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public static class Stats implements Writeable, ToXContentFragment {
4545
private long throttleTimeInMillis;
4646
private boolean isThrottled;
4747
private long totalIndexingTimeSinceShardStartedInNanos;
48+
private long totalIndexingLoadSinceShardStartedInNanos;
4849
private long totalActiveTimeInNanos;
4950
private double recentIndexingLoad;
5051
private double peakIndexingLoad;
@@ -75,7 +76,7 @@ public Stats(StreamInput in) throws IOException {
7576
// When getting stats from an older version which doesn't have the recent indexing load, better to fall back to the
7677
// unweighted write load, rather that assuming zero load:
7778
recentIndexingLoad = totalActiveTimeInNanos > 0
78-
? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos
79+
? (double) totalIndexingLoadSinceShardStartedInNanos / totalActiveTimeInNanos
7980
: 0;
8081
}
8182
if (in.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
@@ -102,6 +103,7 @@ public Stats(
102103
boolean isThrottled,
103104
long throttleTimeInMillis,
104105
long totalIndexingTimeSinceShardStartedInNanos,
106+
long totalIndexingLoadSinceShardStartedInNanos,
105107
long totalActiveTimeInNanos,
106108
double recentIndexingLoad,
107109
double peakIndexingLoad
@@ -119,6 +121,7 @@ public Stats(
119121
this.throttleTimeInMillis = throttleTimeInMillis;
120122
// We store the raw unweighted write load values in order to avoid losing precision when we combine the shard stats
121123
this.totalIndexingTimeSinceShardStartedInNanos = totalIndexingTimeSinceShardStartedInNanos;
124+
this.totalIndexingLoadSinceShardStartedInNanos = totalIndexingLoadSinceShardStartedInNanos;
122125
this.totalActiveTimeInNanos = totalActiveTimeInNanos;
123126
// We store the weighted write load as a double because the calculation is inherently floating point
124127
this.recentIndexingLoad = recentIndexingLoad;
@@ -141,8 +144,9 @@ public void add(Stats stats) {
141144
if (isThrottled != stats.isThrottled) {
142145
isThrottled = true; // When combining if one is throttled set result to throttled.
143146
}
144-
// N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time:
145147
totalIndexingTimeSinceShardStartedInNanos += stats.totalIndexingTimeSinceShardStartedInNanos;
148+
// N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time:
149+
totalIndexingLoadSinceShardStartedInNanos += stats.totalIndexingLoadSinceShardStartedInNanos;
146150
totalActiveTimeInNanos += stats.totalActiveTimeInNanos;
147151
// We want getRecentWriteLoad() and getPeakWriteLoad() for the aggregated stats to also be the average weighted by active time,
148152
// so we use the updating formula for a weighted mean:
@@ -237,7 +241,7 @@ public long getNoopUpdateCount() {
237241
* the elapsed time for each shard.
238242
*/
239243
public double getWriteLoad() {
240-
return totalActiveTimeInNanos > 0 ? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos : 0;
244+
return totalActiveTimeInNanos > 0 ? (double) totalIndexingLoadSinceShardStartedInNanos / totalActiveTimeInNanos : 0;
241245
}
242246

243247
/**
@@ -338,6 +342,7 @@ public boolean equals(Object o) {
338342
&& isThrottled == that.isThrottled
339343
&& throttleTimeInMillis == that.throttleTimeInMillis
340344
&& totalIndexingTimeSinceShardStartedInNanos == that.totalIndexingTimeSinceShardStartedInNanos
345+
&& totalIndexingLoadSinceShardStartedInNanos == that.totalIndexingLoadSinceShardStartedInNanos
341346
&& totalActiveTimeInNanos == that.totalActiveTimeInNanos
342347
&& recentIndexingLoad == that.recentIndexingLoad
343348
&& peakIndexingLoad == that.peakIndexingLoad;
@@ -358,6 +363,7 @@ public int hashCode() {
358363
isThrottled,
359364
throttleTimeInMillis,
360365
totalIndexingTimeSinceShardStartedInNanos,
366+
totalIndexingLoadSinceShardStartedInNanos,
361367
totalActiveTimeInNanos
362368
);
363369
}

server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.AtomicReference;
25+
import java.util.concurrent.atomic.LongAdder;
2526
import java.util.function.LongSupplier;
2627

2728
import static org.elasticsearch.core.TimeValue.timeValueNanos;
@@ -51,6 +52,7 @@ IndexingStats stats(
5152
boolean isThrottled,
5253
long currentThrottleInMillis,
5354
long indexingTimeBeforeShardStartedInNanos,
55+
long indexingLoadBeforeShardStartedInNanos,
5456
long timeSinceShardStartedInNanos,
5557
long currentTimeInNanos,
5658
double recentIndexingLoadAtShardStarted
@@ -59,6 +61,7 @@ IndexingStats stats(
5961
isThrottled,
6062
currentThrottleInMillis,
6163
indexingTimeBeforeShardStartedInNanos,
64+
indexingLoadBeforeShardStartedInNanos,
6265
timeSinceShardStartedInNanos,
6366
currentTimeInNanos,
6467
recentIndexingLoadAtShardStarted
@@ -70,6 +73,10 @@ long totalIndexingTimeInNanos() {
7073
return totalStats.indexMetric.sum();
7174
}
7275

76+
long totalIndexingLoadInNanos() {
77+
return totalStats.indexMetric.sum() + totalStats.writeIndexingBufferTime.sum();
78+
}
79+
7380
/**
7481
* Returns an exponentially-weighted moving rate which measures the indexing load, favoring more recent load.
7582
*/
@@ -160,7 +167,7 @@ void noopUpdate() {
160167
* @see org.elasticsearch.indices.IndexingMemoryController
161168
*/
162169
void writeIndexBuffers(long took) {
163-
totalStats.indexMetric.inc(took);
170+
totalStats.writeIndexingBufferTime.add(took);
164171
totalStats.recentIndexMetric.addIncrement(took, relativeTimeInNanosSupplier.getAsLong());
165172
}
166173

@@ -174,6 +181,7 @@ static class StatsHolder {
174181
private final CounterMetric indexFailedDueToVersionConflicts = new CounterMetric();
175182
private final CounterMetric deleteCurrent = new CounterMetric();
176183
private final CounterMetric noopUpdates = new CounterMetric();
184+
private final LongAdder writeIndexingBufferTime = new LongAdder();
177185

178186
StatsHolder(long startTimeInNanos, TimeValue recentWriteLoadHalfLife) {
179187
double lambdaInInverseNanos = Math.log(2.0) / recentWriteLoadHalfLife.nanos();
@@ -190,12 +198,15 @@ IndexingStats.Stats stats(
190198
boolean isThrottled,
191199
long currentThrottleMillis,
192200
long indexingTimeBeforeShardStartedInNanos,
201+
long indexingLoadBeforeShardStartedInNanos,
193202
long timeSinceShardStartedInNanos,
194203
long currentTimeInNanos,
195204
double recentIndexingLoadAtShardStarted
196205
) {
197206
final long totalIndexingTimeInNanos = indexMetric.sum();
198207
final long totalIndexingTimeSinceShardStartedInNanos = totalIndexingTimeInNanos - indexingTimeBeforeShardStartedInNanos;
208+
final long totalIndexingLoadInNanos = totalIndexingTimeInNanos + writeIndexingBufferTime.sum();
209+
final long totalIndexingLoadSinceShardStartedInNanos = totalIndexingLoadInNanos - indexingLoadBeforeShardStartedInNanos;
199210
final double recentIndexingLoadSinceShardStarted = recentIndexMetric.calculateRateSince(
200211
currentTimeInNanos,
201212
recentIndexMetric.getRate(currentTimeInNanos),
@@ -229,6 +240,7 @@ IndexingStats.Stats stats(
229240
isThrottled,
230241
TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis),
231242
totalIndexingTimeSinceShardStartedInNanos,
243+
totalIndexingLoadSinceShardStartedInNanos,
232244
timeSinceShardStartedInNanos,
233245
recentIndexingLoadSinceShardStarted,
234246
peakIndexingLoad

server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,7 @@ private static CommonStats createShardLevelCommonStats() {
593593
++iota,
594594
++iota,
595595
++iota,
596+
++iota,
596597
++iota
597598
);
598599
indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats));

server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,6 +1066,7 @@ private static IndexingStats createIndexingStats(double indexingLoad, double rec
10661066
false,
10671067
0,
10681068
totalIndexingTimeSinceShardStartedInNanos,
1069+
totalIndexingTimeSinceShardStartedInNanos,
10691070
totalActiveTimeInNanos,
10701071
recentIndexingLoad,
10711072
peakIndexingLoad

server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ private ShardStats createShardStats(
126126
false,
127127
0,
128128
totalIndexingTimeSinceShardStartedInNanos,
129+
totalIndexingTimeSinceShardStartedInNanos,
129130
totalActiveTimeInNanos,
130131
0.0,
131132
0.0

server/src/test/java/org/elasticsearch/index/shard/IndexingStatsTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public void testStatsGetWriteLoad() {
3333
false,
3434
10,
3535
1_800_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 1.8sec
36+
1_800_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 1.8sec
3637
3_000_000_000L, // totalActiveTimeInNanos - 3sec
3738
0.1357,
3839
0.2468
@@ -55,6 +56,7 @@ public void testStatsAdd_indexCount() {
5556
false,
5657
10,
5758
11,
59+
11,
5860
12,
5961
0.1357,
6062
0.2468
@@ -72,6 +74,7 @@ public void testStatsAdd_indexCount() {
7274
false,
7375
10,
7476
11,
77+
11,
7578
12,
7679
0.1357,
7780
0.2468
@@ -81,8 +84,8 @@ public void testStatsAdd_indexCount() {
8184
}
8285

8386
public void testStatsAdd_throttled() {
84-
IndexingStats.Stats statsFalse = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, false, 10, 11, 12, 0.1357, 0.2468);
85-
IndexingStats.Stats statsTrue = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, true, 10, 11, 12, 0.1357, 0.2468);
87+
IndexingStats.Stats statsFalse = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, false, 10, 11, 11, 12, 0.1357, 0.2468);
88+
IndexingStats.Stats statsTrue = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, true, 10, 11, 11, 12, 0.1357, 0.2468);
8689
assertThat(sumOfStats(statsFalse, statsFalse).isThrottled(), is(false));
8790
assertThat(sumOfStats(statsFalse, statsTrue).isThrottled(), is(true));
8891
assertThat(sumOfStats(statsTrue, statsFalse).isThrottled(), is(true));
@@ -103,6 +106,7 @@ public void testStatsAdd_writeLoads() {
103106
false,
104107
10,
105108
1_000_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 1sec
109+
1_000_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 1sec
106110
2_000_000_000L, // totalActiveTimeInNanos - 2sec
107111
0.1357, // recentWriteLoad
108112
0.3579 // peakWriteLoad
@@ -120,6 +124,7 @@ public void testStatsAdd_writeLoads() {
120124
false,
121125
10,
122126
2_100_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 2.1sec
127+
2_100_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 2.1sec
123128
3_000_000_000L, // totalActiveTimeInNanos - 3sec
124129
0.2468, // recentWriteLoad
125130
0.5791 // peakWriteLoad

server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ private void mockShardStats(boolean includeCommonStats) {
153153
randomNonNegativeLong(),
154154
randomNonNegativeLong(),
155155
randomNonNegativeLong(),
156+
randomNonNegativeLong(),
156157
randomDoubleBetween(0.0, 1.0, true),
157158
randomDoubleBetween(0.0, 1.0, true)
158159
)

x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ private static CommonStats mockCommonStats() {
404404
no,
405405
no,
406406
no,
407+
no,
407408
no
408409
);
409410
commonStats.getIndexing().add(new IndexingStats(indexingStats));

0 commit comments

Comments
 (0)