Skip to content

Commit 1a1dcdc

Browse files
authored
Merge pull request opensearch-project#1694 from AndreKurait/AutomaticDetectTargetCompressionSupport
Add support to automatically detect target cluster compression support based on cluster settings
2 parents 16124b9 + 3fee0e0 commit 1a1dcdc

File tree

20 files changed

+224
-106
lines changed

20 files changed

+224
-106
lines changed

DocumentsFromSnapshotMigration/README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ To see the default shard size, use the `--help` CLI option:
9595

9696
These arguments should be carefully considered before being set; they can include experimental features and can impact the security posture of the solution. Tread with caution.
9797

98-
| Argument | Description |
99-
|-----------------------------------|:-----------------------------------------------------------------------------------------------------------------------|
100-
| --target-compression | Flag to enable request compression for target cluster. Default: false |
101-
| --documents-per-bulk-request | The number of documents to be included within each bulk request sent. Default: no max (controlled by documents size) |
102-
| --max-connections | The maximum number of connections to simultaneously used to communicate to the target. Default: 10 |
103-
| --target-insecure | Flag to allow untrusted SSL certificates for target cluster. Default: false |
98+
| Argument | Description |
99+
|-----------------------------|:---------------------------------------------------------------------------------------------------------------------|
100+
| --disable-compression | Flag to disable request compression for target cluster. Default: false |
101+
| --documents-per-bulk-request | The number of documents to be included within each bulk request sent. Default: no max (controlled by documents size) |
102+
| --max-connections           | The maximum number of connections to use simultaneously to communicate with the target. Default: 10                    |
103+
| --target-insecure | Flag to allow untrusted SSL certificates for target cluster. Default: false |

DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.concurrent.CompletableFuture;
99
import java.util.concurrent.ExecutionException;
1010
import java.util.concurrent.Executors;
11+
import java.util.concurrent.TimeUnit;
1112
import java.util.concurrent.atomic.AtomicInteger;
1213
import java.util.stream.Stream;
1314

@@ -109,7 +110,7 @@ public void testDocumentMigration(
109110
}
110111
var thrownException = Assertions.assertThrows(
111112
ExecutionException.class,
112-
() -> CompletableFuture.allOf(workerFutures.toArray(CompletableFuture[]::new)).get()
113+
() -> CompletableFuture.allOf(workerFutures.toArray(CompletableFuture[]::new)).get(120, TimeUnit.SECONDS)
113114
);
114115
var numTotalRuns = workerFutures.stream().mapToInt(cf -> {
115116
try {

DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,6 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
286286
var coordinatorFactory = new WorkCoordinatorFactory(targetVersion);
287287
var connectionContext = ConnectionContextTestParams.builder()
288288
.host(targetAddress)
289-
.compressionEnabled(true)
290289
.build()
291290
.toConnectionContext();
292291
var workItemRef = new AtomicReference<IWorkCoordinator.WorkItemAndDuration>();

RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import org.opensearch.migrations.AwarenessAttributeSettings;
1616
import org.opensearch.migrations.Version;
17+
import org.opensearch.migrations.bulkload.common.http.CompressionMode;
1718
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
1819
import org.opensearch.migrations.bulkload.common.http.HttpResponse;
1920
import org.opensearch.migrations.bulkload.tracing.IRfsContexts;
@@ -57,15 +58,17 @@ public abstract class OpenSearchClient {
5758
protected final RestClient client;
5859
protected final FailedRequestsLogger failedRequestsLogger;
5960
private final Version version;
61+
private final CompressionMode compressionMode;
6062

61-
protected OpenSearchClient(ConnectionContext connectionContext, Version version) {
62-
this(new RestClient(connectionContext), new FailedRequestsLogger(), version);
63+
protected OpenSearchClient(ConnectionContext connectionContext, Version version, CompressionMode compressionMode) {
64+
this(new RestClient(connectionContext), new FailedRequestsLogger(), version, compressionMode);
6365
}
6466

65-
protected OpenSearchClient(RestClient client, FailedRequestsLogger failedRequestsLogger, Version version) {
67+
protected OpenSearchClient(RestClient client, FailedRequestsLogger failedRequestsLogger, Version version, CompressionMode compressionMode) {
6668
this.client = client;
6769
this.failedRequestsLogger = failedRequestsLogger;
6870
this.version = version;
71+
this.compressionMode = compressionMode;
6972
}
7073

7174
public Version getClusterVersion() {
@@ -427,8 +430,7 @@ public Mono<BulkResponse> sendBulkRequest(String indexName, List<BulkDocSection>
427430
log.atTrace().setMessage("Creating bulk body with document ids {}").addArgument(docsMap::keySet).log();
428431
var body = BulkDocSection.convertToBulkRequestBody(docsMap.values());
429432
var additionalHeaders = new HashMap<String, List<String>>();
430-
// Reduce network bandwidth by attempting request and response compression
431-
if (client.supportsGzipCompression()) {
433+
if (CompressionMode.GZIP_BODY_COMPRESSION.equals(compressionMode)) {
432434
RestClient.addGzipRequestHeaders(additionalHeaders);
433435
RestClient.addGzipResponseHeaders(additionalHeaders);
434436
}

RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClientFactory.java

Lines changed: 103 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.opensearch.migrations.bulkload.common;
22

3-
43
import java.lang.reflect.InvocationTargetException;
54
import java.util.HashSet;
65
import java.util.Optional;
@@ -9,6 +8,7 @@
98
import org.opensearch.migrations.UnboundVersionMatchers;
109
import org.opensearch.migrations.Version;
1110
import org.opensearch.migrations.VersionMatchers;
11+
import org.opensearch.migrations.bulkload.common.http.CompressionMode;
1212
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
1313
import org.opensearch.migrations.bulkload.common.http.HttpResponse;
1414
import org.opensearch.migrations.bulkload.version_es_5_6.OpenSearchClient_ES_5_6;
@@ -27,10 +27,11 @@
2727
public class OpenSearchClientFactory {
2828
private static final ObjectMapper objectMapper = new ObjectMapper();
2929

30-
private ConnectionContext connectionContext;
30+
private final ConnectionContext connectionContext;
3131
private Version version;
32+
private CompressionMode compressionMode;
3233
RestClient client;
33-
34+
3435
public OpenSearchClientFactory(ConnectionContext connectionContext) {
3536
if (connectionContext == null) {
3637
throw new IllegalArgumentException("Connection context was not provided in constructor.");
@@ -40,26 +41,27 @@ public OpenSearchClientFactory(ConnectionContext connectionContext) {
4041
}
4142

4243
public OpenSearchClient determineVersionAndCreate() {
43-
if (version == null) {
44-
version = getClusterVersion();
45-
}
46-
var clientClass = getOpenSearchClientClass(version);
47-
try {
48-
return clientClass.getConstructor(ConnectionContext.class, Version.class)
49-
.newInstance(connectionContext, version);
50-
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
51-
throw new ClientInstantiationException("Failed to instantiate OpenSearchClient", e);
52-
}
44+
return determineVersionAndCreate(null, null);
5345
}
5446

5547
public OpenSearchClient determineVersionAndCreate(RestClient restClient, FailedRequestsLogger failedRequestsLogger) {
5648
if (version == null) {
5749
version = getClusterVersion();
5850
}
51+
52+
if (!connectionContext.isDisableCompression() && Boolean.TRUE.equals(getCompressionEnabled())) {
53+
compressionMode = CompressionMode.GZIP_BODY_COMPRESSION;
54+
} else {
55+
compressionMode = CompressionMode.UNCOMPRESSED;
56+
}
5957
var clientClass = getOpenSearchClientClass(version);
6058
try {
61-
return clientClass.getConstructor(RestClient.class, FailedRequestsLogger.class, Version.class)
62-
.newInstance(restClient, failedRequestsLogger, version);
59+
if (restClient == null && failedRequestsLogger == null) {
60+
return clientClass.getConstructor(ConnectionContext.class, Version.class, CompressionMode.class)
61+
.newInstance(connectionContext, version, compressionMode);
62+
}
63+
return clientClass.getConstructor(RestClient.class, FailedRequestsLogger.class, Version.class, CompressionMode.class)
64+
.newInstance(restClient, failedRequestsLogger, version, compressionMode);
6365
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
6466
throw new ClientInstantiationException("Failed to instantiate OpenSearchClient", e);
6567
}
@@ -76,28 +78,47 @@ private Class<? extends OpenSearchClient> getOpenSearchClientClass(Version versi
7678
throw new IllegalArgumentException("Unsupported version: " + version);
7779
}
7880

79-
/** Amazon OpenSearch Serverless cluster don't have a version number, but
80-
* it is closely aligned with the latest open-source OpenSearch 2.X */
81+
/** Amazon OpenSearch Serverless clusters don't have a version number, but
82+
* they are closely aligned with the latest open-source OpenSearch 2.X */
8183
private static final Version AMAZON_SERVERLESS_VERSION = Version.builder()
8284
.flavor(Flavor.AMAZON_SERVERLESS_OPENSEARCH)
8385
.major(2)
8486
.build();
8587

88+
private Boolean getCompressionEnabled() {
89+
log.atInfo().setMessage("Checking compression on cluster").log();
90+
return client.getAsync("_cluster/settings?include_defaults=true", null)
91+
.flatMap(this::checkCompressionFromResponse)
92+
.doOnError(e -> log.atWarn()
93+
.setMessage("Check cluster compression failed")
94+
.setCause(e)
95+
.log())
96+
.retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
97+
.onErrorReturn(false)
98+
.doOnNext(hasCompressionEnabled -> log.atInfo()
99+
.setMessage("After querying target, compression={}")
100+
.addArgument(hasCompressionEnabled).log())
101+
.block();
102+
}
103+
86104
public Version getClusterVersion() {
87105
var versionFromRootApi = client.getAsync("", null)
88-
.flatMap(resp -> {
89-
if (resp.statusCode == 200) {
90-
return versionFromResponse(resp);
91-
}
92-
// If the root API doesn't exist, the cluster is OpenSearch Serverless
93-
if (resp.statusCode == 404) {
94-
return Mono.just(AMAZON_SERVERLESS_VERSION);
95-
}
96-
return Mono.error(new OpenSearchClient.UnexpectedStatusCode(resp));
97-
})
98-
.doOnError(e -> log.error(e.getMessage()))
99-
.retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
100-
.block();
106+
.flatMap(resp -> {
107+
if (resp.statusCode == 200) {
108+
return versionFromResponse(resp);
109+
}
110+
// If the root API doesn't exist, the cluster is OpenSearch Serverless
111+
if (resp.statusCode == 404) {
112+
return Mono.just(AMAZON_SERVERLESS_VERSION);
113+
}
114+
return Mono.error(new OpenSearchClient.UnexpectedStatusCode(resp));
115+
})
116+
.doOnError(e -> log.atWarn()
117+
.setMessage("Check cluster version failed")
118+
.setCause(e)
119+
.log())
120+
.retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
121+
.block();
101122

102123
// Compatibility mode is only enabled on OpenSearch clusters responding with the version of 7.10.2
103124
if (!VersionMatchers.isES_7_10.test(versionFromRootApi)) {
@@ -108,8 +129,9 @@ public Version getClusterVersion() {
108129
.doOnError(e -> log.error(e.getMessage()))
109130
.retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY)
110131
.flatMap(hasCompatibilityModeEnabled -> {
111-
log.atInfo().setMessage("Checking CompatibilityMode, was enabled? {}").addArgument(hasCompatibilityModeEnabled).log();
132+
log.atInfo().setMessage("After querying target, compatibilityMode={}").addArgument(hasCompatibilityModeEnabled).log();
112133
if (Boolean.FALSE.equals(hasCompatibilityModeEnabled)) {
134+
assert versionFromRootApi != null : "Expected version from root api to be set";
113135
return Mono.just(versionFromRootApi);
114136
}
115137
return client.getAsync("_nodes/_all/nodes,version?format=json", null)
@@ -120,8 +142,9 @@ public Version getClusterVersion() {
120142
.onErrorResume(e -> {
121143
log.atWarn()
122144
.setCause(e)
123-
.setMessage("Unable to CompatibilityMode or determine the version from a plugin, falling back to version {}")
145+
.setMessage("Unable to determine CompatibilityMode or version from plugin, falling back to version {}")
124146
.addArgument(versionFromRootApi).log();
147+
assert versionFromRootApi != null : "Expected version from root api to be set";
125148
return Mono.just(versionFromRootApi);
126149
})
127150
.block();
@@ -153,28 +176,64 @@ private Mono<Version> versionFromResponse(HttpResponse resp) {
153176
}
154177

155178
Mono<Boolean> checkCompatibilityModeFromResponse(HttpResponse resp) {
179+
return checkBooleanSettingFromResponse(
180+
resp,
181+
"compatibility",
182+
"override_main_response_version",
183+
"Unable to determine if the cluster is in compatibility mode");
184+
}
185+
186+
Mono<Boolean> checkCompressionFromResponse(HttpResponse resp) {
187+
return checkBooleanSettingFromResponse(
188+
resp,
189+
"http_compression",
190+
"enabled",
191+
"Unable to determine if compression is supported")
192+
.or(checkBooleanSettingFromResponse(
193+
resp,
194+
"http",
195+
"compression",
196+
"Unable to determine if compression is supported")
197+
);
198+
}
199+
200+
private Mono<Boolean> checkBooleanSettingFromResponse(
201+
HttpResponse resp,
202+
String primaryKey,
203+
String secondaryKey,
204+
String errorLogMessage) {
205+
156206
if (resp.statusCode != 200) {
157207
return Mono.error(new OpenSearchClient.UnexpectedStatusCode(resp));
158208
}
159209
try {
160210
var body = Optional.of(objectMapper.readTree(resp.body));
161-
var persistentlyInCompatibilityMode = inCompatibilityMode(body.map(n -> n.get("persistent")));
162-
var transientlyInCompatibilityMode = inCompatibilityMode(body.map(n -> n.get("transient")));
163-
return Mono.just(persistentlyInCompatibilityMode || transientlyInCompatibilityMode);
211+
var persistentEnabled = isSettingEnabled(body.map(n -> n.get("persistent")), primaryKey, secondaryKey);
212+
var transientEnabled = isSettingEnabled(body.map(n -> n.get("transient")), primaryKey, secondaryKey);
213+
var defaultsEnabled = isSettingEnabled(body.map(n -> n.get("defaults")), primaryKey, secondaryKey);
214+
return Mono.just(persistentEnabled || transientEnabled || defaultsEnabled);
164215
} catch (Exception e) {
165-
log.error("Unable to determine if the cluster is in compatibility mode", e);
166-
return Mono.error(new OpenSearchClient.OperationFailed("Unable to determine if the cluster is in compatibility mode from response: " + e.getMessage(), resp));
216+
log.error(errorLogMessage, e);
217+
return Mono.error(new OpenSearchClient.OperationFailed(errorLogMessage + " from response: " + e.getMessage(), resp));
167218
}
168219
}
169220

170-
private boolean inCompatibilityMode(Optional<JsonNode> node) {
221+
private boolean isSettingEnabled(Optional<JsonNode> node, String primaryKey, String secondaryKey) {
171222
return node.filter(n -> !n.isNull())
172-
.map(n -> n.get("compatibility"))
173-
.filter(n -> !n.isNull())
174-
.map(n -> n.get("override_main_response_version"))
175-
.filter(n -> !n.isNull())
176-
.map(n -> n.asBoolean())
177-
.orElse(false);
223+
.map(n -> n.get(primaryKey))
224+
.filter(n -> !n.isNull())
225+
.map(n -> n.get(secondaryKey))
226+
.filter(n -> !n.isNull())
227+
.map(n -> {
228+
if (n.isBoolean()) {
229+
return n.asBoolean();
230+
} else if (n.isTextual()) {
231+
return Boolean.parseBoolean(n.asText());
232+
} else {
233+
return false;
234+
}
235+
})
236+
.orElse(false);
178237
}
179238

180239
private Mono<Version> getVersionFromNodes(HttpResponse resp) {
@@ -215,5 +274,4 @@ public ClientInstantiationException(String message, Exception cause) {
215274
super(message, cause);
216275
}
217276
}
218-
219277
}

RFS/src/main/java/org/opensearch/migrations/bulkload/common/RestClient.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,6 @@ public Mono<HttpResponse> asyncRequest(HttpMethod method, String path, String bo
175175
.doOnTerminate(() -> contextCleanupRef.get().run());
176176
}
177177

178-
179-
public boolean supportsGzipCompression() {
180-
return connectionContext.isCompressionSupported();
181-
}
182-
183178
public static void addGzipResponseHeaders(Map<String, List<String>> headers) {
184179
headers.put(ACCEPT_ENCODING_HEADER_NAME, List.of(GZIP_TYPE));
185180
}
@@ -190,10 +185,6 @@ public static void addGzipRequestHeaders(Map<String, List<String>> headers) {
190185
headers.put(GzipPayloadRequestTransformer.CONTENT_ENCODING_HEADER_NAME,
191186
List.of(GzipPayloadRequestTransformer.GZIP_CONTENT_ENCODING_HEADER_VALUE));
192187
}
193-
public static boolean hasGzipRequestHeaders(Map<String, List<String>> headers) {
194-
return headers.getOrDefault(GzipPayloadRequestTransformer.CONTENT_ENCODING_HEADER_NAME, List.of())
195-
.contains(GzipPayloadRequestTransformer.GZIP_CONTENT_ENCODING_HEADER_VALUE);
196-
}
197188

198189

199190
private Map<String, String> extractHeaders(HttpHeaders headers) {
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package org.opensearch.migrations.bulkload.common.http;
2+
3+
public enum CompressionMode {
4+
GZIP_BODY_COMPRESSION,
5+
UNCOMPRESSED
6+
}

0 commit comments

Comments
 (0)