Skip to content

Commit 93f9850

Browse files
authored
Dynamic shard sizing based on search nodes operating system memory (#24372)
* Dynamic shard sizing based on search nodes operating system memory * add changelog * add data_hot role for Elasticsearch * expose fallback shard size * handle min max shard size config nullability
1 parent c6ca5cb commit 93f9850

File tree

17 files changed

+706
-389
lines changed

17 files changed

+706
-389
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
type = "a"
2+
message = "Dynamic shard sizing based on search nodes operating system memory."
3+
4+
issues = ["23947"]
5+
pulls = ["24372"]

graylog-storage-elasticsearch7/src/main/java/org/graylog/storage/elasticsearch7/ClusterAdapterES7.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.common.base.Strings;
2424
import com.google.common.collect.Lists;
2525
import com.google.common.primitives.Ints;
26+
import jakarta.inject.Inject;
27+
import jakarta.inject.Named;
2628
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
2729
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
2830
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
@@ -46,14 +48,12 @@
4648
import org.graylog2.system.stats.elasticsearch.ClusterStats;
4749
import org.graylog2.system.stats.elasticsearch.IndicesStats;
4850
import org.graylog2.system.stats.elasticsearch.NodeInfo;
51+
import org.graylog2.system.stats.elasticsearch.NodeOSInfo;
4952
import org.graylog2.system.stats.elasticsearch.NodesStats;
5053
import org.graylog2.system.stats.elasticsearch.ShardStats;
5154
import org.slf4j.Logger;
5255
import org.slf4j.LoggerFactory;
5356

54-
import jakarta.inject.Inject;
55-
import jakarta.inject.Named;
56-
5757
import java.util.Collection;
5858
import java.util.Iterator;
5959
import java.util.List;
@@ -276,8 +276,9 @@ public Map<String, NodeInfo> nodesInfo() {
276276
final Request request = new Request("GET", "/_nodes");
277277
final JsonNode nodesJson = jsonApi.perform(request, "Couldn't read Elasticsearch nodes data!");
278278

279-
return toStream(nodesJson.at("/nodes").fields())
280-
.collect(Collectors.toMap(Map.Entry::getKey, o -> createNodeInfo(o.getValue())));
279+
final JsonNode nodes = nodesJson.at("/nodes");
280+
return toStream(nodes.fieldNames())
281+
.collect(Collectors.toMap(name -> name, name -> createNodeInfo(nodes.get(name))));
281282
}
282283

283284
private NodeInfo createNodeInfo(JsonNode nodesJson) {
@@ -289,6 +290,23 @@ private NodeInfo createNodeInfo(JsonNode nodesJson) {
289290
.build();
290291
}
291292

293+
@Override
294+
public Map<String, NodeOSInfo> nodesHostInfo() {
295+
final Request request = new Request("GET", "/_nodes/stats/os");
296+
final JsonNode nodesJson = jsonApi.perform(request, "Couldn't read Elasticsearch nodes os data!");
297+
298+
final JsonNode nodes = nodesJson.at("/nodes");
299+
return toStream(nodes.fieldNames())
300+
.collect(Collectors.toMap(name -> name, name -> createNodeHostInfo(nodes.get(name))));
301+
}
302+
303+
private NodeOSInfo createNodeHostInfo(JsonNode nodesOsJson) {
304+
return new NodeOSInfo(
305+
nodesOsJson.at("/os/mem/total_in_bytes").asLong(),
306+
toStream(nodesOsJson.at("/roles").elements()).map(JsonNode::asText).toList()
307+
);
308+
}
309+
292310
public <T> Stream<T> toStream(Iterator<T> iterator) {
293311
return StreamSupport.stream(((Iterable<T>) () -> iterator).spliterator(), false);
294312
}

graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/ClusterAdapterOS2.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.common.base.Strings;
2424
import com.google.common.collect.Lists;
2525
import com.google.common.primitives.Ints;
26+
import jakarta.inject.Inject;
27+
import jakarta.inject.Named;
2628
import org.graylog.shaded.opensearch2.org.opensearch.OpenSearchException;
2729
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
2830
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -47,14 +49,12 @@
4749
import org.graylog2.system.stats.elasticsearch.ClusterStats;
4850
import org.graylog2.system.stats.elasticsearch.IndicesStats;
4951
import org.graylog2.system.stats.elasticsearch.NodeInfo;
52+
import org.graylog2.system.stats.elasticsearch.NodeOSInfo;
5053
import org.graylog2.system.stats.elasticsearch.NodesStats;
5154
import org.graylog2.system.stats.elasticsearch.ShardStats;
5255
import org.slf4j.Logger;
5356
import org.slf4j.LoggerFactory;
5457

55-
import jakarta.inject.Inject;
56-
import jakarta.inject.Named;
57-
5858
import java.util.Collection;
5959
import java.util.Iterator;
6060
import java.util.List;
@@ -285,8 +285,9 @@ public Map<String, NodeInfo> nodesInfo() {
285285
final Request request = new Request("GET", "/_nodes");
286286
final JsonNode nodesJson = jsonApi.perform(request, "Couldn't read Opensearch nodes data!");
287287

288-
return toStream(nodesJson.at("/nodes").fields())
289-
.collect(Collectors.toMap(Map.Entry::getKey, o -> createNodeInfo(o.getValue())));
288+
final JsonNode nodes = nodesJson.at("/nodes");
289+
return toStream(nodes.fieldNames())
290+
.collect(Collectors.toMap(name -> name, name -> createNodeInfo(nodes.get(name))));
290291
}
291292

292293
private NodeInfo createNodeInfo(JsonNode nodesJson) {
@@ -298,6 +299,23 @@ private NodeInfo createNodeInfo(JsonNode nodesJson) {
298299
.build();
299300
}
300301

302+
@Override
303+
public Map<String, NodeOSInfo> nodesHostInfo() {
304+
final Request request = new Request("GET", "/_nodes/stats/os");
305+
final JsonNode nodesJson = jsonApi.perform(request, "Couldn't read Opensearch nodes os data!");
306+
307+
final JsonNode nodes = nodesJson.at("/nodes");
308+
return toStream(nodes.fieldNames())
309+
.collect(Collectors.toMap(name -> name, name -> createNodeHostInfo(nodes.get(name))));
310+
}
311+
312+
private NodeOSInfo createNodeHostInfo(JsonNode nodesOsJson) {
313+
return new NodeOSInfo(
314+
nodesOsJson.at("/os/mem/total_in_bytes").asLong(),
315+
toStream(nodesOsJson.at("/roles").elements()).map(JsonNode::asText).toList()
316+
);
317+
}
318+
301319
public <T> Stream<T> toStream(Iterator<T> iterator) {
302320
return StreamSupport.stream(((Iterable<T>) () -> iterator).spliterator(), false);
303321
}

graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/ClusterAdapterOS.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.graylog2.rest.models.system.indexer.responses.ClusterHealth;
3737
import org.graylog2.system.stats.elasticsearch.ClusterStats;
3838
import org.graylog2.system.stats.elasticsearch.IndicesStats;
39+
import org.graylog2.system.stats.elasticsearch.NodeOSInfo;
3940
import org.graylog2.system.stats.elasticsearch.NodesStats;
4041
import org.graylog2.system.stats.elasticsearch.ShardStats;
4142
import org.opensearch.client.json.JsonData;
@@ -327,6 +328,25 @@ private org.graylog2.system.stats.elasticsearch.NodeInfo createNodeInfo(JsonNode
327328
.build();
328329
}
329330

331+
@Override
332+
public Map<String, NodeOSInfo> nodesHostInfo() {
333+
Request request = Requests.builder()
334+
.endpoint("/_nodes/stats/os")
335+
.method("GET")
336+
.build();
337+
JsonNode json = jsonApi.performRequest(request, "Couldn't read Opensearch nodes os data!");
338+
JsonNode nodes = json.at("/nodes");
339+
return toStream(nodes.fieldNames())
340+
.collect(Collectors.toMap(name -> name, name -> createNodeHostInfo(nodes.get(name))));
341+
}
342+
343+
private NodeOSInfo createNodeHostInfo(JsonNode nodesOsJson) {
344+
return new NodeOSInfo(
345+
nodesOsJson.at("/os/mem/total_in_bytes").asLong(),
346+
toStream(nodesOsJson.at("/roles").elements()).map(JsonNode::asText).toList()
347+
);
348+
}
349+
330350
public <T> Stream<T> toStream(Iterator<T> iterator) {
331351
return StreamSupport.stream(((Iterable<T>) () -> iterator).spliterator(), false);
332352
}
@@ -386,8 +406,8 @@ public Optional<HealthStatus> deflectorHealth(Collection<String> indices) {
386406

387407
final Set<IndicesRecord> indexSummaries = opensearchClient.execute(() ->
388408
catClient.indices().valueBody()
389-
.stream()
390-
.filter(indexSummary -> mappedIndices.contains(indexSummary.index()))
409+
.stream()
410+
.filter(indexSummary -> mappedIndices.contains(indexSummary.index()))
391411
.collect(Collectors.toSet()),
392412
"Unable to retrieve indices");
393413

graylog2-server/src/main/java/org/graylog2/configuration/ElasticsearchConfiguration.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.github.joschi.jadconfig.validators.PositiveIntegerValidator;
2828
import com.github.joschi.jadconfig.validators.PositiveLongValidator;
2929
import com.github.joschi.jadconfig.validators.StringNotBlankValidator;
30+
import com.google.common.annotations.VisibleForTesting;
3031
import org.graylog2.configuration.validators.RetentionStrategyValidator;
3132
import org.graylog2.configuration.validators.RotationStrategyValidator;
3233
import org.graylog2.indexer.retention.strategies.DeletionRetentionStrategy;
@@ -57,6 +58,7 @@ public class ElasticsearchConfiguration {
5758
public static final String TIME_SIZE_OPTIMIZING_RETENTION_FIXED_LEEWAY = "time_size_optimizing_retention_fixed_leeway";
5859
public static final String TIME_SIZE_OPTIMIZING_ROTATION_MIN_SHARD_SIZE = "time_size_optimizing_rotation_min_shard_size";
5960
public static final String TIME_SIZE_OPTIMIZING_ROTATION_MAX_SHARD_SIZE = "time_size_optimizing_rotation_max_shard_size";
61+
public static final String TIME_SIZE_OPTIMIZING_ROTATION_OS_MEMORY_FACTOR = "time_size_optimizing_rotation_os_memory_factor";
6062
public static final String TIME_SIZE_OPTIMIZING_ROTATION_PERIOD = "time_size_optimizing_rotation_period";
6163
public static final String ALLOW_FLEXIBLE_RETENTION_PERIOD = "allow_flexible_retention_period";
6264
public static final String INDEX_FIELD_TYPE_REFRESH_INTERVAL = "index_field_type_refresh_interval";
@@ -133,10 +135,13 @@ public class ElasticsearchConfiguration {
133135
private Period timeSizeOptimizingRotationPeriod = Period.days(1);
134136

135137
@Parameter(value = TIME_SIZE_OPTIMIZING_ROTATION_MIN_SHARD_SIZE)
136-
private Size timeSizeOptimizingRotationMinShardSize = Size.gigabytes(20);
138+
private Size timeSizeOptimizingRotationMinShardSize;
137139

138140
@Parameter(value = TIME_SIZE_OPTIMIZING_ROTATION_MAX_SHARD_SIZE)
139-
private Size timeSizeOptimizingRotationMaxShardSize = Size.gigabytes(20);
141+
private Size timeSizeOptimizingRotationMaxShardSize;
142+
143+
@Parameter(value = TIME_SIZE_OPTIMIZING_ROTATION_OS_MEMORY_FACTOR)
144+
private double timeSizeOptimizingRotationOSMemoryFactor = 0.6;
140145

141146
@Parameter(value = TIME_SIZE_OPTIMIZING_RETENTION_MIN_LIFETIME)
142147
private Period timeSizeOptimizingRetentionMinLifeTime = IndexLifetimeConfig.DEFAULT_LIFETIME_MIN;
@@ -268,14 +273,20 @@ public Period getTimeSizeOptimizingRotationPeriod() {
268273
return timeSizeOptimizingRotationPeriod;
269274
}
270275

276+
@Nullable
271277
public Size getTimeSizeOptimizingRotationMinShardSize() {
272278
return timeSizeOptimizingRotationMinShardSize;
273279
}
274280

281+
@Nullable
275282
public Size getTimeSizeOptimizingRotationMaxShardSize() {
276283
return timeSizeOptimizingRotationMaxShardSize;
277284
}
278285

286+
public double getTimeSizeOptimizingRotationOSMemoryFactor() {
287+
return timeSizeOptimizingRotationOSMemoryFactor;
288+
}
289+
279290
public Period getTimeSizeOptimizingRetentionMinLifeTime() {
280291
return timeSizeOptimizingRetentionMinLifeTime;
281292
}
@@ -320,7 +331,8 @@ public boolean allowFlexibleRetentionPeriod() {
320331
@ValidatorMethod
321332
@SuppressWarnings("unused")
322333
public void validateTimeSizeOptimizingRotation() throws ValidationException {
323-
if (getTimeSizeOptimizingRotationMaxShardSize().compareTo(getTimeSizeOptimizingRotationMinShardSize()) < 0) {
334+
if (getTimeSizeOptimizingRotationMaxShardSize() != null && getTimeSizeOptimizingRotationMinShardSize() != null &&
335+
getTimeSizeOptimizingRotationMaxShardSize().compareTo(getTimeSizeOptimizingRotationMinShardSize()) < 0) {
324336
throw new ValidationException(f("\"%s=%s\" cannot be larger than \"%s=%s\"",
325337
TIME_SIZE_OPTIMIZING_ROTATION_MIN_SHARD_SIZE, getTimeSizeOptimizingRotationMinShardSize(),
326338
TIME_SIZE_OPTIMIZING_ROTATION_MAX_SHARD_SIZE, getTimeSizeOptimizingRotationMaxShardSize())
@@ -357,4 +369,14 @@ TIME_SIZE_OPTIMIZING_RETENTION_MIN_LIFETIME, getTimeSizeOptimizingRetentionMinLi
357369
);
358370
}
359371
}
372+
373+
@VisibleForTesting
374+
public void setTimeSizeOptimizingRotationMinShardSize(Size timeSizeOptimizingRotationMinShardSize) {
375+
this.timeSizeOptimizingRotationMinShardSize = timeSizeOptimizingRotationMinShardSize;
376+
}
377+
378+
@VisibleForTesting
379+
public void setTimeSizeOptimizingRotationMaxShardSize(Size timeSizeOptimizingRotationMaxShardSize) {
380+
this.timeSizeOptimizingRotationMaxShardSize = timeSizeOptimizingRotationMaxShardSize;
381+
}
360382
}

graylog2-server/src/main/java/org/graylog2/datatiering/rotation/DataTierRotation.java

Lines changed: 6 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -16,116 +16,38 @@
1616
*/
1717
package org.graylog2.datatiering.rotation;
1818

19-
import com.github.joschi.jadconfig.util.Size;
2019
import com.google.inject.assistedinject.Assisted;
2120
import com.google.inject.assistedinject.AssistedInject;
22-
import org.graylog.scheduler.clock.JobSchedulerClock;
23-
import org.graylog2.configuration.ElasticsearchConfiguration;
2421
import org.graylog2.indexer.indexset.IndexSet;
25-
import org.graylog2.indexer.indices.Indices;
26-
import org.graylog2.indexer.retention.strategies.NoopRetentionStrategyConfig;
2722
import org.graylog2.indexer.rotation.common.IndexRotator;
2823
import org.graylog2.indexer.rotation.tso.IndexLifetimeConfig;
29-
import org.joda.time.DateTime;
30-
import org.joda.time.Period;
24+
import org.graylog2.indexer.rotation.tso.TimeSizeOptimizingCalculator;
3125

3226
import javax.annotation.Nonnull;
33-
import java.time.Duration;
34-
import java.time.Instant;
35-
36-
import static org.graylog2.shared.utilities.StringUtils.f;
37-
import static org.graylog2.shared.utilities.StringUtils.humanReadableByteCount;
3827

3928
public class DataTierRotation {
4029

41-
private final JobSchedulerClock clock;
42-
private final org.joda.time.Period rotationPeriod;
43-
44-
private final Indices indices;
45-
4630
private final IndexRotator indexRotator;
47-
48-
private final Size maxShardSize;
49-
private final Size minShardSize;
50-
31+
private final TimeSizeOptimizingCalculator timeSizeOptimizingCalculator;
5132
private final IndexLifetimeConfig indexLifetimeConfig;
5233

5334
@AssistedInject
54-
public DataTierRotation(Indices indices,
55-
ElasticsearchConfiguration elasticsearchConfiguration,
56-
IndexRotator indexRotator,
57-
JobSchedulerClock clock,
35+
public DataTierRotation(IndexRotator indexRotator,
36+
TimeSizeOptimizingCalculator timeSizeOptimizingCalculator,
5837
@Assisted IndexLifetimeConfig indexLifetimeConfig) {
5938

60-
this.indices = indices;
6139
this.indexRotator = indexRotator;
62-
this.clock = clock;
63-
this.rotationPeriod = elasticsearchConfiguration.getTimeSizeOptimizingRotationPeriod();
64-
this.maxShardSize = elasticsearchConfiguration.getTimeSizeOptimizingRotationMaxShardSize();
65-
this.minShardSize = elasticsearchConfiguration.getTimeSizeOptimizingRotationMinShardSize();
40+
this.timeSizeOptimizingCalculator = timeSizeOptimizingCalculator;
6641
this.indexLifetimeConfig = indexLifetimeConfig;
6742
}
6843

6944
public void rotate(IndexSet indexSet) {
7045
indexRotator.rotate(indexSet, this::shouldRotate);
7146
}
7247

73-
private IndexRotator.Result createResult(boolean shouldRotate, String message) {
74-
return IndexRotator.createResult(shouldRotate, message, this.getClass().getCanonicalName());
75-
}
76-
7748
@Nonnull
7849
private IndexRotator.Result shouldRotate(final String index, IndexSet indexSet) {
79-
final DateTime creationDate = indices.indexCreationDate(index).orElseThrow(() -> new IllegalStateException("No index creation date"));
80-
final Long sizeInBytes = indices.getStoreSizeInBytes(index).orElseThrow(() -> new IllegalStateException("No index size"));
81-
82-
if (indices.numberOfMessages(index) == 0) {
83-
return createResult(false, "Index is empty");
84-
}
85-
86-
final int shards = indexSet.getConfig().shards();
87-
88-
final long maxIndexSize = maxShardSize.toBytes() * shards;
89-
if (sizeInBytes > maxIndexSize) {
90-
return createResult(true,
91-
f("Index size <%s> exceeds maximum size <%s>",
92-
humanReadableByteCount(sizeInBytes), humanReadableByteCount(maxIndexSize)));
93-
}
94-
95-
// If no retention is selected, we have an "indefinite" optimization leeway
96-
if (!(indexSet.getConfig().retentionStrategyConfig() instanceof NoopRetentionStrategyConfig)) {
97-
Period leeWay = indexLifetimeConfig.indexLifetimeMax().minus(indexLifetimeConfig.indexLifetimeMin());
98-
if (indexExceedsLeeWay(creationDate, leeWay)) {
99-
return createResult(true,
100-
f("Index creation date <%s> exceeds optimization leeway <%s>",
101-
creationDate, leeWay));
102-
}
103-
}
104-
105-
final long minIndexSize = minShardSize.toBytes() * shards;
106-
if (indexIsOldEnough(creationDate) && sizeInBytes >= minIndexSize) {
107-
return createResult(true,
108-
f("Index creation date <%s> has passed rotation period <%s> and has a reasonable size <%s> for rotation",
109-
creationDate, rotationPeriod, humanReadableByteCount(minIndexSize)));
110-
}
111-
112-
return createResult(false, "No reason to rotate found");
113-
}
114-
115-
private boolean indexExceedsLeeWay(DateTime creationDate, Period leeWay) {
116-
return timePassedIsBeyondLimit(creationDate, leeWay);
117-
}
118-
119-
private boolean indexIsOldEnough(DateTime creationDate) {
120-
return timePassedIsBeyondLimit(creationDate, rotationPeriod);
121-
}
122-
123-
private boolean timePassedIsBeyondLimit(DateTime date, Period limit) {
124-
final Instant now = clock.instantNow();
125-
final Duration timePassed = Duration.between(Instant.ofEpochMilli(date.getMillis()), now);
126-
final Duration limitAsDuration = Duration.ofSeconds(limit.toStandardSeconds().getSeconds());
127-
128-
return timePassed.compareTo(limitAsDuration) >= 0;
50+
return timeSizeOptimizingCalculator.calculate(index, indexLifetimeConfig, indexSet.getConfig());
12951
}
13052

13153
public interface Factory {

graylog2-server/src/main/java/org/graylog2/indexer/cluster/ClusterAdapter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.graylog2.rest.models.system.indexer.responses.ClusterHealth;
2626
import org.graylog2.system.stats.elasticsearch.ClusterStats;
2727
import org.graylog2.system.stats.elasticsearch.NodeInfo;
28+
import org.graylog2.system.stats.elasticsearch.NodeOSInfo;
2829
import org.graylog2.system.stats.elasticsearch.ShardStats;
2930

3031
import java.util.Collection;
@@ -61,6 +62,8 @@ public interface ClusterAdapter {
6162

6263
Map<String, NodeInfo> nodesInfo();
6364

65+
Map<String, NodeOSInfo> nodesHostInfo();
66+
6467
ShardStats shardStats();
6568

6669
Optional<HealthStatus> deflectorHealth(Collection<String> indices);

0 commit comments

Comments
 (0)