Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ record LazyRolloverExecutor(
@Override
public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecutionContext) {
final var listener = new AllocationActionMultiListener<RolloverResponse>(threadPool.getThreadContext());
final var results = new ArrayList<String>(batchExecutionContext.taskContexts().size());
var reasonBuilder = new StringBuilder("lazy bulk rollover [");
final var resultsCollector = new Strings.BoundedDelimitedStringCollector(reasonBuilder, ",", 1024);
var state = batchExecutionContext.initialState();
Map<ProjectId, Map<RolloverRequest, List<TaskContext<LazyRolloverTask>>>> groupedRequests = new HashMap<>();
for (final var taskContext : batchExecutionContext.taskContexts()) {
Expand All @@ -202,7 +203,7 @@ public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecuti
try {
RolloverRequest rolloverRequest = entry.getKey();
final var projectState = state.projectState(projectRequests.getKey());
state = executeTask(projectState, rolloverRequest, results, rolloverTaskContexts, listener);
state = executeTask(projectState, rolloverRequest, resultsCollector::appendItem, rolloverTaskContexts, listener);
} catch (Exception e) {
rolloverTaskContexts.forEach(taskContext -> taskContext.onFailure(e));
} finally {
Expand All @@ -212,11 +213,10 @@ public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecuti
}

if (state != batchExecutionContext.initialState()) {
var reason = new StringBuilder("lazy bulk rollover [");
Strings.collectionToDelimitedStringWithLimit(results, ",", 1024, reason);
reason.append(']');
resultsCollector.finish();
reasonBuilder.append(']');
try (var ignored = batchExecutionContext.dropHeadersContext()) {
state = allocationService.reroute(state, reason.toString(), listener.reroute());
state = allocationService.reroute(state, reasonBuilder.toString(), listener.reroute());
}
} else {
listener.noRerouteNeeded();
Expand All @@ -227,7 +227,7 @@ public ClusterState execute(BatchExecutionContext<LazyRolloverTask> batchExecuti
public ClusterState executeTask(
ProjectState currentState,
RolloverRequest rolloverRequest,
ArrayList<String> results,
Consumer<String> results,
List<TaskContext<LazyRolloverTask>> rolloverTaskContexts,
AllocationActionMultiListener<RolloverResponse> allocationActionMultiListener
) throws Exception {
Expand Down Expand Up @@ -264,7 +264,7 @@ public ClusterState executeTask(
null,
isFailureStoreRollover
);
results.add(rolloverResult.sourceIndexName() + "->" + rolloverResult.rolloverIndexName());
results.accept(rolloverResult.sourceIndexName() + "->" + rolloverResult.rolloverIndexName());
logger.trace("lazy rollover result [{}]", rolloverResult);

final var rolloverIndexName = rolloverResult.rolloverIndexName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@
import org.elasticsearch.transport.TransportService;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -517,22 +517,22 @@ record RolloverExecutor(
@Override
public ClusterState execute(BatchExecutionContext<RolloverTask> batchExecutionContext) {
final var listener = new AllocationActionMultiListener<RolloverResponse>(threadPool.getThreadContext());
final var results = new ArrayList<String>(batchExecutionContext.taskContexts().size());
final var reasonBuilder = new StringBuilder("bulk rollover [");
final var resultsCollector = new Strings.BoundedDelimitedStringCollector(reasonBuilder, ",", 1024);
var state = batchExecutionContext.initialState();
for (final var taskContext : batchExecutionContext.taskContexts()) {
try (var ignored = taskContext.captureResponseHeaders()) {
state = executeTask(state, results, taskContext, listener);
state = executeTask(state, resultsCollector::appendItem, taskContext, listener);
} catch (Exception e) {
taskContext.onFailure(e);
}
}

if (state != batchExecutionContext.initialState()) {
var reason = new StringBuilder("bulk rollover [");
Strings.collectionToDelimitedStringWithLimit(results, ",", 1024, reason);
reason.append(']');
resultsCollector.finish();
reasonBuilder.append(']');
try (var ignored = batchExecutionContext.dropHeadersContext()) {
state = allocationService.reroute(state, reason.toString(), listener.reroute());
state = allocationService.reroute(state, reasonBuilder.toString(), listener.reroute());
}
} else {
listener.noRerouteNeeded();
Expand All @@ -542,7 +542,7 @@ public ClusterState execute(BatchExecutionContext<RolloverTask> batchExecutionCo

public ClusterState executeTask(
ClusterState currentState,
ArrayList<String> results,
Consumer<String> resultsCollector,
TaskContext<RolloverTask> rolloverTaskContext,
AllocationActionMultiListener<RolloverResponse> allocationActionMultiListener
) throws Exception {
Expand Down Expand Up @@ -613,7 +613,7 @@ public ClusterState executeTask(
rolloverTask.autoShardingResult(),
targetFailureStore
);
results.add(rolloverResult.sourceIndexName() + "->" + rolloverResult.rolloverIndexName());
resultsCollector.accept(rolloverResult.sourceIndexName() + "->" + rolloverResult.rolloverIndexName());
logger.trace("rollover result [{}]", rolloverResult);

final var rolloverIndexName = rolloverResult.rolloverIndexName();
Expand Down
63 changes: 48 additions & 15 deletions server/src/main/java/org/elasticsearch/common/Strings.java
Original file line number Diff line number Diff line change
Expand Up @@ -575,22 +575,55 @@ public static void collectionToDelimitedString(Iterable<?> coll, String delimite
* items are omitted
*/
public static void collectionToDelimitedStringWithLimit(Iterable<?> coll, String delimiter, int appendLimit, StringBuilder sb) {
final Iterator<?> it = coll.iterator();
final long lengthLimit = sb.length() + appendLimit; // long to avoid overflow
int count = 0;
while (it.hasNext()) {
sb.append(it.next());
final var boundedDelimitedStringCollector = new BoundedDelimitedStringCollector(sb, delimiter, appendLimit);
coll.forEach(boundedDelimitedStringCollector::appendItem);
boundedDelimitedStringCollector.finish();
}

/**
* Collects a sequence of objects into a delimited string, dropping objects once the string reaches a certain maximum length. Similar to
* {@link #collectionToDelimitedStringWithLimit} except that this doesn't need the collection of items to be provided up front.
*/
public static final class BoundedDelimitedStringCollector {
private final StringBuilder stringBuilder;
private final String delimiter;
private final long lengthLimit;
private int count = 0;
private int omitted = 0;

public BoundedDelimitedStringCollector(StringBuilder stringBuilder, String delimiter, int appendLimit) {
this.stringBuilder = stringBuilder;
this.delimiter = delimiter;
this.lengthLimit = stringBuilder.length() + appendLimit; // long to avoid overflow
}

/**
* Add the given item's string representation to the string, with a delimiter if necessary and surrounded by the given prefix and
* suffix, as long as the string is not already too long.
*/
public void appendItem(Object item) {
count += 1;
if (it.hasNext()) {
sb.append(delimiter);
if (sb.length() > lengthLimit) {
int omitted = 0;
while (it.hasNext()) {
it.next();
omitted += 1;
}
sb.append("... (").append(count + omitted).append(" in total, ").append(omitted).append(" omitted)");
}
if (omitted > 0) {
omitted += 1;
return;
}
if (count > 1) {
stringBuilder.append(delimiter);
}
if (stringBuilder.length() > lengthLimit) {
omitted += 1;
stringBuilder.append("..."); // indicate there are some omissions, just in case the caller forgets to call finish()
return;
}
stringBuilder.append(item);
}

/**
* Complete the collection, adding to the string a summary of omitted objects, if any.
*/
public void finish() {
if (omitted > 0) {
stringBuilder.append(" (").append(count).append(" in total, ").append(omitted).append(" omitted)");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2506,13 +2506,10 @@ public void onFailure(Exception e) {
@Override
public void onFailure(Exception e) {
logger.warn(() -> {
final StringBuilder sb = new StringBuilder("failed to complete snapshot deletion for [");
Strings.collectionToDelimitedStringWithLimit(
deleteEntry.snapshots().stream().map(SnapshotId::getName).toList(),
",",
1024,
sb
);
final var sb = new StringBuilder("failed to complete snapshot deletion for [");
final var collector = new Strings.BoundedDelimitedStringCollector(sb, ",", 1024);
deleteEntry.snapshots().forEach(s -> collector.appendItem(s.getName()));
collector.finish();
sb.append("] from repository [").append(deleteEntry.repository()).append("]");
return sb;
}, e);
Expand All @@ -2527,7 +2524,14 @@ protected void handleListeners(List<ActionListener<Void>> deleteListeners) {
);
}
}, () -> {
logger.info("snapshots {} deleted", snapshotIds);
logger.info(() -> {
final var sb = new StringBuilder("snapshots [");
final var collector = new Strings.BoundedDelimitedStringCollector(sb, ",", 1024);
snapshotIds.forEach(collector::appendItem);
collector.finish();
sb.append("] deleted");
return sb;
});
doneFuture.onResponse(null);
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

import static org.elasticsearch.common.Strings.collectionToDelimitedString;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class BoundedDelimitedStringCollectorTests extends ESTestCase {

private interface TestHarness {
String getResult(Iterable<?> collection, String delimiter, int appendLimit);

enum Type {
COLLECTING,
ITERATING
}
}

private final TestHarness testHarness;

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return Stream.of(TestHarness.Type.values()).map(x -> new Object[] { x })::iterator;
}

public BoundedDelimitedStringCollectorTests(@Name("type") TestHarness.Type testHarnessType) {
testHarness = switch (testHarnessType) {
case COLLECTING -> (collection, delimiter, appendLimit) -> {
final var stringBuilder = new StringBuilder();
final var collector = new Strings.BoundedDelimitedStringCollector(stringBuilder, delimiter, appendLimit);
collection.forEach(collector::appendItem);
collector.finish();
return stringBuilder.toString();
};
case ITERATING -> (collection, delimiter, appendLimit) -> {
final var stringBuilder = new StringBuilder();
Strings.collectionToDelimitedStringWithLimit(collection, delimiter, appendLimit, stringBuilder);
return stringBuilder.toString();
};
};
}

public void testCollectionToDelimitedStringWithLimitZero() {
final String delimiter = randomFrom("", ",", ", ", "/");

final int count = between(0, 100);
final List<String> strings = new ArrayList<>(count);
while (strings.size() < count) {
// avoid starting with a sequence of empty appends, it makes the assertions much messier
final int minLength = strings.isEmpty() && delimiter.isEmpty() ? 1 : 0;
strings.add(randomAlphaOfLength(between(minLength, 10)));
}

final String completelyTruncatedDescription = testHarness.getResult(strings, delimiter, 0);

if (count == 0) {
assertThat(completelyTruncatedDescription, equalTo(""));
} else if (count == 1) {
assertThat(completelyTruncatedDescription, equalTo(strings.get(0)));
} else {
assertThat(
completelyTruncatedDescription,
equalTo(strings.get(0) + delimiter + "... (" + count + " in total, " + (count - 1) + " omitted)")
);
}
}

public void testCollectionToDelimitedStringWithLimitTruncation() {
final String delimiter = randomFrom("", ",", ", ", "/");

final int count = between(2, 100);
final List<String> strings = new ArrayList<>(count);
while (strings.size() < count) {
// avoid empty appends, it makes the assertions much messier
final int minLength = delimiter.isEmpty() ? 1 : 0;
strings.add(randomAlphaOfLength(between(minLength, 10)));
}

final int fullDescriptionLength = collectionToDelimitedString(strings, delimiter).length();
final int lastItemSize = strings.get(count - 1).length();
final int truncatedLength = between(0, fullDescriptionLength - lastItemSize - 1);
final String truncatedDescription = testHarness.getResult(strings, delimiter, truncatedLength);

assertThat(truncatedDescription, allOf(containsString("... (" + count + " in total,"), endsWith(" omitted)")));

assertThat(
truncatedDescription,
truncatedDescription.length(),
lessThanOrEqualTo(truncatedLength + ("0123456789" + delimiter + "... (999 in total, 999 omitted)").length())
);
}

public void testCollectionToDelimitedStringWithLimitNoTruncation() {
final String delimiter = randomFrom("", ",", ", ", "/");

final int count = between(1, 100);
final List<String> strings = new ArrayList<>(count);
while (strings.size() < count) {
strings.add(randomAlphaOfLength(between(0, 10)));
}

final String fullDescription = collectionToDelimitedString(strings, delimiter);
for (String string : strings) {
assertThat(fullDescription, containsString(string));
}

final int lastItemSize = strings.get(count - 1).length();
final int minLimit = fullDescription.length() - lastItemSize;
final int limit = randomFrom(between(minLimit, fullDescription.length()), between(minLimit, Integer.MAX_VALUE), Integer.MAX_VALUE);

assertThat(testHarness.getResult(strings, delimiter, limit), equalTo(fullDescription));
}

}
Loading