Skip to content

Commit bb71dd3

Browse files
Make the add method treat the new write load the same as the old one, i.e. average rather than sum - and log it
1 parent 7353654 commit bb71dd3

File tree

2 files changed

+171
-13
lines changed

2 files changed

+171
-13
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.index.shard;
1111

12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
1214
import org.elasticsearch.TransportVersion;
1315
import org.elasticsearch.TransportVersions;
1416
import org.elasticsearch.common.io.stream.StreamInput;
@@ -29,6 +31,8 @@
2931

3032
public class IndexingStats implements Writeable, ToXContentFragment {
3133

34+
private static final Logger logger = LogManager.getLogger(IndexingStats.class);
35+
3236
public static class Stats implements Writeable, ToXContentFragment {
3337
private static final TransportVersion WRITE_LOAD_AVG_SUPPORTED_VERSION = TransportVersions.V_8_6_0;
3438

@@ -99,15 +103,10 @@ public Stats(
99103
this.noopUpdateCount = noopUpdateCount;
100104
this.isThrottled = isThrottled;
101105
this.throttleTimeInMillis = throttleTimeInMillis;
102-
103-
// We store the raw unweighted write load values in order to avoid losing precision when we combine the shard stats.
104-
// N.B. In add(Stats) we sum both of these, so getWriteLoad() will return the ratio of the sums, which is a weighted average of
105-
// the ratios we would get for each shard.
106+
// We store the raw unweighted write load values in order to avoid losing precision when we combine the shard stats
106107
this.totalIndexingTimeSinceShardStartedInNanos = totalIndexingTimeSinceShardStartedInNanos;
107108
this.totalActiveTimeInNanos = totalActiveTimeInNanos;
108-
109-
// We store the exponentially weighted write load value as a double. N.B. In add(Stats) we add these, and getRecentWriteLoad()
110-
// will return that sum.
109+
// We store the weighted write load as a double because the calculation is inherently floating point
111110
this.recentIndexingLoad = recentIndexingLoad;
112111
}
113112

@@ -127,9 +126,34 @@ public void add(Stats stats) {
127126
if (isThrottled != stats.isThrottled) {
128127
isThrottled = true; // When combining if one is throttled set result to throttled.
129128
}
129+
// TODO(pete): Remove logging of sums
130+
long tmpNum = totalIndexingTimeSinceShardStartedInNanos;
131+
long tmpDen = totalActiveTimeInNanos;
132+
double tmpWgt = recentIndexingLoad;
133+
// N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time:
130134
totalIndexingTimeSinceShardStartedInNanos += stats.totalIndexingTimeSinceShardStartedInNanos;
131135
totalActiveTimeInNanos += stats.totalActiveTimeInNanos;
132-
recentIndexingLoad += stats.recentIndexingLoad;
136+
// We want getRecentWriteLoad() for the aggregated stats to also be the average weighted by active time, so we use the updating
137+
// formula for a weighted mean:
138+
if (totalActiveTimeInNanos > 0) {
139+
recentIndexingLoad += (stats.recentIndexingLoad - recentIndexingLoad) * stats.totalActiveTimeInNanos
140+
/ totalActiveTimeInNanos;
141+
}
142+
logger.info(
143+
"***** SUM UNWEIGHTED ({} / {} = {}) + ({} / {} = {}) = ({} + {} = {}) --- WEIGHTED {} + {} = {}",
144+
tmpNum * 1.0e6,
145+
tmpDen * 1.0e6,
146+
1.0 * tmpNum / tmpDen,
147+
stats.totalIndexingTimeSinceShardStartedInNanos * 1.0e6,
148+
stats.totalActiveTimeInNanos * 1.0e6,
149+
1.0 * stats.totalIndexingTimeSinceShardStartedInNanos / stats.totalActiveTimeInNanos,
150+
totalIndexingTimeSinceShardStartedInNanos * 1.0e6,
151+
totalActiveTimeInNanos * 1.0e6,
152+
1.0 * totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos,
153+
tmpWgt,
154+
stats.recentIndexingLoad,
155+
recentIndexingLoad
156+
);
133157
}
134158

135159
/**
@@ -212,9 +236,8 @@ public long getNoopUpdateCount() {
212236
* <p>If this {@link Stats} instance represents a single shard, this is ratio of the sum of the time taken by every index operations
213237
* since the shard started to the elapsed time since the shard started.
214238
*
215-
* <p>If this {@link Stats} instance represents multiple shards, this is the <b>average</b> that ratio for each shard, weighted by
216-
* the elapsed time for each shard. N.B. This is a different behaviour to the {@link #getRecentWriteLoad()} method, which returns a
217-
* sum over the shards.
239+
* <p>If this {@link Stats} instance represents multiple shards, this is the average of that ratio for each shard, weighted by
240+
* the elapsed time for each shard.
218241
*/
219242
// TODO(pete): See which callers of this should be changed to use getRecentLoad(). Make sure that they are single-shard!
220243
public double getWriteLoad() {
@@ -227,8 +250,8 @@ public double getWriteLoad() {
227250
* <p>If this {@link Stats} instance represents a single shard, this is an Exponentially Weighted Moving Rate based on the time
228251
* taken by indexing operations in this shard since the shard started.
229252
*
230-
* <p>If this {@link Stats} instance represents multiple shards, this is the <b>sum</b> that rate for each shard. N.B. This is a
231-
* different behaviour to the {@link #getWriteLoad()} method, which returns an average over the shards.
253+
* <p>If this {@link Stats} instance represents multiple shards, this is the average of that ratio for each shard, weighted by
254+
* the elapsed time for each shard.
232255
*/
233256
public double getRecentWriteLoad() {
234257
return recentIndexingLoad;
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.shard;
11+
12+
import org.elasticsearch.test.ESTestCase;
13+
14+
import static org.hamcrest.Matchers.closeTo;
15+
import static org.hamcrest.Matchers.equalTo;
16+
import static org.hamcrest.Matchers.is;
17+
18+
public class IndexingStatsTests extends ESTestCase {
19+
20+
private static final double DOUBLE_TOLERANCE = 1.0e-10;
21+
22+
public void testStatsGetWriteLoad() {
23+
IndexingStats.Stats stats = new IndexingStats.Stats(
24+
1,
25+
2,
26+
3,
27+
4,
28+
5,
29+
6,
30+
7,
31+
8,
32+
9,
33+
false,
34+
10,
35+
1_800_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 1.8sec
36+
3_000_000_000L, // totalActiveTimeInNanos - 3sec
37+
0.1357 // recentWriteLoad
38+
);
39+
assertThat(stats.getWriteLoad(), closeTo(0.6, DOUBLE_TOLERANCE));
40+
}
41+
42+
public void testStatsAdd_indexCount() {
43+
IndexingStats.Stats stats1 = new IndexingStats.Stats(
44+
1001L, // indexCount
45+
2,
46+
3,
47+
4,
48+
5,
49+
6,
50+
7,
51+
8,
52+
9,
53+
false,
54+
10,
55+
11,
56+
12,
57+
0.1357
58+
);
59+
IndexingStats.Stats stats2 = new IndexingStats.Stats(
60+
2001L, // indexCount
61+
2,
62+
3,
63+
4,
64+
5,
65+
6,
66+
7,
67+
8,
68+
9,
69+
false, // isThrottled
70+
10,
71+
11,
72+
12,
73+
0.1357
74+
);
75+
IndexingStats.Stats statsAgg = sumOfStats(stats1, stats2);
76+
assertThat(statsAgg.getIndexCount(), equalTo(1001L + 2001L));
77+
}
78+
79+
public void testStatsAdd_throttled() {
80+
IndexingStats.Stats statsFalse = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, false, 10, 11, 12, 0.1357);
81+
IndexingStats.Stats statsTrue = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, true, 10, 11, 12, 0.1357);
82+
assertThat(sumOfStats(statsFalse, statsFalse).isThrottled(), is(false));
83+
assertThat(sumOfStats(statsFalse, statsTrue).isThrottled(), is(true));
84+
assertThat(sumOfStats(statsTrue, statsFalse).isThrottled(), is(true));
85+
assertThat(sumOfStats(statsTrue, statsTrue).isThrottled(), is(true));
86+
}
87+
88+
public void testStatsAdd_writeLoads() {
89+
IndexingStats.Stats stats1 = new IndexingStats.Stats(
90+
1,
91+
2,
92+
3,
93+
4,
94+
5,
95+
6,
96+
7,
97+
8,
98+
9,
99+
false,
100+
10,
101+
1_000_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 1sec
102+
2_000_000_000L, // totalActiveTimeInNanos - 2sec
103+
0.1357 // recentWriteLoad
104+
);
105+
IndexingStats.Stats stats2 = new IndexingStats.Stats(
106+
2,
107+
2,
108+
3,
109+
4,
110+
5,
111+
6,
112+
7,
113+
8,
114+
9,
115+
false, // isThrottled
116+
10,
117+
2_100_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 2.1sec
118+
3_000_000_000L, // totalActiveTimeInNanos - 3sec
119+
0.2468 // recentWriteLoad
120+
);
121+
IndexingStats.Stats statsAgg = sumOfStats(stats1, stats2);
122+
// The unweighted write loads for the two shards are 0.5 (1sec / 2sec) and 0.7 (2.1sec / 3sec) respectively.
123+
// The aggregated value should be the average weighted by the times, i.e. by 2sec and 3sec, giving weights of 0.4 and 0.6.
124+
assertThat(statsAgg.getWriteLoad(), closeTo(0.4 * 0.5 + 0.6 * 0.7, DOUBLE_TOLERANCE));
125+
// The aggregated value for the recent write load should be the average with the same weights.
126+
assertThat(statsAgg.getRecentWriteLoad(), closeTo(0.4 * 0.1357 + 0.6 * 0.2468, DOUBLE_TOLERANCE));
127+
}
128+
129+
private static IndexingStats.Stats sumOfStats(IndexingStats.Stats stats1, IndexingStats.Stats stats2) {
130+
IndexingStats.Stats statsAgg = new IndexingStats.Stats();
131+
statsAgg.add(stats1);
132+
statsAgg.add(stats2);
133+
return statsAgg;
134+
}
135+
}

0 commit comments

Comments
 (0)