Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/issue-23947.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "a"
message = "Dynamic shard sizing based on the search nodes' operating system memory."

issues = ["23947"]
pulls = ["24372"]
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
Expand All @@ -46,14 +48,12 @@
import org.graylog2.system.stats.elasticsearch.ClusterStats;
import org.graylog2.system.stats.elasticsearch.IndicesStats;
import org.graylog2.system.stats.elasticsearch.NodeInfo;
import org.graylog2.system.stats.elasticsearch.NodeOSInfo;
import org.graylog2.system.stats.elasticsearch.NodesStats;
import org.graylog2.system.stats.elasticsearch.ShardStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;
import jakarta.inject.Named;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -276,8 +276,9 @@ public Map<String, NodeInfo> nodesInfo() {
final Request request = new Request("GET", "/_nodes");
final JsonNode nodesJson = jsonApi.perform(request, "Couldn't read Elasticsearch nodes data!");

return toStream(nodesJson.at("/nodes").fields())
.collect(Collectors.toMap(Map.Entry::getKey, o -> createNodeInfo(o.getValue())));
final JsonNode nodes = nodesJson.at("/nodes");
return toStream(nodes.fieldNames())
.collect(Collectors.toMap(name -> name, name -> createNodeInfo(nodes.get(name))));
}

private NodeInfo createNodeInfo(JsonNode nodesJson) {
Expand All @@ -289,6 +290,23 @@ private NodeInfo createNodeInfo(JsonNode nodesJson) {
.build();
}

@Override
public Map<String, NodeOSInfo> nodesHostInfo() {
    // Fetch per-node operating-system statistics from the cluster.
    final Request osStatsRequest = new Request("GET", "/_nodes/stats/os");
    final JsonNode response = jsonApi.perform(osStatsRequest, "Couldn't read Elasticsearch nodes os data!");

    // The "/nodes" object maps node id -> node stats; convert each entry.
    final JsonNode nodesNode = response.at("/nodes");
    return toStream(nodesNode.fieldNames())
            .collect(Collectors.toMap(nodeName -> nodeName, nodeName -> createNodeHostInfo(nodesNode.get(nodeName))));
}

// Builds a NodeOSInfo from a single node's "/_nodes/stats/os" JSON subtree:
// total OS memory in bytes plus the node's role names.
private NodeOSInfo createNodeHostInfo(JsonNode nodeStatsJson) {
    final long totalMemoryBytes = nodeStatsJson.at("/os/mem/total_in_bytes").asLong();
    final List<String> roles = toStream(nodeStatsJson.at("/roles").elements())
            .map(JsonNode::asText)
            .toList();
    return new NodeOSInfo(totalMemoryBytes, roles);
}

/**
 * Adapts a one-shot {@link Iterator} into a sequential {@link Stream}.
 */
public <T> Stream<T> toStream(Iterator<T> iterator) {
    // Wrap the iterator in a throwaway Iterable so StreamSupport can build
    // a (non-parallel) stream over it via the default spliterator.
    final Iterable<T> asIterable = () -> iterator;
    return StreamSupport.stream(asIterable.spliterator(), false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.graylog.shaded.opensearch2.org.opensearch.OpenSearchException;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
Expand All @@ -47,14 +49,12 @@
import org.graylog2.system.stats.elasticsearch.ClusterStats;
import org.graylog2.system.stats.elasticsearch.IndicesStats;
import org.graylog2.system.stats.elasticsearch.NodeInfo;
import org.graylog2.system.stats.elasticsearch.NodeOSInfo;
import org.graylog2.system.stats.elasticsearch.NodesStats;
import org.graylog2.system.stats.elasticsearch.ShardStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;
import jakarta.inject.Named;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -285,8 +285,9 @@ public Map<String, NodeInfo> nodesInfo() {
final Request request = new Request("GET", "/_nodes");
final JsonNode nodesJson = jsonApi.perform(request, "Couldn't read Opensearch nodes data!");

return toStream(nodesJson.at("/nodes").fields())
.collect(Collectors.toMap(Map.Entry::getKey, o -> createNodeInfo(o.getValue())));
final JsonNode nodes = nodesJson.at("/nodes");
return toStream(nodes.fieldNames())
.collect(Collectors.toMap(name -> name, name -> createNodeInfo(nodes.get(name))));
}

private NodeInfo createNodeInfo(JsonNode nodesJson) {
Expand All @@ -298,6 +299,23 @@ private NodeInfo createNodeInfo(JsonNode nodesJson) {
.build();
}

@Override
public Map<String, NodeOSInfo> nodesHostInfo() {
    // Fetch per-node operating-system statistics from the cluster.
    final Request osStatsRequest = new Request("GET", "/_nodes/stats/os");
    final JsonNode response = jsonApi.perform(osStatsRequest, "Couldn't read Opensearch nodes os data!");

    // The "/nodes" object maps node id -> node stats; convert each entry.
    final JsonNode nodesNode = response.at("/nodes");
    return toStream(nodesNode.fieldNames())
            .collect(Collectors.toMap(nodeName -> nodeName, nodeName -> createNodeHostInfo(nodesNode.get(nodeName))));
}

// Builds a NodeOSInfo from a single node's "/_nodes/stats/os" JSON subtree:
// total OS memory in bytes plus the node's role names.
private NodeOSInfo createNodeHostInfo(JsonNode nodeStatsJson) {
    final long totalMemoryBytes = nodeStatsJson.at("/os/mem/total_in_bytes").asLong();
    final List<String> roles = toStream(nodeStatsJson.at("/roles").elements())
            .map(JsonNode::asText)
            .toList();
    return new NodeOSInfo(totalMemoryBytes, roles);
}

/**
 * Adapts a one-shot {@link Iterator} into a sequential {@link Stream}.
 */
public <T> Stream<T> toStream(Iterator<T> iterator) {
    // Wrap the iterator in a throwaway Iterable so StreamSupport can build
    // a (non-parallel) stream over it via the default spliterator.
    final Iterable<T> asIterable = () -> iterator;
    return StreamSupport.stream(asIterable.spliterator(), false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.graylog2.rest.models.system.indexer.responses.ClusterHealth;
import org.graylog2.system.stats.elasticsearch.ClusterStats;
import org.graylog2.system.stats.elasticsearch.IndicesStats;
import org.graylog2.system.stats.elasticsearch.NodeOSInfo;
import org.graylog2.system.stats.elasticsearch.NodesStats;
import org.graylog2.system.stats.elasticsearch.ShardStats;
import org.opensearch.client.json.JsonData;
Expand Down Expand Up @@ -327,6 +328,25 @@ private org.graylog2.system.stats.elasticsearch.NodeInfo createNodeInfo(JsonNode
.build();
}

@Override
public Map<String, NodeOSInfo> nodesHostInfo() {
    // Fetch per-node operating-system statistics from the cluster.
    final Request osStatsRequest = Requests.builder()
            .method("GET")
            .endpoint("/_nodes/stats/os")
            .build();
    final JsonNode response = jsonApi.performRequest(osStatsRequest, "Couldn't read Opensearch nodes os data!");

    // The "/nodes" object maps node id -> node stats; convert each entry.
    final JsonNode nodesNode = response.at("/nodes");
    return toStream(nodesNode.fieldNames())
            .collect(Collectors.toMap(nodeName -> nodeName, nodeName -> createNodeHostInfo(nodesNode.get(nodeName))));
}

// Builds a NodeOSInfo from a single node's "/_nodes/stats/os" JSON subtree:
// total OS memory in bytes plus the node's role names.
private NodeOSInfo createNodeHostInfo(JsonNode nodeStatsJson) {
    final long totalMemoryBytes = nodeStatsJson.at("/os/mem/total_in_bytes").asLong();
    final List<String> roles = toStream(nodeStatsJson.at("/roles").elements())
            .map(JsonNode::asText)
            .toList();
    return new NodeOSInfo(totalMemoryBytes, roles);
}

/**
 * Adapts a one-shot {@link Iterator} into a sequential {@link Stream}.
 */
public <T> Stream<T> toStream(Iterator<T> iterator) {
    // Wrap the iterator in a throwaway Iterable so StreamSupport can build
    // a (non-parallel) stream over it via the default spliterator.
    final Iterable<T> asIterable = () -> iterator;
    return StreamSupport.stream(asIterable.spliterator(), false);
}
Expand Down Expand Up @@ -386,8 +406,8 @@ public Optional<HealthStatus> deflectorHealth(Collection<String> indices) {

final Set<IndicesRecord> indexSummaries = opensearchClient.execute(() ->
catClient.indices().valueBody()
.stream()
.filter(indexSummary -> mappedIndices.contains(indexSummary.index()))
.stream()
.filter(indexSummary -> mappedIndices.contains(indexSummary.index()))
.collect(Collectors.toSet()),
"Unable to retrieve indices");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.github.joschi.jadconfig.validators.PositiveIntegerValidator;
import com.github.joschi.jadconfig.validators.PositiveLongValidator;
import com.github.joschi.jadconfig.validators.StringNotBlankValidator;
import com.google.common.annotations.VisibleForTesting;
import org.graylog2.configuration.validators.RetentionStrategyValidator;
import org.graylog2.configuration.validators.RotationStrategyValidator;
import org.graylog2.indexer.retention.strategies.DeletionRetentionStrategy;
Expand Down Expand Up @@ -57,6 +58,7 @@ public class ElasticsearchConfiguration {
public static final String TIME_SIZE_OPTIMIZING_RETENTION_FIXED_LEEWAY = "time_size_optimizing_retention_fixed_leeway";
public static final String TIME_SIZE_OPTIMIZING_ROTATION_MIN_SHARD_SIZE = "time_size_optimizing_rotation_min_shard_size";
public static final String TIME_SIZE_OPTIMIZING_ROTATION_MAX_SHARD_SIZE = "time_size_optimizing_rotation_max_shard_size";
public static final String TIME_SIZE_OPTIMIZING_ROTATION_OS_MEMORY_FACTOR = "time_size_optimizing_rotation_os_memory_factor";
public static final String TIME_SIZE_OPTIMIZING_ROTATION_PERIOD = "time_size_optimizing_rotation_period";
public static final String ALLOW_FLEXIBLE_RETENTION_PERIOD = "allow_flexible_retention_period";
public static final String INDEX_FIELD_TYPE_REFRESH_INTERVAL = "index_field_type_refresh_interval";
Expand Down Expand Up @@ -133,10 +135,13 @@ public class ElasticsearchConfiguration {
private Period timeSizeOptimizingRotationPeriod = Period.days(1);

@Parameter(value = TIME_SIZE_OPTIMIZING_ROTATION_MIN_SHARD_SIZE)
private Size timeSizeOptimizingRotationMinShardSize = Size.gigabytes(20);
private Size timeSizeOptimizingRotationMinShardSize;

@Parameter(value = TIME_SIZE_OPTIMIZING_ROTATION_MAX_SHARD_SIZE)
private Size timeSizeOptimizingRotationMaxShardSize = Size.gigabytes(20);
private Size timeSizeOptimizingRotationMaxShardSize;

@Parameter(value = TIME_SIZE_OPTIMIZING_ROTATION_OS_MEMORY_FACTOR)
private double timeSizeOptimizingRotationOSMemoryFactor = 0.6;

@Parameter(value = TIME_SIZE_OPTIMIZING_RETENTION_MIN_LIFETIME)
private Period timeSizeOptimizingRetentionMinLifeTime = IndexLifetimeConfig.DEFAULT_LIFETIME_MIN;
Expand Down Expand Up @@ -268,14 +273,20 @@ public Period getTimeSizeOptimizingRotationPeriod() {
return timeSizeOptimizingRotationPeriod;
}

/**
 * Minimum shard size for time-size-optimizing rotation.
 *
 * @return the explicitly configured minimum shard size, or {@code null} when
 *         the setting was not provided (presumably the effective value is then
 *         derived elsewhere, e.g. from node OS memory — confirm with callers)
 */
@Nullable
public Size getTimeSizeOptimizingRotationMinShardSize() {
    return timeSizeOptimizingRotationMinShardSize;
}

/**
 * Maximum shard size for time-size-optimizing rotation.
 *
 * @return the explicitly configured maximum shard size, or {@code null} when
 *         the setting was not provided (presumably the effective value is then
 *         derived elsewhere, e.g. from node OS memory — confirm with callers)
 */
@Nullable
public Size getTimeSizeOptimizingRotationMaxShardSize() {
    return timeSizeOptimizingRotationMaxShardSize;
}

/**
 * Factor applied to search-node OS memory when sizing shards dynamically.
 *
 * @return the configured {@code time_size_optimizing_rotation_os_memory_factor}
 *         (defaults to 0.6)
 */
public double getTimeSizeOptimizingRotationOSMemoryFactor() {
    return timeSizeOptimizingRotationOSMemoryFactor;
}

/**
 * Minimum index lifetime for time-size-optimizing retention.
 *
 * @return the configured minimum lifetime period
 */
public Period getTimeSizeOptimizingRetentionMinLifeTime() {
    return timeSizeOptimizingRetentionMinLifeTime;
}
Expand Down Expand Up @@ -320,7 +331,8 @@ public boolean allowFlexibleRetentionPeriod() {
@ValidatorMethod
@SuppressWarnings("unused")
public void validateTimeSizeOptimizingRotation() throws ValidationException {
if (getTimeSizeOptimizingRotationMaxShardSize().compareTo(getTimeSizeOptimizingRotationMinShardSize()) < 0) {
if (getTimeSizeOptimizingRotationMaxShardSize() != null && getTimeSizeOptimizingRotationMinShardSize() != null &&
getTimeSizeOptimizingRotationMaxShardSize().compareTo(getTimeSizeOptimizingRotationMinShardSize()) < 0) {
throw new ValidationException(f("\"%s=%s\" cannot be larger than \"%s=%s\"",
TIME_SIZE_OPTIMIZING_ROTATION_MIN_SHARD_SIZE, getTimeSizeOptimizingRotationMinShardSize(),
TIME_SIZE_OPTIMIZING_ROTATION_MAX_SHARD_SIZE, getTimeSizeOptimizingRotationMaxShardSize())
Expand Down Expand Up @@ -357,4 +369,14 @@ TIME_SIZE_OPTIMIZING_RETENTION_MIN_LIFETIME, getTimeSizeOptimizingRetentionMinLi
);
}
}

// Test-only mutator: the minimum shard size is normally populated from
// configuration; this exists so tests can set it directly.
@VisibleForTesting
public void setTimeSizeOptimizingRotationMinShardSize(Size timeSizeOptimizingRotationMinShardSize) {
    this.timeSizeOptimizingRotationMinShardSize = timeSizeOptimizingRotationMinShardSize;
}

// Test-only mutator: the maximum shard size is normally populated from
// configuration; this exists so tests can set it directly.
@VisibleForTesting
public void setTimeSizeOptimizingRotationMaxShardSize(Size timeSizeOptimizingRotationMaxShardSize) {
    this.timeSizeOptimizingRotationMaxShardSize = timeSizeOptimizingRotationMaxShardSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,116 +16,38 @@
*/
package org.graylog2.datatiering.rotation;

import com.github.joschi.jadconfig.util.Size;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import org.graylog.scheduler.clock.JobSchedulerClock;
import org.graylog2.configuration.ElasticsearchConfiguration;
import org.graylog2.indexer.indexset.IndexSet;
import org.graylog2.indexer.indices.Indices;
import org.graylog2.indexer.retention.strategies.NoopRetentionStrategyConfig;
import org.graylog2.indexer.rotation.common.IndexRotator;
import org.graylog2.indexer.rotation.tso.IndexLifetimeConfig;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.graylog2.indexer.rotation.tso.TimeSizeOptimizingCalculator;

import javax.annotation.Nonnull;
import java.time.Duration;
import java.time.Instant;

import static org.graylog2.shared.utilities.StringUtils.f;
import static org.graylog2.shared.utilities.StringUtils.humanReadableByteCount;

public class DataTierRotation {

private final JobSchedulerClock clock;
private final org.joda.time.Period rotationPeriod;

private final Indices indices;

private final IndexRotator indexRotator;

private final Size maxShardSize;
private final Size minShardSize;

private final TimeSizeOptimizingCalculator timeSizeOptimizingCalculator;
private final IndexLifetimeConfig indexLifetimeConfig;

@AssistedInject
public DataTierRotation(Indices indices,
ElasticsearchConfiguration elasticsearchConfiguration,
IndexRotator indexRotator,
JobSchedulerClock clock,
public DataTierRotation(IndexRotator indexRotator,
TimeSizeOptimizingCalculator timeSizeOptimizingCalculator,
@Assisted IndexLifetimeConfig indexLifetimeConfig) {

this.indices = indices;
this.indexRotator = indexRotator;
this.clock = clock;
this.rotationPeriod = elasticsearchConfiguration.getTimeSizeOptimizingRotationPeriod();
this.maxShardSize = elasticsearchConfiguration.getTimeSizeOptimizingRotationMaxShardSize();
this.minShardSize = elasticsearchConfiguration.getTimeSizeOptimizingRotationMinShardSize();
this.timeSizeOptimizingCalculator = timeSizeOptimizingCalculator;
this.indexLifetimeConfig = indexLifetimeConfig;
}

/**
 * Rotates the given index set, delegating the should-rotate decision to
 * {@link #shouldRotate}.
 */
public void rotate(IndexSet indexSet) {
    indexRotator.rotate(indexSet, this::shouldRotate);
}

// Wraps a rotation decision and its human-readable reason, tagging it with
// this strategy's canonical class name as the source.
private IndexRotator.Result createResult(boolean shouldRotate, String message) {
    return IndexRotator.createResult(shouldRotate, message, this.getClass().getCanonicalName());
}

@Nonnull
private IndexRotator.Result shouldRotate(final String index, IndexSet indexSet) {
final DateTime creationDate = indices.indexCreationDate(index).orElseThrow(() -> new IllegalStateException("No index creation date"));
final Long sizeInBytes = indices.getStoreSizeInBytes(index).orElseThrow(() -> new IllegalStateException("No index size"));

if (indices.numberOfMessages(index) == 0) {
return createResult(false, "Index is empty");
}

final int shards = indexSet.getConfig().shards();

final long maxIndexSize = maxShardSize.toBytes() * shards;
if (sizeInBytes > maxIndexSize) {
return createResult(true,
f("Index size <%s> exceeds maximum size <%s>",
humanReadableByteCount(sizeInBytes), humanReadableByteCount(maxIndexSize)));
}

// If no retention is selected, we have an "indefinite" optimization leeway
if (!(indexSet.getConfig().retentionStrategyConfig() instanceof NoopRetentionStrategyConfig)) {
Period leeWay = indexLifetimeConfig.indexLifetimeMax().minus(indexLifetimeConfig.indexLifetimeMin());
if (indexExceedsLeeWay(creationDate, leeWay)) {
return createResult(true,
f("Index creation date <%s> exceeds optimization leeway <%s>",
creationDate, leeWay));
}
}

final long minIndexSize = minShardSize.toBytes() * shards;
if (indexIsOldEnough(creationDate) && sizeInBytes >= minIndexSize) {
return createResult(true,
f("Index creation date <%s> has passed rotation period <%s> and has a reasonable size <%s> for rotation",
creationDate, rotationPeriod, humanReadableByteCount(minIndexSize)));
}

return createResult(false, "No reason to rotate found");
}

// True when the time elapsed since the index's creation exceeds the
// optimization leeway period.
private boolean indexExceedsLeeWay(DateTime creationDate, Period leeWay) {
    return timePassedIsBeyondLimit(creationDate, leeWay);
}

// True when the index has existed for at least the configured rotation period.
private boolean indexIsOldEnough(DateTime creationDate) {
    return timePassedIsBeyondLimit(creationDate, rotationPeriod);
}

private boolean timePassedIsBeyondLimit(DateTime date, Period limit) {
final Instant now = clock.instantNow();
final Duration timePassed = Duration.between(Instant.ofEpochMilli(date.getMillis()), now);
final Duration limitAsDuration = Duration.ofSeconds(limit.toStandardSeconds().getSeconds());

return timePassed.compareTo(limitAsDuration) >= 0;
return timeSizeOptimizingCalculator.calculate(index, indexLifetimeConfig, indexSet.getConfig());
}

public interface Factory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.graylog2.rest.models.system.indexer.responses.ClusterHealth;
import org.graylog2.system.stats.elasticsearch.ClusterStats;
import org.graylog2.system.stats.elasticsearch.NodeInfo;
import org.graylog2.system.stats.elasticsearch.NodeOSInfo;
import org.graylog2.system.stats.elasticsearch.ShardStats;

import java.util.Collection;
Expand Down Expand Up @@ -61,6 +62,8 @@ public interface ClusterAdapter {

    /** Per-node information, keyed by node id (as returned by the search backend). */
    Map<String, NodeInfo> nodesInfo();

    /** Per-node OS information (e.g. total memory, roles), keyed by node id. */
    Map<String, NodeOSInfo> nodesHostInfo();

    /** Cluster-wide shard statistics. */
    ShardStats shardStats();

    /** Combined health of the given deflector indices, if it can be determined. */
    Optional<HealthStatus> deflectorHealth(Collection<String> indices);
Expand Down
Loading
Loading