Skip to content

Commit 6f2134c

Browse files
committed
Merge remote-tracking branch 'upstream/main' into gcp-metering
2 parents 606f018 + 473c4da commit 6f2134c

File tree

9 files changed

+142
-18
lines changed

9 files changed

+142
-18
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import java.util.concurrent.atomic.LongAdder;
5656
import java.util.stream.Collectors;
5757

58-
import static org.elasticsearch.core.Strings.format;
5958
import static org.elasticsearch.rest.RestStatus.REQUESTED_RANGE_NOT_SATISFIED;
6059

6160
class S3BlobStore implements BlobStore {
@@ -398,16 +397,7 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, D
398397
} catch (MultiObjectDeleteException e) {
399398
// We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
400399
// first remove all keys that were sent in the request and then add back those that ran into an exception.
401-
logger.warn(
402-
() -> format(
403-
"Failed to delete some blobs %s",
404-
e.getErrors()
405-
.stream()
406-
.map(err -> "[" + err.getKey() + "][" + err.getCode() + "][" + err.getMessage() + "]")
407-
.toList()
408-
),
409-
e
410-
);
400+
logger.warn(buildDeletionErrorMessage(e), e);
411401
deletionExceptions.useOrMaybeSuppress(e);
412402
return;
413403
} catch (AmazonClientException e) {
@@ -430,6 +420,26 @@ private void deletePartition(OperationPurpose purpose, List<String> partition, D
430420
}
431421
}
432422

423+
private String buildDeletionErrorMessage(MultiObjectDeleteException e) {
424+
final var sb = new StringBuilder("Failed to delete some blobs ");
425+
final var errors = e.getErrors();
426+
for (int i = 0; i < errors.size() && i < MAX_DELETE_EXCEPTIONS; i++) {
427+
final var err = errors.get(i);
428+
sb.append("[").append(err.getKey()).append("][").append(err.getCode()).append("][").append(err.getMessage()).append("]");
429+
if (i < errors.size() - 1) {
430+
sb.append(",");
431+
}
432+
}
433+
if (errors.size() > MAX_DELETE_EXCEPTIONS) {
434+
sb.append("... (")
435+
.append(errors.size())
436+
.append(" in total, ")
437+
.append(errors.size() - MAX_DELETE_EXCEPTIONS)
438+
.append(" omitted)");
439+
}
440+
return sb.toString();
441+
}
442+
433443
/**
434444
* If there are remaining retries, pause for the configured interval then return true
435445
*

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.sun.net.httpserver.HttpHandler;
2222

2323
import org.apache.http.HttpStatus;
24+
import org.apache.logging.log4j.Level;
2425
import org.apache.lucene.index.CorruptIndexException;
2526
import org.apache.lucene.store.AlreadyClosedException;
2627
import org.elasticsearch.ExceptionsHelper;
@@ -51,10 +52,12 @@
5152
import org.elasticsearch.repositories.RepositoriesMetrics;
5253
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
5354
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
55+
import org.elasticsearch.rest.RestStatus;
5456
import org.elasticsearch.telemetry.InstrumentType;
5557
import org.elasticsearch.telemetry.Measurement;
5658
import org.elasticsearch.telemetry.RecordingMeterRegistry;
5759
import org.elasticsearch.test.ESTestCase;
60+
import org.elasticsearch.test.MockLog;
5861
import org.elasticsearch.watcher.ResourceWatcherService;
5962
import org.hamcrest.Matcher;
6063
import org.junit.After;
@@ -65,6 +68,7 @@
6568
import java.io.FilterInputStream;
6669
import java.io.IOException;
6770
import java.io.InputStream;
71+
import java.io.InputStreamReader;
6872
import java.net.InetSocketAddress;
6973
import java.net.SocketTimeoutException;
7074
import java.net.UnknownHostException;
@@ -83,6 +87,7 @@
8387
import java.util.concurrent.atomic.AtomicLong;
8488
import java.util.concurrent.atomic.AtomicReference;
8589
import java.util.function.IntConsumer;
90+
import java.util.regex.Pattern;
8691

8792
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose;
8893
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
@@ -1106,6 +1111,60 @@ public void testSuppressedDeletionErrorsAreCapped() {
11061111
assertThat(exception.getCause().getSuppressed().length, lessThan(S3BlobStore.MAX_DELETE_EXCEPTIONS));
11071112
}
11081113

1114+
public void testTrimmedLogAndCappedSuppressedErrorOnMultiObjectDeletionException() {
1115+
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
1116+
int maxBulkDeleteSize = randomIntBetween(10, 30);
1117+
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize);
1118+
1119+
final Pattern pattern = Pattern.compile("<Key>(.+?)</Key>");
1120+
httpServer.createContext("/", exchange -> {
1121+
if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) {
1122+
final String requestBody = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
1123+
final var matcher = pattern.matcher(requestBody);
1124+
final StringBuilder deletes = new StringBuilder();
1125+
deletes.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
1126+
deletes.append("<DeleteResult>");
1127+
while (matcher.find()) {
1128+
final String key = matcher.group(1);
1129+
deletes.append("<Error>");
1130+
deletes.append("<Code>").append(randomAlphaOfLength(10)).append("</Code>");
1131+
deletes.append("<Key>").append(key).append("</Key>");
1132+
deletes.append("<Message>").append(randomAlphaOfLength(40)).append("</Message>");
1133+
deletes.append("</Error>");
1134+
}
1135+
deletes.append("</DeleteResult>");
1136+
1137+
byte[] response = deletes.toString().getBytes(StandardCharsets.UTF_8);
1138+
exchange.getResponseHeaders().add("Content-Type", "application/xml");
1139+
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
1140+
exchange.getResponseBody().write(response);
1141+
exchange.close();
1142+
} else {
1143+
fail("expected only deletions");
1144+
}
1145+
});
1146+
var blobs = randomList(maxBulkDeleteSize, maxBulkDeleteSize, ESTestCase::randomIdentifier);
1147+
try (var mockLog = MockLog.capture(S3BlobStore.class)) {
1148+
mockLog.addExpectation(
1149+
new MockLog.SeenEventExpectation(
1150+
"deletion log",
1151+
S3BlobStore.class.getCanonicalName(),
1152+
Level.WARN,
1153+
blobs.size() > S3BlobStore.MAX_DELETE_EXCEPTIONS
1154+
? "Failed to delete some blobs [*... (* in total, * omitted)"
1155+
: "Failed to delete some blobs [*]"
1156+
)
1157+
);
1158+
var exception = expectThrows(
1159+
IOException.class,
1160+
"deletion should not succeed",
1161+
() -> blobContainer.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobs.iterator())
1162+
);
1163+
assertThat(exception.getCause().getSuppressed().length, lessThan(S3BlobStore.MAX_DELETE_EXCEPTIONS));
1164+
mockLog.awaitAllExpectationsMatched();
1165+
}
1166+
}
1167+
11091168
@Override
11101169
protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
11111170
// some attempts make meaningful progress and do not count towards the max retry limit

muted-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,9 @@ tests:
345345
- class: org.elasticsearch.compute.data.BlockMultiValuedTests
346346
method: testToMask {elementType=BOOLEAN nullAllowed=true}
347347
issue: https://github.com/elastic/elasticsearch/issues/124165
348+
- class: org.elasticsearch.smoketest.MlWithSecurityIT
349+
method: test {yaml=ml/start_data_frame_analytics/Test start classification analysis when the dependent variable is missing}
350+
issue: https://github.com/elastic/elasticsearch/issues/124168
348351

349352
# Examples:
350353
#

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,8 @@ static TransportVersion def(int id) {
183183
public static final TransportVersion STORED_SCRIPT_CONTENT_LENGTH = def(9_019_0_00);
184184
public static final TransportVersion JINA_AI_EMBEDDING_TYPE_SUPPORT_ADDED = def(9_020_0_00);
185185
public static final TransportVersion RE_REMOVE_MIN_COMPATIBLE_SHARD_NODE = def(9_021_0_00);
186-
public static final TransportVersion INCLUDE_INDEX_MODE_IN_GET_DATA_STREAM = def(9_022_0_00);
186+
public static final TransportVersion UNASSIGENEDINFO_RESHARD_ADDED = def(9_022_0_00);
187+
public static final TransportVersion INCLUDE_INDEX_MODE_IN_GET_DATA_STREAM = def(9_023_0_00);
187188

188189
/*
189190
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1947,6 +1947,39 @@ public Builder numberOfShards(int numberOfShards) {
19471947
return this;
19481948
}
19491949

1950+
/**
1951+
* Builder to create IndexMetadata that has an increased shard count (used for re-shard).
1952+
* The new shard count must be a multiple of the original shardcount.
1953+
* We do not support shrinking the shard count.
1954+
* @param shardCount updated shardCount
1955+
*
1956+
* TODO: Check if this.version needs to be incremented
1957+
*/
1958+
public Builder reshardAddShards(int shardCount) {
1959+
// Assert routingNumShards is null ?
1960+
// Assert numberOfShards > 0
1961+
if (shardCount % numberOfShards() != 0) {
1962+
throw new IllegalArgumentException(
1963+
"New shard count ["
1964+
+ shardCount
1965+
+ "] should be a multiple"
1966+
+ " of current shard count ["
1967+
+ numberOfShards()
1968+
+ "] for ["
1969+
+ index
1970+
+ "]"
1971+
);
1972+
}
1973+
IndexVersion indexVersionCreated = indexCreatedVersion(settings);
1974+
settings = Settings.builder().put(settings).put(SETTING_NUMBER_OF_SHARDS, shardCount).build();
1975+
var newPrimaryTerms = new long[shardCount];
1976+
Arrays.fill(newPrimaryTerms, this.primaryTerms.length, newPrimaryTerms.length, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
1977+
System.arraycopy(primaryTerms, 0, newPrimaryTerms, 0, this.primaryTerms.length);
1978+
primaryTerms = newPrimaryTerms;
1979+
routingNumShards = MetadataCreateIndexService.calculateNumRoutingShards(shardCount, indexVersionCreated);
1980+
return this;
1981+
}
1982+
19501983
/**
19511984
* Sets the number of shards that should be used for routing. This should only be used if the number of shards in
19521985
* an index has changed ie if the index is shrunk.

server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,7 @@ public Builder addShard(ShardRouting shard) {
662662
return this;
663663
}
664664

665-
void ensureShardArray(int shardCount) {
665+
public void ensureShardArray(int shardCount) {
666666
if (shards == null) {
667667
shards = new IndexShardRoutingTable.Builder[shardCount];
668668
} else if (shards.length < shardCount) {

server/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,14 @@ public Builder(ShardRoutingRoleStrategy shardRoutingRoleStrategy, RoutingTable r
436436
this.indicesRouting = ImmutableOpenMap.builder(routingTable.indicesRouting);
437437
}
438438

439+
public IndexRoutingTable getIndexRoutingTable(String index) {
440+
return indicesRouting.get(index);
441+
}
442+
443+
public ShardRoutingRoleStrategy getShardRoutingRoleStrategy() {
444+
return shardRoutingRoleStrategy;
445+
}
446+
439447
private static void addShard(
440448
final Map<String, IndexRoutingTable.Builder> indexRoutingTableBuilders,
441449
final ShardRouting shardRoutingEntry

server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,11 @@ public enum Reason {
175175
/**
176176
* Replica is unpromotable and the primary failed.
177177
*/
178-
UNPROMOTABLE_REPLICA
178+
UNPROMOTABLE_REPLICA,
179+
/**
180+
* New shard added as part of index re-sharding operation
181+
*/
182+
RESHARD_ADDED
179183
}
180184

181185
/**
@@ -335,9 +339,14 @@ public void writeTo(StreamOutput out) throws IOException {
335339
out.writeByte((byte) Reason.NODE_LEFT.ordinal());
336340
} else if (reason.equals(Reason.UNPROMOTABLE_REPLICA) && out.getTransportVersion().before(VERSION_UNPROMOTABLE_REPLICA_ADDED)) {
337341
out.writeByte((byte) Reason.PRIMARY_FAILED.ordinal());
338-
} else {
339-
out.writeByte((byte) reason.ordinal());
340-
}
342+
} else if (reason.equals(Reason.RESHARD_ADDED)
343+
&& out.getTransportVersion().before(TransportVersions.UNASSIGENEDINFO_RESHARD_ADDED)) {
344+
// We should have protection to ensure we do not reshard in mixed clusters
345+
assert false;
346+
out.writeByte((byte) Reason.FORCED_EMPTY_PRIMARY.ordinal());
347+
} else {
348+
out.writeByte((byte) reason.ordinal());
349+
}
341350
out.writeLong(unassignedTimeMillis);
342351
// Do not serialize unassignedTimeNanos as System.nanoTime() cannot be compared across different JVMs
343352
out.writeBoolean(delayed);

server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ public void testReasonOrdinalOrder() {
8888
UnassignedInfo.Reason.MANUAL_ALLOCATION,
8989
UnassignedInfo.Reason.INDEX_CLOSED,
9090
UnassignedInfo.Reason.NODE_RESTARTING,
91-
UnassignedInfo.Reason.UNPROMOTABLE_REPLICA };
91+
UnassignedInfo.Reason.UNPROMOTABLE_REPLICA,
92+
UnassignedInfo.Reason.RESHARD_ADDED };
9293
for (int i = 0; i < order.length; i++) {
9394
assertThat(order[i].ordinal(), equalTo(i));
9495
}

0 commit comments

Comments
 (0)