Skip to content

Commit ee87d79

Browse files
authored
Merge branch 'main' into non-issue/esql-full-text-functions-disjunctions
2 parents ce30c76 + 82295cf commit ee87d79

File tree

199 files changed

+2740
-851
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

199 files changed

+2740
-851
lines changed

docs/changelog/120250.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 120250
2+
summary: "Retry internally when CAS upload is throttled [GCS]"
3+
area: Snapshot/Restore
4+
type: enhancement
5+
issues:
6+
- 116546

libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/NetworkEntitlement.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ public NetworkEntitlement(List<String> actionsList) {
4747
for (String actionString : actionsList) {
4848
var action = ACTION_MAP.get(actionString);
4949
if (action == null) {
50-
throw new IllegalArgumentException("unknown network action [" + actionString + "]");
50+
throw new PolicyValidationException("unknown network action [" + actionString + "]");
5151
}
5252
if ((actionsInt & action) == action) {
53-
throw new IllegalArgumentException(Strings.format("network action [%s] specified multiple times", actionString));
53+
throw new PolicyValidationException(Strings.format("network action [%s] specified multiple times", actionString));
5454
}
5555
actionsInt |= action;
5656
}

libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/PolicyParser.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.entitlement.runtime.policy;
1111

12+
import org.elasticsearch.xcontent.XContentLocation;
1213
import org.elasticsearch.xcontent.XContentParser;
1314
import org.elasticsearch.xcontent.XContentParserConfiguration;
1415
import org.elasticsearch.xcontent.yaml.YamlXContent;
@@ -119,6 +120,7 @@ protected Scope parseScope(String scopeName) throws IOException {
119120
}
120121

121122
protected Entitlement parseEntitlement(String scopeName, String entitlementType) throws IOException {
123+
XContentLocation startLocation = policyParser.getTokenLocation();
122124
Class<?> entitlementClass = EXTERNAL_ENTITLEMENTS.get(entitlementType);
123125

124126
if (entitlementClass == null) {
@@ -170,7 +172,10 @@ protected Entitlement parseEntitlement(String scopeName, String entitlementType)
170172
try {
171173
return (Entitlement) entitlementConstructor.newInstance(parameterValues);
172174
} catch (InvocationTargetException | InstantiationException | IllegalAccessException e) {
173-
throw new IllegalStateException("internal error");
175+
if (e.getCause() instanceof PolicyValidationException piae) {
176+
throw newPolicyParserException(startLocation, scopeName, entitlementType, piae);
177+
}
178+
throw new IllegalStateException("internal error", e);
174179
}
175180
}
176181

@@ -191,4 +196,13 @@ protected PolicyParserException newPolicyParserException(String scopeName, Strin
191196
message
192197
);
193198
}
199+
200+
protected PolicyParserException newPolicyParserException(
201+
XContentLocation location,
202+
String scopeName,
203+
String entitlementType,
204+
PolicyValidationException cause
205+
) {
206+
return PolicyParserException.newPolicyParserException(location, policyName, scopeName, entitlementType, cause);
207+
}
194208
}

libs/entitlement/src/main/java/org/elasticsearch/entitlement/runtime/policy/PolicyParserException.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,36 @@ public static PolicyParserException newPolicyParserException(
8686
}
8787
}
8888

89+
public static PolicyParserException newPolicyParserException(
90+
XContentLocation location,
91+
String policyName,
92+
String scopeName,
93+
String entitlementType,
94+
PolicyValidationException cause
95+
) {
96+
assert (scopeName != null);
97+
return new PolicyParserException(
98+
"["
99+
+ location.lineNumber()
100+
+ ":"
101+
+ location.columnNumber()
102+
+ "] policy parsing error for ["
103+
+ policyName
104+
+ "] in scope ["
105+
+ scopeName
106+
+ "] for entitlement type ["
107+
+ entitlementType
108+
+ "]: "
109+
+ cause.getMessage(),
110+
cause
111+
);
112+
}
113+
89114
private PolicyParserException(String message) {
90115
super(message);
91116
}
117+
118+
private PolicyParserException(String message, PolicyValidationException cause) {
119+
super(message, cause);
120+
}
92121
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.entitlement.runtime.policy;
11+
12+
/**
13+
* This exception is used to track validation errors thrown during the construction
14+
* of entitlements. By using this instead of other exception types the policy
15+
* parser is able to wrap this exception with a line/character number for
16+
* additional useful error information.
17+
*/
18+
class PolicyValidationException extends RuntimeException {
19+
20+
PolicyValidationException(String message) {
21+
super(message);
22+
}
23+
24+
PolicyValidationException(String message, Throwable cause) {
25+
super(message, cause);
26+
}
27+
}

libs/entitlement/src/test/java/org/elasticsearch/entitlement/runtime/policy/PolicyParserTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,24 @@ public void testParseNetwork() throws IOException {
6868
assertEquals(expected, parsedPolicy);
6969
}
7070

71+
public void testParseNetworkIllegalAction() throws IOException {
72+
var ex = expectThrows(PolicyParserException.class, () -> new PolicyParser(new ByteArrayInputStream("""
73+
entitlement-module-name:
74+
- network:
75+
actions:
76+
- listen
77+
- doesnotexist
78+
- connect
79+
""".getBytes(StandardCharsets.UTF_8)), "test-policy.yaml", false).parsePolicy());
80+
assertThat(
81+
ex.getMessage(),
82+
equalTo(
83+
"[2:5] policy parsing error for [test-policy.yaml] in scope [entitlement-module-name] for entitlement type [network]: "
84+
+ "unknown network action [doesnotexist]"
85+
)
86+
);
87+
}
88+
7189
public void testParseCreateClassloader() throws IOException {
7290
Policy parsedPolicy = new PolicyParser(new ByteArrayInputStream("""
7391
entitlement-module-name:

modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.repositories.azure;
1111

12+
import com.sun.net.httpserver.Headers;
1213
import com.sun.net.httpserver.HttpExchange;
1314
import com.sun.net.httpserver.HttpHandler;
1415

@@ -21,6 +22,7 @@
2122
import org.elasticsearch.common.bytes.BytesReference;
2223
import org.elasticsearch.common.settings.Settings;
2324
import org.elasticsearch.core.SuppressForbidden;
25+
import org.elasticsearch.http.ResponseInjectingHttpHandler;
2426
import org.elasticsearch.plugins.PluginsService;
2527
import org.elasticsearch.repositories.RepositoriesMetrics;
2628
import org.elasticsearch.repositories.RepositoriesService;
@@ -46,7 +48,6 @@
4648
import java.util.stream.IntStream;
4749

4850
import static org.elasticsearch.repositories.azure.AbstractAzureServerTestCase.randomBlobContent;
49-
import static org.elasticsearch.repositories.azure.ResponseInjectingAzureHttpHandler.createFailNRequestsHandler;
5051
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
5152
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5253
import static org.hamcrest.Matchers.hasSize;
@@ -60,15 +61,15 @@ public class AzureBlobStoreRepositoryMetricsTests extends AzureBlobStoreReposito
6061
);
6162
private static final int MAX_RETRIES = 3;
6263

63-
private final Queue<ResponseInjectingAzureHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
64+
private final Queue<ResponseInjectingHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
6465

6566
@Override
6667
protected Map<String, HttpHandler> createHttpHandlers() {
6768
Map<String, HttpHandler> httpHandlers = super.createHttpHandlers();
6869
assert httpHandlers.size() == 1 : "This assumes there's a single handler";
6970
return httpHandlers.entrySet()
7071
.stream()
71-
.collect(Collectors.toMap(Map.Entry::getKey, e -> new ResponseInjectingAzureHttpHandler(requestHandlers, e.getValue())));
72+
.collect(Collectors.toMap(Map.Entry::getKey, e -> new ResponseInjectingHttpHandler(requestHandlers, e.getValue())));
7273
}
7374

7475
/**
@@ -106,7 +107,7 @@ public void testThrottleResponsesAreCountedInMetrics() throws IOException {
106107
// Queue up some throttle responses
107108
final int numThrottles = randomIntBetween(1, MAX_RETRIES);
108109
IntStream.range(0, numThrottles)
109-
.forEach(i -> requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));
110+
.forEach(i -> requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));
110111

111112
// Check that the blob exists
112113
blobContainer.blobExists(purpose, blobName);
@@ -132,11 +133,7 @@ public void testRangeNotSatisfiedAreCountedInMetrics() throws IOException {
132133

133134
// Queue up a range-not-satisfied error
134135
requestHandlers.offer(
135-
new ResponseInjectingAzureHttpHandler.FixedRequestHandler(
136-
RestStatus.REQUESTED_RANGE_NOT_SATISFIED,
137-
null,
138-
GET_BLOB_REQUEST_PREDICATE
139-
)
136+
new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.REQUESTED_RANGE_NOT_SATISFIED, null, GET_BLOB_REQUEST_PREDICATE)
140137
);
141138

142139
// Attempt to read the blob
@@ -169,7 +166,7 @@ public void testErrorResponsesAreCountedInMetrics() throws IOException {
169166
if (status == RestStatus.TOO_MANY_REQUESTS) {
170167
throttles.incrementAndGet();
171168
}
172-
requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(status));
169+
requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(status));
173170
});
174171

175172
// Check that the blob exists
@@ -265,7 +262,7 @@ public void testBatchDeleteFailure() throws IOException {
265262
clearMetrics(dataNodeName);
266263

267264
// Handler will fail one or more of the batch requests
268-
final ResponseInjectingAzureHttpHandler.RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
265+
final ResponseInjectingHttpHandler.RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
269266

270267
// Exhaust the retries
271268
IntStream.range(0, (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1)))
@@ -308,6 +305,35 @@ private MetricsAsserter metricsAsserter(
308305
return new MetricsAsserter(dataNodeName, operationPurpose, operation, repository);
309306
}
310307

308+
/**
309+
* Creates a {@link ResponseInjectingHttpHandler.RequestHandler} that will persistently fail the first <code>numberToFail</code>
310+
* distinct requests it sees. Any other requests are passed through to the delegate.
311+
*
312+
* @param numberToFail The number of requests to fail
313+
* @return the handler
314+
*/
315+
private static ResponseInjectingHttpHandler.RequestHandler createFailNRequestsHandler(int numberToFail) {
316+
final List<String> requestsToFail = new ArrayList<>(numberToFail);
317+
return (exchange, delegate) -> {
318+
final Headers requestHeaders = exchange.getRequestHeaders();
319+
final String requestId = requestHeaders.get("X-ms-client-request-id").get(0);
320+
boolean failRequest = false;
321+
synchronized (requestsToFail) {
322+
if (requestsToFail.contains(requestId)) {
323+
failRequest = true;
324+
} else if (requestsToFail.size() < numberToFail) {
325+
requestsToFail.add(requestId);
326+
failRequest = true;
327+
}
328+
}
329+
if (failRequest) {
330+
exchange.sendResponseHeaders(500, -1);
331+
} else {
332+
delegate.handle(exchange);
333+
}
334+
};
335+
}
336+
311337
private class MetricsAsserter {
312338
private final String dataNodeName;
313339
private final OperationPurpose purpose;

modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.common.blobstore.OperationPurpose;
1717
import org.elasticsearch.common.bytes.BytesReference;
1818
import org.elasticsearch.core.SuppressForbidden;
19+
import org.elasticsearch.http.ResponseInjectingHttpHandler;
1920
import org.elasticsearch.rest.RestStatus;
2021
import org.junit.Before;
2122

@@ -34,14 +35,14 @@
3435

3536
public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase {
3637

37-
private final Queue<ResponseInjectingAzureHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
38+
private final Queue<ResponseInjectingHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
3839

3940
@SuppressForbidden(reason = "use a http server")
4041
@Before
4142
public void configureAzureHandler() {
4243
httpServer.createContext(
4344
"/",
44-
new ResponseInjectingAzureHttpHandler(
45+
new ResponseInjectingHttpHandler(
4546
requestHandlers,
4647
new AzureHttpHandler(ACCOUNT, CONTAINER, null, MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE)
4748
)
@@ -61,7 +62,7 @@ public void testRetriesAndOperationsAreTrackedSeparately() throws IOException {
6162
for (int i = 0; i < randomIntBetween(10, 50); i++) {
6263
final boolean triggerRetry = randomBoolean();
6364
if (triggerRetry) {
64-
requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS));
65+
requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS));
6566
}
6667
final AzureBlobStore.Operation operation = randomFrom(supportedOperations);
6768
switch (operation) {

modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.lucene.util.BytesRefBuilder;
2626
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
2727
import org.elasticsearch.cluster.service.ClusterService;
28+
import org.elasticsearch.common.BackoffPolicy;
2829
import org.elasticsearch.common.blobstore.BlobContainer;
2930
import org.elasticsearch.common.blobstore.BlobPath;
3031
import org.elasticsearch.common.blobstore.BlobStore;
@@ -268,7 +269,8 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
268269
metadata.name(),
269270
storageService,
270271
bigArrays,
271-
randomIntBetween(1, 8) * 1024
272+
randomIntBetween(1, 8) * 1024,
273+
BackoffPolicy.noBackoff()
272274
) {
273275
@Override
274276
long getLargeBlobThresholdInBytes() {

0 commit comments

Comments
 (0)