Skip to content

Commit 04deda8

Browse files
authored
Merge branch 'main' into refactor/streaming-metrics
2 parents be38dd8 + 7626026 commit 04deda8

File tree

22 files changed

+227
-250
lines changed

22 files changed

+227
-250
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3ClientSettings.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,28 +121,28 @@ final class S3ClientSettings {
121121
static final Setting.AffixSetting<TimeValue> READ_TIMEOUT_SETTING = Setting.affixKeySetting(
122122
PREFIX,
123123
"read_timeout",
124-
key -> Setting.timeSetting(key, TimeValue.timeValueMillis(ClientConfiguration.DEFAULT_SOCKET_TIMEOUT), Property.NodeScope)
124+
key -> Setting.timeSetting(key, Defaults.READ_TIMEOUT, Property.NodeScope)
125125
);
126126

127127
/** The maximum number of concurrent connections to use. */
128128
static final Setting.AffixSetting<Integer> MAX_CONNECTIONS_SETTING = Setting.affixKeySetting(
129129
PREFIX,
130130
"max_connections",
131-
key -> Setting.intSetting(key, ClientConfiguration.DEFAULT_MAX_CONNECTIONS, 1, Property.NodeScope)
131+
key -> Setting.intSetting(key, Defaults.MAX_CONNECTIONS, 1, Property.NodeScope)
132132
);
133133

134134
/** The number of retries to use when an s3 request fails. */
135135
static final Setting.AffixSetting<Integer> MAX_RETRIES_SETTING = Setting.affixKeySetting(
136136
PREFIX,
137137
"max_retries",
138-
key -> Setting.intSetting(key, ClientConfiguration.DEFAULT_RETRY_POLICY.getMaxErrorRetry(), 0, Property.NodeScope)
138+
key -> Setting.intSetting(key, Defaults.RETRY_COUNT, 0, Property.NodeScope)
139139
);
140140

141141
/** Whether retries should be throttled (ie use backoff). */
142142
static final Setting.AffixSetting<Boolean> USE_THROTTLE_RETRIES_SETTING = Setting.affixKeySetting(
143143
PREFIX,
144144
"use_throttle_retries",
145-
key -> Setting.boolSetting(key, ClientConfiguration.DEFAULT_THROTTLE_RETRIES, Property.NodeScope)
145+
key -> Setting.boolSetting(key, Defaults.THROTTLE_RETRIES, Property.NodeScope)
146146
);
147147

148148
/** Whether the s3 client should use path style access. */
@@ -335,7 +335,7 @@ S3ClientSettings refine(Settings repositorySettings) {
335335

336336
/**
337337
* Load all client settings from the given settings.
338-
*
338+
* <p>
339339
* Note this will always at least return a client named "default".
340340
*/
341341
static Map<String, S3ClientSettings> load(Settings settings) {
@@ -501,4 +501,11 @@ private static <T> T getRepoSettingOrDefault(Setting.AffixSetting<T> setting, Se
501501
}
502502
return defaultValue;
503503
}
504+
505+
static final class Defaults {
506+
static final TimeValue READ_TIMEOUT = TimeValue.timeValueMillis(ClientConfiguration.DEFAULT_SOCKET_TIMEOUT);
507+
static final int MAX_CONNECTIONS = ClientConfiguration.DEFAULT_MAX_CONNECTIONS;
508+
static final int RETRY_COUNT = ClientConfiguration.DEFAULT_RETRY_POLICY.getMaxErrorRetry();
509+
static final boolean THROTTLE_RETRIES = ClientConfiguration.DEFAULT_THROTTLE_RETRIES;
510+
}
504511
}

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
3131
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
3232

33-
import org.apache.http.HttpStatus;
3433
import org.apache.logging.log4j.LogManager;
3534
import org.apache.logging.log4j.Logger;
3635
import org.elasticsearch.ElasticsearchException;
@@ -44,6 +43,7 @@
4443
import org.elasticsearch.core.IOUtils;
4544
import org.elasticsearch.core.TimeValue;
4645
import org.elasticsearch.env.Environment;
46+
import org.elasticsearch.rest.RestStatus;
4747
import org.elasticsearch.watcher.FileChangesListener;
4848
import org.elasticsearch.watcher.FileWatcher;
4949
import org.elasticsearch.watcher.ResourceWatcherService;
@@ -115,8 +115,8 @@ class S3Service implements Closeable {
115115

116116
/**
117117
* Refreshes the settings for the AmazonS3 clients and clears the cache of
118-
* existing clients. New clients will be build using these new settings. Old
119-
* clients are usable until released. On release they will be destroyed instead
118+
* existing clients. New clients will be built using these new settings. Old
119+
* clients are usable until released. On release, they will be destroyed instead
120120
* of being returned to the cache.
121121
*/
122122
public synchronized void refreshAndClearCache(Map<String, S3ClientSettings> clientsSettings) {
@@ -126,7 +126,7 @@ public synchronized void refreshAndClearCache(Map<String, S3ClientSettings> clie
126126
this.staticClientSettings = Maps.ofEntries(clientsSettings.entrySet());
127127
derivedClientSettings = emptyMap();
128128
assert this.staticClientSettings.containsKey("default") : "always at least have 'default'";
129-
// clients are built lazily by {@link client}
129+
/* clients are built lazily by {@link #client} */
130130
}
131131

132132
/**
@@ -341,7 +341,8 @@ public void refresh() {
341341
* <ul>
342342
* <li>Reads the location of the web identity token not from AWS_WEB_IDENTITY_TOKEN_FILE, but from a symlink
343343
* in the plugin directory, so we don't need to create a hardcoded read file permission for the plugin.</li>
344-
* <li>Supports customization of the STS endpoint via a system property, so we can test it against a test fixture.</li>
344+
* <li>Supports customization of the STS (Security Token Service) endpoint via a system property, so we can
345+
* test it against a test fixture.</li>
345346
* <li>Supports gracefully shutting down the provider and the STS client.</li>
346347
* </ul>
347348
*/
@@ -384,7 +385,7 @@ static class CustomWebIdentityTokenCredentialsProvider implements AWSCredentials
384385
if (roleArn == null) {
385386
LOGGER.warn(
386387
"Unable to use a web identity token for authentication. The AWS_WEB_IDENTITY_TOKEN_FILE environment "
387-
+ "variable is set, but either AWS_ROLE_ARN is missing"
388+
+ "variable is set, but AWS_ROLE_ARN is missing"
388389
);
389390
return;
390391
}
@@ -528,7 +529,7 @@ interface JvmEnvironment {
528529
return true;
529530
}
530531
if (exception instanceof AmazonServiceException ase) {
531-
return ase.getStatusCode() == HttpStatus.SC_FORBIDDEN && "InvalidAccessKeyId".equals(ase.getErrorCode());
532+
return ase.getStatusCode() == RestStatus.FORBIDDEN.getStatus() && "InvalidAccessKeyId".equals(ase.getErrorCode());
532533
}
533534
return false;
534535
})

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.common.recycler.Recycler;
3232
import org.elasticsearch.common.transport.NetworkExceptionHelper;
3333
import org.elasticsearch.common.util.concurrent.ThreadContext;
34-
import org.elasticsearch.core.Nullable;
3534
import org.elasticsearch.core.RefCounted;
3635
import org.elasticsearch.core.Releasable;
3736
import org.elasticsearch.core.Releasables;
@@ -116,6 +115,7 @@ void sendRequest(
116115
assert assertValidTransportVersion(transportVersion);
117116
sendMessage(
118117
channel,
118+
MessageDirection.REQUEST,
119119
action,
120120
request,
121121
requestId,
@@ -148,7 +148,8 @@ void sendResponse(
148148
try {
149149
sendMessage(
150150
channel,
151-
null,
151+
MessageDirection.RESPONSE,
152+
action,
152153
response,
153154
requestId,
154155
isHandshake,
@@ -190,7 +191,8 @@ void sendErrorResponse(
190191
try {
191192
sendMessage(
192193
channel,
193-
null,
194+
MessageDirection.RESPONSE_ERROR,
195+
action,
194196
msg,
195197
requestId,
196198
false,
@@ -206,29 +208,36 @@ void sendErrorResponse(
206208
}
207209
}
208210

211+
public enum MessageDirection {
212+
REQUEST,
213+
RESPONSE,
214+
RESPONSE_ERROR
215+
}
216+
209217
private void sendMessage(
210218
TcpChannel channel,
211-
@Nullable String requestAction,
219+
MessageDirection messageDirection,
220+
String action,
212221
Writeable writeable,
213222
long requestId,
214223
boolean isHandshake,
215-
Compression.Scheme compressionScheme,
224+
Compression.Scheme possibleCompressionScheme,
216225
TransportVersion version,
217226
ResponseStatsConsumer responseStatsConsumer,
218227
Releasable onAfter
219228
) throws IOException {
220-
compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme;
229+
assert action != null;
230+
final var compressionScheme = writeable instanceof BytesTransportRequest ? null : possibleCompressionScheme;
221231
final BytesReference message;
222232
boolean serializeSuccess = false;
223-
final boolean isError = writeable instanceof RemoteTransportException;
224233
final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
225234
try {
226235
message = serialize(
227-
requestAction,
236+
messageDirection,
237+
action,
228238
requestId,
229239
isHandshake,
230240
version,
231-
isError,
232241
compressionScheme,
233242
writeable,
234243
threadPool.getThreadContext(),
@@ -244,14 +253,23 @@ private void sendMessage(
244253
}
245254
}
246255
responseStatsConsumer.addResponseStats(message.length());
247-
final var responseType = writeable.getClass();
248-
final boolean compress = compressionScheme != null;
256+
final var messageType = writeable.getClass();
249257
internalSend(
250258
channel,
251259
message,
252-
requestAction == null
253-
? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}"
254-
: () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}",
260+
() -> (messageDirection == MessageDirection.REQUEST ? "Request{" : "Response{")
261+
+ action
262+
+ "}{id="
263+
+ requestId
264+
+ "}{err="
265+
+ (messageDirection == MessageDirection.RESPONSE_ERROR)
266+
+ "}{cs="
267+
+ compressionScheme
268+
+ "}{hs="
269+
+ isHandshake
270+
+ "}{t="
271+
+ messageType
272+
+ "}",
255273
ActionListener.releasing(
256274
message instanceof ReleasableBytesReference r
257275
? Releasables.wrap(byteStreamOutput, onAfter, r)
@@ -262,38 +280,39 @@ private void sendMessage(
262280

263281
// public for tests
264282
public static BytesReference serialize(
265-
@Nullable String requestAction,
283+
MessageDirection messageDirection,
284+
String action,
266285
long requestId,
267286
boolean isHandshake,
268287
TransportVersion version,
269-
boolean isError,
270288
Compression.Scheme compressionScheme,
271289
Writeable writeable,
272290
ThreadContext threadContext,
273291
RecyclerBytesStreamOutput byteStreamOutput
274292
) throws IOException {
293+
assert action != null;
275294
assert byteStreamOutput.position() == 0;
276295
byteStreamOutput.setTransportVersion(version);
277296
byteStreamOutput.skip(TcpHeader.HEADER_SIZE);
278297
threadContext.writeTo(byteStreamOutput);
279-
if (requestAction != null) {
298+
if (messageDirection == MessageDirection.REQUEST) {
280299
if (version.before(TransportVersions.V_8_0_0)) {
281300
// empty features array
282301
byteStreamOutput.writeStringArray(Strings.EMPTY_ARRAY);
283302
}
284-
byteStreamOutput.writeString(requestAction);
303+
byteStreamOutput.writeString(action);
285304
}
286305

287306
final int variableHeaderLength = Math.toIntExact(byteStreamOutput.position() - TcpHeader.HEADER_SIZE);
288307
BytesReference message = serializeMessageBody(writeable, compressionScheme, version, byteStreamOutput);
289308
byte status = 0;
290-
if (requestAction == null) {
309+
if (messageDirection != MessageDirection.REQUEST) {
291310
status = TransportStatus.setResponse(status);
292311
}
293312
if (isHandshake) {
294313
status = TransportStatus.setHandshake(status);
295314
}
296-
if (isError) {
315+
if (messageDirection == MessageDirection.RESPONSE_ERROR) {
297316
status = TransportStatus.setError(status);
298317
}
299318
if (compressionScheme != null) {
@@ -316,6 +335,8 @@ private static BytesReference serializeMessageBody(
316335
try {
317336
stream.setTransportVersion(version);
318337
if (writeable instanceof BytesTransportRequest bRequest) {
338+
assert stream == byteStreamOutput;
339+
assert compressionScheme == null;
319340
bRequest.writeThin(stream);
320341
zeroCopyBuffer = bRequest.bytes;
321342
} else if (writeable instanceof RemoteTransportException remoteTransportException) {

0 commit comments

Comments
 (0)