Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public ClusterModule(
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
shardRoutingRoleStrategy
shardRoutingRoleStrategy,
telemetryProvider.getMeterRegistry()
);
this.allocationService.addAllocFailuresResetListenerTo(clusterService);
this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.SnapshotsInfoService;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -90,6 +91,7 @@ public class AllocationService {
private final ClusterInfoService clusterInfoService;
private final SnapshotsInfoService snapshotsInfoService;
private final ShardRoutingRoleStrategy shardRoutingRoleStrategy;
private final ShardChangesObserver shardChangesObserver;

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
@SuppressWarnings("this-escape")
Expand All @@ -101,7 +103,7 @@ public AllocationService(
SnapshotsInfoService snapshotsInfoService,
ShardRoutingRoleStrategy shardRoutingRoleStrategy
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, shardRoutingRoleStrategy);
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, shardRoutingRoleStrategy, MeterRegistry.NOOP);
setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator));
}

Expand All @@ -110,13 +112,15 @@ public AllocationService(
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
ShardRoutingRoleStrategy shardRoutingRoleStrategy
ShardRoutingRoleStrategy shardRoutingRoleStrategy,
MeterRegistry meterRegistry
) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.shardRoutingRoleStrategy = shardRoutingRoleStrategy;
this.shardChangesObserver = new ShardChangesObserver(meterRegistry);
}

/**
Expand Down Expand Up @@ -769,7 +773,9 @@ private RoutingAllocation createRoutingAllocation(ClusterState clusterState, lon
clusterState,
clusterInfoService.getClusterInfo(),
snapshotsInfoService.snapshotShardSizes(),
currentNanoTime
currentNanoTime,
false,
shardChangesObserver
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ public RoutingAllocation(
this(deciders, null, clusterState, clusterInfo, shardSizeInfo, currentNanoTime);
}

/**
* Creates a new {@link RoutingAllocation}
* @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations
* @param routingNodes Routing nodes in the current cluster or {@code null} if using those in the given cluster state
* @param clusterState cluster state before rerouting
* @param clusterInfo information about node disk usage and shard disk usage
* @param shardSizeInfo information about snapshot shard sizes
* @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()})
*/
/// Creates a new [RoutingAllocation]
///
/// @param deciders [AllocationDeciders] to use to make decisions for routing allocations
/// @param routingNodes routing nodes in the current cluster or `null` if using those in the given cluster state
/// @param clusterState cluster state before rerouting
/// @param clusterInfo information about node disk usage and shard disk usage
/// @param shardSizeInfo information about snapshot shard sizes
/// @param currentNanoTime the nano time to use for all delay allocation calculation (typically `System#nanoTime()`)
///
public RoutingAllocation(
AllocationDeciders deciders,
@Nullable RoutingNodes routingNodes,
Expand All @@ -112,25 +112,29 @@ public RoutingAllocation(
SnapshotShardSizeInfo shardSizeInfo,
long currentNanoTime
) {
this(deciders, routingNodes, clusterState, clusterInfo, shardSizeInfo, currentNanoTime, false);
}

/**
* Creates a new {@link RoutingAllocation}
* @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations
* @param routingNodes Routing nodes in the current cluster or {@code null} if using those in the given cluster state
* @param clusterState cluster state before rerouting
* @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()})
* @param isSimulating {@code true} if "transient" deciders should be ignored because we are simulating the final allocation
*/
this(deciders, routingNodes, clusterState, clusterInfo, shardSizeInfo, currentNanoTime, false, RoutingChangesObserver.NOOP);
}

/// Creates a new [RoutingAllocation]
///
/// @param deciders [AllocationDeciders] to use to make decisions for routing allocations
/// @param routingNodes routing nodes in the current cluster or `null` if using those in the given cluster state
/// @param clusterState cluster state before rerouting
/// @param clusterInfo information about node disk usage and shard disk usage
/// @param shardSizeInfo information about snapshot shard sizes
/// @param currentNanoTime the nano time to use for all delay allocation calculation (typically `System#nanoTime()`)
/// @param isSimulating `true` if "transient" deciders should be ignored because we are simulating the final allocation
/// @param shardChangesObserver observer that records shard state transition timing metrics
///
public RoutingAllocation(
AllocationDeciders deciders,
@Nullable RoutingNodes routingNodes,
ClusterState clusterState,
ClusterInfo clusterInfo,
SnapshotShardSizeInfo shardSizeInfo,
long currentNanoTime,
boolean isSimulating
boolean isSimulating,
RoutingChangesObserver shardChangesObserver
) {
this.deciders = deciders;
this.routingNodes = routingNodes;
Expand All @@ -154,7 +158,7 @@ public RoutingAllocation(
indexMetadataUpdater,
restoreInProgressUpdater,
resizeSourceIndexUpdater,
new ShardChangesObserver() }
shardChangesObserver }
);
}

Expand Down Expand Up @@ -465,7 +469,8 @@ public RoutingAllocation mutableCloneForSimulation() {
clusterInfo,
shardSizeInfo,
currentNanoTime,
true
true,
RoutingChangesObserver.NOOP
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,51 @@
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

public class ShardChangesObserver implements RoutingChangesObserver {
import java.util.Arrays;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/// Observes shard state transitions during allocation rounds, logging them and emitting APM timing metrics.
class ShardChangesObserver implements RoutingChangesObserver {
private static final Logger logger = LogManager.getLogger(ShardChangesObserver.class);

static final String UNASSIGNED_TO_INITIALIZING_METRIC = "es.allocator.shards.unassigned_to_initializing.duration.histogram";
static final String UNASSIGNED_TO_STARTED_METRIC = "es.allocator.shards.unassigned_to_started.duration.histogram";

private static final Map<UnassignedInfo.Reason, Map<String, Object>> PRIMARY_ATTRIBUTES = buildAttributesByReason(true);
private static final Map<UnassignedInfo.Reason, Map<String, Object>> REPLICA_ATTRIBUTES = buildAttributesByReason(false);

private static Map<UnassignedInfo.Reason, Map<String, Object>> buildAttributesByReason(boolean primary) {
return Arrays.stream(UnassignedInfo.Reason.values())
.collect(Collectors.toUnmodifiableMap(r -> r, r -> Map.of("es_shard_primary", primary, "es_shard_reason", r.name())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like in the past I've had issues with attribute values being something other than a string. I think the TestTelemetryProvider copes with it but the real one doesn't. Please just verify that because it may have changed or I may have misremembered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

r.name() is a String so that one should work.

For the es_shard_primary boolean, I traced through the code, and it looks like this is handled properly:

static Attributes fromMap(String metricName, Map<String, Object> attributes) {
if (attributes == null || attributes.isEmpty()) {
return Attributes.empty();
}
MetricValidator.assertValidAttributeNames(metricName, attributes);
var builder = Attributes.builder();
attributes.forEach((k, v) -> {
if (v instanceof String value) {
builder.put(k, value);
} else if (v instanceof Long value) {
builder.put(k, value);
} else if (v instanceof Integer value) {
builder.put(k, value);
} else if (v instanceof Byte value) {
builder.put(k, value);
} else if (v instanceof Short value) {
builder.put(k, value);
} else if (v instanceof Double value) {
builder.put(k, value);
} else if (v instanceof Float value) {
builder.put(k, value);
} else if (v instanceof Boolean value) {
builder.put(k, value);
} else {
throw new IllegalArgumentException("attributes do not support value type of [" + v.getClass().getCanonicalName() + "]");
}
});
return builder.build();
}

There are also other example paths in Elasticsearch using boolean attributes, e.g., the system_thread setting in:

private static void recordPhaseLatency(
LongHistogram histogramMetric,
long tookInNanos,
ShardSearchRequest request,
Long timeRangeFilterFromMillis
) {
Map<String, Object> attributes = SearchRequestAttributesExtractor.extractAttributes(
request,
timeRangeFilterFromMillis,
request.nowInMillis()
);
histogramMetric.record(TimeUnit.NANOSECONDS.toMillis(tookInNanos), attributes);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks for double-checking!

}

private final LongHistogram unassignedToInitializingDuration;
private final LongHistogram unassignedToStartedDuration;
private final LongSupplier currentTimeMillisSupplier;

/// Creates an observer whose durations are measured against the system wall clock.
///
/// @param meterRegistry registry on which the duration histograms are registered
ShardChangesObserver(MeterRegistry meterRegistry) {
    this(meterRegistry, () -> System.currentTimeMillis());
}

ShardChangesObserver(MeterRegistry meterRegistry, LongSupplier currentTimeMillisSupplier) {
this.unassignedToInitializingDuration = meterRegistry.registerLongHistogram(
UNASSIGNED_TO_INITIALIZING_METRIC,
"Duration a shard spent in UNASSIGNED state before being assigned to a node",
"ms"
);
this.unassignedToStartedDuration = meterRegistry.registerLongHistogram(
UNASSIGNED_TO_STARTED_METRIC,
"Total duration from when a shard became UNASSIGNED to when it became STARTED",
"ms"
);
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
}

@Override
public void shardInitialized(ShardRouting unassignedShard, ShardRouting initializedShard) {
logger.trace(
Expand All @@ -27,6 +67,11 @@ public void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali
initializedShard.recoverySource().getType(),
initializedShard.currentNodeId()
);
UnassignedInfo info = unassignedShard.unassignedInfo();
if (info != null) {
long durationMillis = currentTimeMillisSupplier.getAsLong() - info.unassignedTimeMillis();
unassignedToInitializingDuration.record(Math.max(0, durationMillis), attributes(info, initializedShard));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious about the need to floor this value at 0, is this just to accommodate slight clock skew between the nodes?

Otherwise, if we are expecting to see missing unassigned times for some reason, is recording a 0 in that case going to dilute the histogram?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious about the need to floor this value at 0, is this just to accommodate slight clock skew between the nodes?

Yep, that's the only reason! I went through the code and unassignedTimeMillis should always be a real timestamp.

Otherwise, if we are expecting to see missing unassigned times for some reason, is recording a 0 in that case going to dilute the histogram?

Clock skew large enough to produce a negative value would be unusual in practice, so I'm not too worried about this. Considering that, I'd rather floor to 0 than skip the recording entirely. Otherwise we could silently be dropping actual data points, which would make the histogram's total count slightly undercount actual transitions; that seems worse than a rare artificial 0ms entry. Happy to reconsider if you feel strongly though!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining; I agree that we should just round to zero if it's due to clock skew.

}
}

@Override
Expand All @@ -37,6 +82,12 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha
initializingShard.recoverySource().getType(),
startedShard.currentNodeId()
);
// Relocation target shards have no unassignedInfo
UnassignedInfo info = initializingShard.unassignedInfo();
if (info != null) {
long durationMillis = currentTimeMillisSupplier.getAsLong() - info.unassignedTimeMillis();
unassignedToStartedDuration.record(Math.max(0, durationMillis), attributes(info, startedShard));
}
}

@Override
Expand All @@ -63,4 +114,8 @@ public void replicaPromoted(ShardRouting replicaShard) {
/// Builds a short label for a shard: its shard id followed by `[P]` for a primary or `[R]` for a replica.
private static String shardIdentifier(ShardRouting shardRouting) {
    final char roleMarker = shardRouting.primary() ? 'P' : 'R';
    return shardRouting.shardId().toString() + '[' + roleMarker + ']';
}

/// Returns the precomputed metric attributes for a measurement.
///
/// The primary/replica table is chosen from `shard`, and the entry within it from the unassigned reason in `info`.
private static Map<String, Object> attributes(UnassignedInfo info, ShardRouting shard) {
    final Map<UnassignedInfo.Reason, Map<String, Object>> attributesByReason = shard.primary()
        ? PRIMARY_ATTRIBUTES
        : REPLICA_ATTRIBUTES;
    return attributesByReason.get(info.reason());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.snapshots.EmptySnapshotsInfoService;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.gateway.TestGatewayAllocator;

Expand Down Expand Up @@ -154,7 +155,8 @@ public ShardAllocationDecision explainShardAllocation(ShardRouting shard, Routin
},
new EmptyClusterInfoService(),
EmptySnapshotsInfoService.INSTANCE,
TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY
TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY,
MeterRegistry.NOOP
);

final String unrealisticAllocatorName = "unrealistic";
Expand Down Expand Up @@ -268,7 +270,8 @@ public void testExplainsNonAllocationOfShardWithUnknownAllocator() {
null,
null,
null,
TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY
TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY,
MeterRegistry.NOOP
);
allocationService.setExistingShardsAllocators(
Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator())
Expand Down Expand Up @@ -390,7 +393,8 @@ public void testAutoExpandReplicas() throws Exception {
null,
new EmptyClusterInfoService(),
EmptySnapshotsInfoService.INSTANCE,
TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY
TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY,
MeterRegistry.NOOP
);

final ProjectId project1 = randomUniqueProjectId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,30 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
import static org.elasticsearch.test.MockLog.assertThatLogger;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

@TestLogging(value = "org.elasticsearch.cluster.routing.allocation.ShardChangesObserver:TRACE", reason = "verifies debug level logging")
public class ShardChangesObserverTests extends ESAllocationTestCase {

public void testLogShardStarting() {

var indexName = randomIdentifier();
var indexMetadata = IndexMetadata.builder(indexName).settings(indexSettings(IndexVersion.current(), 1, 0)).build();

Expand Down Expand Up @@ -66,7 +75,6 @@ public void testLogShardStarting() {
}

public void testLogShardMovement() {

var allocationId = randomUUID();
var indexName = randomIdentifier();
var indexMetadata = IndexMetadata.builder(indexName)
Expand Down Expand Up @@ -143,4 +151,78 @@ public void testLogShardFailureAndPromotion() {
)
);
}

// Drives one shard through shardInitialized and shardStarted using a controllable clock, and checks that each
// histogram receives exactly one measurement carrying the expected duration and attributes.
public void testUnassignedMetrics() {
final var meterRegistry = new RecordingMeterRegistry();
// the unassigned timestamp may lie up to one hour after the real current time
final long unassignedAtMillis = randomLongBetween(0, System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1));
// fake clock read by the observer; it starts at the unassigned timestamp and is moved explicitly below
final AtomicLong nowMillis = new AtomicLong(unassignedAtMillis);
final var observer = new ShardChangesObserver(meterRegistry, nowMillis::get);

final var reason = randomFrom(UnassignedInfo.Reason.values());
// ALLOCATION_FAILED needs failedAllocations > 0
final int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? randomIntBetween(1, 5) : 0;
// NODE_RESTARTING needs lastAllocatedNodeId
final String lastAllocatedNodeId = reason == UnassignedInfo.Reason.NODE_RESTARTING ? randomIdentifier() : null;
final var unassignedInfo = new UnassignedInfo(
reason,
null,
null,
failedAllocations,
System.nanoTime(),
unassignedAtMillis,
false,
UnassignedInfo.AllocationStatus.NO_ATTEMPT,
Collections.emptySet(),
lastAllocatedNodeId
);
final var shardId = new ShardId("test-index", "_na_", 0);
final var primary = randomBoolean();
final var unassignedShard = shardRoutingBuilder(shardId, null, primary, ShardRoutingState.UNASSIGNED).withUnassignedInfo(
unassignedInfo
).build();

// precondition: neither histogram has any measurement before a transition is observed
final var recorder = meterRegistry.getRecorder();
assertThat(
recorder.getMeasurements(InstrumentType.LONG_HISTOGRAM, ShardChangesObserver.UNASSIGNED_TO_INITIALIZING_METRIC).size(),
equalTo(0)
);
assertThat(
recorder.getMeasurements(InstrumentType.LONG_HISTOGRAM, ShardChangesObserver.UNASSIGNED_TO_STARTED_METRIC).size(),
equalTo(0)
);

// replica shards must recover from primary
final var recoverySource = primary ? RecoverySource.EmptyStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE;
// the initializing shard keeps the unassigned info, which shardStarted reads later
final var initializedShard = shardRoutingBuilder(shardId, "node-1", primary, ShardRoutingState.INITIALIZING).withRecoverySource(
recoverySource
).withUnassignedInfo(unassignedInfo).build();
// move the fake clock to the real current time; this can be earlier than unassignedAtMillis,
// in which case the expected value below is floored at 0
final long initializedTimeMillis = System.currentTimeMillis();
nowMillis.set(initializedTimeMillis);
observer.shardInitialized(unassignedShard, initializedShard);

final var startedShard = shardRoutingBuilder(shardId, "node-1", primary, ShardRoutingState.STARTED).build();
// the shard starts between 0 and 1000 ms after it was initialized
final long startedTimeMillis = initializedTimeMillis + randomLongBetween(0, 1000L);
nowMillis.set(startedTimeMillis);
observer.shardStarted(initializedShard, startedShard);

// exactly one unassigned -> initializing measurement, with the floored duration and both attributes
final List<Measurement> initializedMetrics = recorder.getMeasurements(
InstrumentType.LONG_HISTOGRAM,
ShardChangesObserver.UNASSIGNED_TO_INITIALIZING_METRIC
);
assertThat(initializedMetrics, hasSize(1));
final var initializedMetricValue = initializedMetrics.getFirst();
assertThat(initializedMetricValue.getLong(), equalTo(Math.max(0, initializedTimeMillis - unassignedAtMillis)));
assertThat(initializedMetricValue.attributes().get("es_shard_primary"), equalTo(primary));
assertThat(initializedMetricValue.attributes().get("es_shard_reason"), equalTo(reason.name()));

// exactly one unassigned -> started measurement, measured from the same unassigned timestamp
final List<Measurement> startedMetrics = recorder.getMeasurements(
InstrumentType.LONG_HISTOGRAM,
ShardChangesObserver.UNASSIGNED_TO_STARTED_METRIC
);
assertThat(startedMetrics, hasSize(1));
final var startedMetricValue = startedMetrics.getFirst();
assertThat(startedMetricValue.getLong(), equalTo(Math.max(0, startedTimeMillis - unassignedAtMillis)));
assertThat(startedMetricValue.attributes().get("es_shard_primary"), equalTo(primary));
assertThat(startedMetricValue.attributes().get("es_shard_reason"), equalTo(reason.name()));
}
}
Loading
Loading