diff --git a/x-pack/plugin/ccr/build.gradle b/x-pack/plugin/ccr/build.gradle index 765c70abee378..f4ade5b441877 100644 --- a/x-pack/plugin/ccr/build.gradle +++ b/x-pack/plugin/ccr/build.gradle @@ -1,6 +1,5 @@ apply plugin: 'elasticsearch.internal-es-plugin' apply plugin: 'elasticsearch.internal-cluster-test' -apply plugin: 'elasticsearch.internal-java-rest-test' esplugin { name = 'x-pack-ccr' description = 'Elasticsearch Expanded Pack Plugin - CCR' @@ -34,16 +33,6 @@ tasks.named('internalClusterTestTestingConventions').configure { baseClass 'org.elasticsearch.test.ESIntegTestCase' } -tasks.named("javaRestTest").configure { - usesDefaultDistribution() -} - -restResources { - restApi { - include 'bulk', 'search', '_common', 'indices', 'index', 'cluster', 'data_stream' - } -} - addQaCheckDependencies(project) dependencies { diff --git a/x-pack/plugin/ccr/src/javaRestTest/java/org/elasticsearch/xpack/ccr/rest/ShardChangesRestIT.java b/x-pack/plugin/ccr/src/javaRestTest/java/org/elasticsearch/xpack/ccr/rest/ShardChangesRestIT.java deleted file mode 100644 index 4c61904475093..0000000000000 --- a/x-pack/plugin/ccr/src/javaRestTest/java/org/elasticsearch/xpack/ccr/rest/ShardChangesRestIT.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.ccr.rest; - -import org.apache.http.util.EntityUtils; -import org.elasticsearch.Build; -import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.ResponseException; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.cluster.ElasticsearchCluster; -import org.elasticsearch.test.cluster.local.distribution.DistributionType; -import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.xcontent.json.JsonXContent; -import org.hamcrest.Matchers; -import org.junit.Before; -import org.junit.ClassRule; - -import java.io.IOException; -import java.time.Instant; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.List; -import java.util.Locale; -import java.util.Map; - -public class ShardChangesRestIT extends ESRestTestCase { - private static final String CCR_SHARD_CHANGES_ENDPOINT = "/%s/ccr/shard_changes"; - private static final String BULK_INDEX_ENDPOINT = "/%s/_bulk"; - private static final String DATA_STREAM_ENDPOINT = "/_data_stream/%s"; - private static final String INDEX_TEMPLATE_ENDPOINT = "/_index_template/%s"; - - private static final String[] SHARD_RESPONSE_FIELDS = new String[] { - "took_in_millis", - "operations", - "shard_id", - "index_abstraction", - "index", - "settings_version", - "max_seq_no_of_updates_or_deletes", - "number_of_operations", - "mapping_version", - "aliases_version", - "max_seq_no", - "global_checkpoint" }; - - private static final String BULK_INDEX_TEMPLATE = """ - { "index": { "op_type": "create" } } - { "@timestamp": "%s", "name": "%s" } - """;; - private static final String[] NAMES = { "skywalker", "leia", "obi-wan", "yoda", "chewbacca", "r2-d2", "c-3po", "darth-vader" }; - @ClassRule - public static ElasticsearchCluster cluster = ElasticsearchCluster.local() - .distribution(DistributionType.DEFAULT) - .setting("xpack.security.enabled", "false") - .setting("xpack.license.self_generated.type", "trial") - .build(); - - @Override - protected String getTestRestCluster() { - return cluster.getHttpAddresses(); - } - - @Before - public void assumeSnapshotBuild() { - assumeTrue("/{index}/ccr/shard_changes endpoint only available in snapshot builds", Build.current().isSnapshot()); - } - - public void testShardChangesNoOperation() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex( - indexName, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "1s") - .build() - ); - assertTrue(indexExists(indexName)); - - final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); - assertOK(client().performRequest(shardChangesRequest)); - } - - public void testShardChangesDefaultParams() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - final Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "1s") - .build(); - final String mappings = """ - { - "properties": { - "name": { - "type": "keyword" - } - } - } - """; - createIndex(indexName, settings, mappings); - assertTrue(indexExists(indexName)); - - assertOK(bulkIndex(indexName, randomIntBetween(10, 20))); - - final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); - final Response response = client().performRequest(shardChangesRequest); - assertOK(response); - assertShardChangesResponse( - XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false), - indexName - ); - } - - public void testDataStreamShardChangesDefaultParams() throws IOException { - final String templateName = randomAlphanumericOfLength(8).toLowerCase(Locale.ROOT); - assertOK(createIndexTemplate(templateName, """ - { - "index_patterns": [ "test-*-*" ], - "data_stream": {}, - "priority": 100, - "template": { - "mappings": { - "properties": { - "@timestamp": { - "type": "date" - }, - "name": { - "type": "keyword" - } - } - } - } - }""")); - - final String dataStreamName = "test-" - + randomAlphanumericOfLength(5).toLowerCase(Locale.ROOT) - + "-" - + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); - assertOK(createDataStream(dataStreamName)); - - assertOK(bulkIndex(dataStreamName, randomIntBetween(10, 20))); - - final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(dataStreamName)); - final Response response = client().performRequest(shardChangesRequest); - assertOK(response); - assertShardChangesResponse( - XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false), - dataStreamName - ); - } - - public void testIndexAliasShardChangesDefaultParams() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - final String aliasName = randomAlphanumericOfLength(8).toLowerCase(Locale.ROOT); - final Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "1s") - .build(); - final String mappings = """ - { - "properties": { - "name": { - "type": "keyword" - } - } - } - """; - createIndex(indexName, settings, mappings); - assertTrue(indexExists(indexName)); - - final Request putAliasRequest = new Request("PUT", "/" + indexName + "/_alias/" + aliasName); - assertOK(client().performRequest(putAliasRequest)); - - assertOK(bulkIndex(aliasName, randomIntBetween(10, 20))); - - final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(aliasName)); - final Response response = client().performRequest(shardChangesRequest); - assertOK(response); - assertShardChangesResponse( - XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false), - aliasName - ); - } - - public void testShardChangesWithAllParameters() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex( - indexName, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "1s") - .build() - ); - assertTrue(indexExists(indexName)); - - assertOK(bulkIndex(indexName, randomIntBetween(100, 200))); - - final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); - shardChangesRequest.addParameter("from_seq_no", "0"); - shardChangesRequest.addParameter("max_operations_count", "1"); - shardChangesRequest.addParameter("poll_timeout", "10s"); - shardChangesRequest.addParameter("max_batch_size", "1MB"); - - final Response response = client().performRequest(shardChangesRequest); - assertOK(response); - assertShardChangesResponse( - XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false), - indexName - ); - } - - public void testShardChangesMultipleRequests() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex( - indexName, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "1s") - .build() - ); - assertTrue(indexExists(indexName)); - - assertOK(bulkIndex(indexName, randomIntBetween(100, 200))); - - final Request firstRequest = new Request("GET", shardChangesEndpoint(indexName)); - firstRequest.addParameter("from_seq_no", "0"); - firstRequest.addParameter("max_operations_count", "10"); - firstRequest.addParameter("poll_timeout", "10s"); - firstRequest.addParameter("max_batch_size", "1MB"); - - final Response firstResponse = client().performRequest(firstRequest); - assertOK(firstResponse); - assertShardChangesResponse( - XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(firstResponse.getEntity()), false), - indexName - ); - - final Request secondRequest = new Request("GET", shardChangesEndpoint(indexName)); - secondRequest.addParameter("from_seq_no", "10"); - secondRequest.addParameter("max_operations_count", "10"); - secondRequest.addParameter("poll_timeout", "10s"); - secondRequest.addParameter("max_batch_size", "1MB"); - - final Response secondResponse = client().performRequest(secondRequest); - assertOK(secondResponse); - assertShardChangesResponse( - XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(secondResponse.getEntity()), false), - indexName - ); - } - - public void testShardChangesInvalidFromSeqNo() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex(indexName); - assertTrue(indexExists(indexName)); - - final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); - shardChangesRequest.addParameter("from_seq_no", "-1"); - final ResponseException ex = assertThrows(ResponseException.class, () -> client().performRequest(shardChangesRequest)); - assertResponseException(ex, RestStatus.BAD_REQUEST, "Validation Failed: 1: fromSeqNo [-1] cannot be lower than 0"); - } - - public void testShardChangesInvalidMaxOperationsCount() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex(indexName); - assertTrue(indexExists(indexName)); - - final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); - shardChangesRequest.addParameter("max_operations_count", "-1"); - final ResponseException ex = assertThrows(ResponseException.class, () -> client().performRequest(shardChangesRequest)); - assertResponseException(ex, RestStatus.BAD_REQUEST, "Validation Failed: 1: maxOperationCount [-1] cannot be lower than 0"); - } - - public void testShardChangesNegativePollTimeout() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex(indexName); - assertTrue(indexExists(indexName)); - - final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); - shardChangesRequest.addParameter("poll_timeout", "-1s"); - assertOK(client().performRequest(shardChangesRequest)); - } - - public void testShardChangesInvalidMaxBatchSize() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex(indexName); - assertTrue(indexExists(indexName)); - - final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); - shardChangesRequest.addParameter("max_batch_size", "-1MB"); - final ResponseException ex = assertThrows(ResponseException.class, () -> client().performRequest(shardChangesRequest)); - assertResponseException( - ex, - RestStatus.BAD_REQUEST, - "failed to parse setting [max_batch_size] with value [-1MB] as a size in bytes" - ); - } - - public void testShardChangesMissingIndex() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - assertFalse(indexExists(indexName)); - - final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); - final ResponseException ex = assertThrows(ResponseException.class, () -> client().performRequest(shardChangesRequest)); - assertResponseException(ex, RestStatus.BAD_REQUEST, "Failed to process shard changes for index [" + indexName + "]"); - } - - private static Response bulkIndex(final String indexName, int numberOfDocuments) throws IOException { - final StringBuilder sb = new StringBuilder(); - - long timestamp = System.currentTimeMillis(); - for (int i = 0; i < numberOfDocuments; i++) { - sb.append( - String.format( - Locale.ROOT, - BULK_INDEX_TEMPLATE, - Instant.ofEpochMilli(timestamp).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME), - randomFrom(NAMES) - ) - ); - timestamp += 1000; // 1 second - } - - final Request request = new Request("POST", bulkEndpoint(indexName)); - request.setJsonEntity(sb.toString()); - request.addParameter("refresh", "true"); - return client().performRequest(request); - } - - private Response createDataStream(final String dataStreamName) throws IOException { - return client().performRequest(new Request("PUT", dataStreamEndpoint(dataStreamName))); - } - - private static Response createIndexTemplate(final String templateName, final String mappings) throws IOException { - final Request request = new Request("PUT", indexTemplateEndpoint(templateName)); - request.setJsonEntity(mappings); - return client().performRequest(request); - } - - private static String shardChangesEndpoint(final String indexName) { - return String.format(Locale.ROOT, CCR_SHARD_CHANGES_ENDPOINT, indexName); - } - - private static String bulkEndpoint(final String indexName) { - return String.format(Locale.ROOT, BULK_INDEX_ENDPOINT, indexName); - } - - private static String dataStreamEndpoint(final String dataStreamName) { - return String.format(Locale.ROOT, DATA_STREAM_ENDPOINT, dataStreamName); - } - - private static String indexTemplateEndpoint(final String templateName) { - return String.format(Locale.ROOT, INDEX_TEMPLATE_ENDPOINT, templateName); - } - - private void assertResponseException(final ResponseException ex, final RestStatus restStatus, final String error) { - assertEquals(restStatus.getStatus(), ex.getResponse().getStatusLine().getStatusCode()); - assertThat(ex.getMessage(), Matchers.containsString(error)); - } - - private void assertShardChangesResponse(final Map shardChangesResponseBody, final String indexAbstractionName) { - for (final String fieldName : SHARD_RESPONSE_FIELDS) { - final Object fieldValue = shardChangesResponseBody.get(fieldName); - assertNotNull("Field " + fieldName + " is missing or has a null value.", fieldValue); - - if ("index_abstraction".equals(fieldName)) { - assertEquals(indexAbstractionName, fieldValue); - } - - if ("operations".equals(fieldName)) { - if (fieldValue instanceof List operationsList) { - assertFalse("Field 'operations' is empty.", operationsList.isEmpty()); - - for (final Object operation : operationsList) { - assertNotNull("Operation is null.", operation); - if (operation instanceof Map operationMap) { - assertNotNull("seq_no is missing in operation.", operationMap.get("seq_no")); - assertNotNull("op_type is missing in operation.", operationMap.get("op_type")); - assertNotNull("primary_term is missing in operation.", operationMap.get("primary_term")); - } - } - } - } - } - } -} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 5305e179058b2..87a4c2c7d4826 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ccr; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.Build; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequest; @@ -92,7 +91,6 @@ import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction; import org.elasticsearch.xpack.ccr.rest.RestResumeAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction; -import org.elasticsearch.xpack.ccr.rest.RestShardChangesAction; import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction; import org.elasticsearch.xpack.core.XPackFeatureUsage; import org.elasticsearch.xpack.core.XPackField; @@ -114,7 +112,6 @@ import org.elasticsearch.xpack.core.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -143,34 +140,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E public static final String REQUESTED_OPS_MISSING_METADATA_KEY = "es.requested_operations_missing"; public static final TransportVersion TRANSPORT_VERSION_ACTION_WITH_SHARD_ID = TransportVersions.V_8_9_X; - private static final List BASE_REST_HANDLERS = Arrays.asList( - // stats API - new RestFollowStatsAction(), - new RestCcrStatsAction(), - new RestFollowInfoAction(), - // follow APIs - new RestPutFollowAction(), - new RestResumeFollowAction(), - new RestPauseFollowAction(), - new RestUnfollowAction(), - // auto-follow APIs - new RestDeleteAutoFollowPatternAction(), - new RestPutAutoFollowPatternAction(), - new RestGetAutoFollowPatternAction(), - new RestPauseAutoFollowPatternAction(), - new RestResumeAutoFollowPatternAction(), - // forget follower API - new RestForgetFollowerAction() - ); - private static final List REST_HANDLERS = Collections.unmodifiableList(BASE_REST_HANDLERS); - - private static final List SNAPSHOT_BUILD_REST_HANDLERS; - static { - List snapshotBuildHandlers = new ArrayList<>(BASE_REST_HANDLERS); - snapshotBuildHandlers.add(new RestShardChangesAction()); - SNAPSHOT_BUILD_REST_HANDLERS = Collections.unmodifiableList(snapshotBuildHandlers); - } private final boolean enabled; private final Settings settings; private final CcrLicenseChecker ccrLicenseChecker; @@ -302,7 +272,25 @@ public List getRestHandlers( return emptyList(); } - return Build.current().isSnapshot() ? SNAPSHOT_BUILD_REST_HANDLERS : REST_HANDLERS; + return Arrays.asList( + // stats API + new RestFollowStatsAction(), + new RestCcrStatsAction(), + new RestFollowInfoAction(), + // follow APIs + new RestPutFollowAction(), + new RestResumeFollowAction(), + new RestPauseFollowAction(), + new RestUnfollowAction(), + // auto-follow APIs + new RestDeleteAutoFollowPatternAction(), + new RestPutAutoFollowPatternAction(), + new RestGetAutoFollowPatternAction(), + new RestPauseAutoFollowPatternAction(), + new RestResumeAutoFollowPatternAction(), + // forget follower API + new RestForgetFollowerAction() + ); } public List getNamedWriteables() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestShardChangesAction.java deleted file mode 100644 index 0bbcfab97eb34..0000000000000 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestShardChangesAction.java +++ /dev/null @@ -1,366 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.ccr.rest; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.admin.indices.stats.ShardStats; -import org.elasticsearch.client.internal.node.NodeClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexAbstraction; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestResponse; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.RestUtils; -import org.elasticsearch.rest.action.RestActionListener; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentFactory; -import org.elasticsearch.xpack.ccr.Ccr; -import org.elasticsearch.xpack.ccr.action.ShardChangesAction; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Locale; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Supplier; - -import static org.elasticsearch.rest.RestRequest.Method.GET; - -/** - * A REST handler that retrieves shard changes in a specific index, data stream or alias whose name is - * provided as a parameter. It handles GET requests to the "/{index}/ccr/shard_changes" endpoint retrieving - * shard-level changes, such as Translog operations, mapping version, settings version, aliases version, - * the global checkpoint, maximum sequence number and maximum sequence number of updates or deletes. - *

- * In the case of a data stream, the first backing index is considered the target for retrieving shard changes. - * In the case of an alias, the first index that the alias points to is considered the target for retrieving - * shard changes. - *

- * Note: This handler is only available for snapshot builds. - */ -public class RestShardChangesAction extends BaseRestHandler { - - private static final long DEFAULT_FROM_SEQ_NO = 0L; - private static final ByteSizeValue DEFAULT_MAX_BATCH_SIZE = ByteSizeValue.of(32, ByteSizeUnit.MB); - private static final TimeValue DEFAULT_POLL_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); - private static final int DEFAULT_MAX_OPERATIONS_COUNT = 1024; - private static final int DEFAULT_TIMEOUT_SECONDS = 60; - private static final TimeValue GET_INDEX_UUID_TIMEOUT = new TimeValue(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - private static final TimeValue SHARD_STATS_TIMEOUT = new TimeValue(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - private static final String INDEX_PARAM_NAME = "index"; - private static final String FROM_SEQ_NO_PARAM_NAME = "from_seq_no"; - private static final String MAX_BATCH_SIZE_PARAM_NAME = "max_batch_size"; - private static final String POLL_TIMEOUT_PARAM_NAME = "poll_timeout"; - private static final String MAX_OPERATIONS_COUNT_PARAM_NAME = "max_operations_count"; - - @Override - public String getName() { - return "ccr_shard_changes_action"; - } - - @Override - public List routes() { - return List.of(new Route(GET, "/{index}/ccr/shard_changes")); - } - - /** - * Prepares the request for retrieving shard changes. - * - * @param restRequest The REST request. - * @param client The NodeClient for executing the request. - * @return A RestChannelConsumer for handling the request. - * @throws IOException If an error occurs while preparing the request. - */ - @Override - protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { - final var indexAbstractionName = restRequest.param(INDEX_PARAM_NAME); - final var fromSeqNo = restRequest.paramAsLong(FROM_SEQ_NO_PARAM_NAME, DEFAULT_FROM_SEQ_NO); - final var maxBatchSize = restRequest.paramAsSize(MAX_BATCH_SIZE_PARAM_NAME, DEFAULT_MAX_BATCH_SIZE); - final var pollTimeout = restRequest.paramAsTime(POLL_TIMEOUT_PARAM_NAME, DEFAULT_POLL_TIMEOUT); - final var maxOperationsCount = restRequest.paramAsInt(MAX_OPERATIONS_COUNT_PARAM_NAME, DEFAULT_MAX_OPERATIONS_COUNT); - - // NOTE: we first retrieve the concrete index name in case we are dealing with an alias or data stream. - // Then we use the concrete index name to retrieve the index UUID and shard stats. - final CompletableFuture indexNameCompletableFuture = asyncGetIndexName( - client, - indexAbstractionName, - client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME) - ); - final CompletableFuture indexUUIDCompletableFuture = indexNameCompletableFuture.thenCompose( - concreteIndexName -> asyncGetIndexUUID( - client, - concreteIndexName, - client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME), - RestUtils.getMasterNodeTimeout(restRequest) - ) - ); - final CompletableFuture shardStatsCompletableFuture = indexNameCompletableFuture.thenCompose( - concreteIndexName -> asyncShardStats(client, concreteIndexName, client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME)) - ); - - return channel -> CompletableFuture.allOf(indexUUIDCompletableFuture, shardStatsCompletableFuture).thenRun(() -> { - try { - final String concreteIndexName = indexNameCompletableFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - final String indexUUID = indexUUIDCompletableFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - final ShardStats shardStats = shardStatsCompletableFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - final ShardId shardId = shardStats.getShardRouting().shardId(); - final String expectedHistoryUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); - - final ShardChangesAction.Request shardChangesRequest = shardChangesRequest( - concreteIndexName, - indexUUID, - shardId, - expectedHistoryUUID, - fromSeqNo, - maxBatchSize, - pollTimeout, - maxOperationsCount - ); - client.execute(ShardChangesAction.INSTANCE, shardChangesRequest, new RestActionListener<>(channel) { - @Override - protected void processResponse(final ShardChangesAction.Response response) { - channel.sendResponse( - new RestResponse( - RestStatus.OK, - shardChangesResponseToXContent(response, indexAbstractionName, concreteIndexName, shardId) - ) - ); - } - }); - - } catch (InterruptedException | ExecutionException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Error while retrieving shard changes", e); - } catch (TimeoutException te) { - throw new IllegalStateException("Timeout while waiting for shard stats or index UUID", te); - } - }).exceptionally(ex -> { - channel.sendResponse( - new RestResponse( - RestStatus.BAD_REQUEST, - "Failed to process shard changes for index [" + indexAbstractionName + "] " + ex.getMessage() - ) - ); - return null; - }); - } - - /** - * Creates a ShardChangesAction.Request object with the provided parameters. - * - * @param indexName The name of the index for which to retrieve shard changes. - * @param indexUUID The UUID of the index. - * @param shardId The ShardId for which to retrieve shard changes. - * @param expectedHistoryUUID The expected history UUID of the shard. - * @param fromSeqNo The sequence number from which to start retrieving shard changes. - * @param maxBatchSize The maximum size of a batch of operations to retrieve. - * @param pollTimeout The maximum time to wait for shard changes. - * @param maxOperationsCount The maximum number of operations to retrieve in a single request. - * @return A ShardChangesAction.Request object with the provided parameters. - */ - private static ShardChangesAction.Request shardChangesRequest( - final String indexName, - final String indexUUID, - final ShardId shardId, - final String expectedHistoryUUID, - long fromSeqNo, - final ByteSizeValue maxBatchSize, - final TimeValue pollTimeout, - int maxOperationsCount - ) { - final ShardChangesAction.Request shardChangesRequest = new ShardChangesAction.Request( - new ShardId(new Index(indexName, indexUUID), shardId.id()), - expectedHistoryUUID - ); - shardChangesRequest.setFromSeqNo(fromSeqNo); - shardChangesRequest.setMaxBatchSize(maxBatchSize); - shardChangesRequest.setPollTimeout(pollTimeout); - shardChangesRequest.setMaxOperationCount(maxOperationsCount); - return shardChangesRequest; - } - - /** - * Converts the response to XContent JSOn format. - * - * @param response The ShardChangesAction response. - * @param indexAbstractionName The name of the index abstraction. - * @param concreteIndexName The name of the index. - * @param shardId The ShardId. - */ - private static XContentBuilder shardChangesResponseToXContent( - final ShardChangesAction.Response response, - final String indexAbstractionName, - final String concreteIndexName, - final ShardId shardId - ) { - try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - builder.startObject(); - builder.field("index_abstraction", indexAbstractionName); - builder.field("index", concreteIndexName); - builder.field("shard_id", shardId); - builder.field("mapping_version", response.getMappingVersion()); - builder.field("settings_version", response.getSettingsVersion()); - builder.field("aliases_version", response.getAliasesVersion()); - builder.field("global_checkpoint", response.getGlobalCheckpoint()); - builder.field("max_seq_no", response.getMaxSeqNo()); - builder.field("max_seq_no_of_updates_or_deletes", response.getMaxSeqNoOfUpdatesOrDeletes()); - builder.field("took_in_millis", response.getTookInMillis()); - if (response.getOperations() != null && response.getOperations().length > 0) { - operationsToXContent(response, builder); - } - builder.endObject(); - - return builder; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Converts the operations from a ShardChangesAction response to XContent JSON format. - * - * @param response The ShardChangesAction response containing the operations to be converted. - * @param builder The XContentBuilder to which the converted operations will be added. - * @throws IOException If an error occurs while writing to the XContentBuilder. - */ - private static void operationsToXContent(final ShardChangesAction.Response response, final XContentBuilder builder) throws IOException { - builder.field("number_of_operations", response.getOperations().length); - builder.field("operations"); - builder.startArray(); - for (final Translog.Operation operation : response.getOperations()) { - builder.startObject(); - builder.field("op_type", operation.opType()); - builder.field("seq_no", operation.seqNo()); - builder.field("primary_term", operation.primaryTerm()); - builder.endObject(); - } - builder.endArray(); - } - - /** - * Execute an asynchronous task using a task supplier and an executor service. - * - * @param The type of data to be retrieved. - * @param task The supplier task that provides the data. - * @param executorService The executorService service for executing the asynchronous task. - * @param errorMessage The error message to be thrown if the task execution fails. - * @return A CompletableFuture that completes with the retrieved data. - */ - private static CompletableFuture supplyAsyncTask( - final Supplier task, - final ExecutorService executorService, - final String errorMessage - ) { - return CompletableFuture.supplyAsync(() -> { - try { - return task.get(); - } catch (Exception e) { - throw new ElasticsearchException(errorMessage, e); - } - }, executorService); - } - - /** - * Asynchronously retrieves the index name for a given index, alias or data stream. - * If the name represents a data stream, the name of the first backing index is returned. - * If the name represents an alias, the name of the first index that the alias points to is returned. - * - * @param client The NodeClient for executing the asynchronous request. - * @param indexAbstractionName The name of the index, alias or data stream. - * @return A CompletableFuture that completes with the retrieved index name. - */ - private static CompletableFuture asyncGetIndexName( - final NodeClient client, - final String indexAbstractionName, - final ExecutorService executorService - ) { - return supplyAsyncTask(() -> { - final ClusterState clusterState = client.admin() - .cluster() - .prepareState(new TimeValue(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) - .get(GET_INDEX_UUID_TIMEOUT) - .getState(); - final IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(indexAbstractionName); - if (indexAbstraction == null) { - throw new IllegalArgumentException( - String.format(Locale.ROOT, "Invalid index or data stream name [%s]", indexAbstractionName) - ); - } - if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM - || indexAbstraction.getType() == IndexAbstraction.Type.ALIAS) { - return indexAbstraction.getIndices().getFirst().getName(); - } - return indexAbstractionName; - }, executorService, "Error while retrieving index name for index or data stream [" + indexAbstractionName + "]"); - } - - /** - * Asynchronously retrieves the shard stats for a given index using an executor service. - * - * @param client The NodeClient for executing the asynchronous request. - * @param concreteIndexName The name of the index for which to retrieve shard statistics. - * @param executorService The executorService service for executing the asynchronous task. - * @return A CompletableFuture that completes with the retrieved ShardStats. - * @throws ElasticsearchException If an error occurs while retrieving shard statistics. - */ - private static CompletableFuture asyncShardStats( - final NodeClient client, - final String concreteIndexName, - final ExecutorService executorService - ) { - return supplyAsyncTask( - () -> Arrays.stream(client.admin().indices().prepareStats(concreteIndexName).clear().get(SHARD_STATS_TIMEOUT).getShards()) - .max(Comparator.comparingLong(shardStats -> shardStats.getCommitStats().getGeneration())) - .orElseThrow(() -> new ElasticsearchException("Unable to retrieve shard stats for index: " + concreteIndexName)), - executorService, - "Error while retrieving shard stats for index [" + concreteIndexName + "]" - ); - } - - /** - * Asynchronously retrieves the index UUID for a given index using an executor service. - * - * @param client The NodeClient for executing the asynchronous request. - * @param concreteIndexName The name of the index for which to retrieve the index UUID. - * @param executorService The executorService service for executing the asynchronous task. - * @param masterTimeout The timeout for waiting until the cluster is unblocked. - * @return A CompletableFuture that completes with the retrieved index UUID. - * @throws ElasticsearchException If an error occurs while retrieving the index UUID. - */ - private static CompletableFuture asyncGetIndexUUID( - final NodeClient client, - final String concreteIndexName, - final ExecutorService executorService, - TimeValue masterTimeout - ) { - return supplyAsyncTask( - () -> client.admin() - .indices() - .prepareGetIndex(masterTimeout) - .setIndices(concreteIndexName) - .get(GET_INDEX_UUID_TIMEOUT) - .getSetting(concreteIndexName, IndexMetadata.SETTING_INDEX_UUID), - executorService, - "Error while retrieving index UUID for index [" + concreteIndexName + "]" - ); - } -} diff --git a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/seqno/RetentionLeaseRestIT.java b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/seqno/RetentionLeaseRestIT.java deleted file mode 100644 index fa2d92a8fdb89..0000000000000 --- a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/seqno/RetentionLeaseRestIT.java +++ /dev/null @@ -1,375 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.logsdb.seqno; - -import org.apache.http.util.EntityUtils; -import org.elasticsearch.Build; -import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.ResponseException; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.cluster.ElasticsearchCluster; -import org.elasticsearch.test.cluster.local.distribution.DistributionType; -import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.xcontent.json.JsonXContent; -import org.junit.Before; -import org.junit.ClassRule; - -import java.io.IOException; -import java.time.Instant; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.List; -import java.util.Locale; -import java.util.Map; - -public class RetentionLeaseRestIT extends ESRestTestCase { - private static final String ADD_RETENTION_LEASE_ENDPOINT = "/%s/seq_no/add_retention_lease"; - private static final String BULK_INDEX_ENDPOINT = "/%s/_bulk"; - private static final String[] DOCUMENT_NAMES = { "alpha", "beta", "gamma", "delta" }; - - @Before - public void assumeSnapshotBuild() { - assumeTrue("/{index}/seq_no/add_retention_lease endpoint only available in snapshot builds", Build.current().isSnapshot()); - } - - @ClassRule - public static ElasticsearchCluster cluster = ElasticsearchCluster.local() - .distribution(DistributionType.DEFAULT) - .setting("xpack.security.enabled", "false") - .setting("xpack.license.self_generated.type", "trial") - .build(); - - @Override - protected String getTestRestCluster() { - return cluster.getHttpAddresses(); - } - - public void testAddRetentionLeaseSuccessfully() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex( - indexName, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() - ); - assertTrue(indexExists(indexName)); - - assertOK(bulkIndex(indexName, randomIntBetween(10, 20))); - - final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, indexName)); - final String retentionLeaseId = randomAlphaOfLength(6); - final String retentionLeaseSource = randomAlphaOfLength(8); - retentionLeaseRequest.addParameter("id", retentionLeaseId); - retentionLeaseRequest.addParameter("source", retentionLeaseSource); - - final Response response = client().performRequest(retentionLeaseRequest); - assertOK(response); - - assertRetentionLeaseResponseContent(response, indexName, indexName, retentionLeaseId, retentionLeaseSource); - assertRetentionLeaseExists(indexName, retentionLeaseId, retentionLeaseSource); - } - - public void testAddRetentionLeaseWithoutIdAndSource() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex( - indexName, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() - ); - assertTrue(indexExists(indexName)); - - assertOK(bulkIndex(indexName, randomIntBetween(10, 20))); - - final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, indexName)); - - final Response response = client().performRequest(retentionLeaseRequest); - assertOK(response); - - assertRetentionLeaseResponseContent(response, indexName, indexName, null, null); - } - - public void testAddRetentionLeaseToDataStream() throws IOException { - final String templateName = randomAlphanumericOfLength(8).toLowerCase(Locale.ROOT); - assertOK(createIndexTemplate(templateName, """ - { - "index_patterns": [ "test-*-*" ], - "data_stream": {}, - "priority": 100, - "template": { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 0 - }, - "mappings": { - "properties": { - "@timestamp": { - "type": "date" - }, - "name": { - "type": "keyword" - } - } - } - } - } - """)); - - final String dataStreamName = "test-" - + randomAlphanumericOfLength(5).toLowerCase(Locale.ROOT) - + "-" - + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); - assertOK(createDataStream(dataStreamName)); - assertOK(bulkIndex(dataStreamName, randomIntBetween(10, 20))); - - final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, dataStreamName)); - final String retentionLeaseId = randomAlphaOfLength(6); - final String retentionLeaseSource = randomAlphaOfLength(8); - retentionLeaseRequest.addParameter("id", retentionLeaseId); - retentionLeaseRequest.addParameter("source", retentionLeaseSource); - - final Response response = client().performRequest(retentionLeaseRequest); - assertOK(response); - - final String dataStreamBackingIndex = getFirstBackingIndex(dataStreamName); - assertRetentionLeaseResponseContent(response, dataStreamName, dataStreamBackingIndex, retentionLeaseId, retentionLeaseSource); - assertRetentionLeaseExists(dataStreamBackingIndex, retentionLeaseId, retentionLeaseSource); - } - - public void testAddRetentionLeaseUsingAlias() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex( - indexName, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() - ); - assertTrue(indexExists(indexName)); - - final String aliasName = randomAlphanumericOfLength(8).toLowerCase(Locale.ROOT); - final Request putAliasRequest = new Request("PUT", "/" + indexName + "/_alias/" + aliasName); - assertOK(client().performRequest(putAliasRequest)); - - assertOK(bulkIndex(aliasName, randomIntBetween(10, 20))); - - final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, aliasName)); - final String retentionLeaseId = randomAlphaOfLength(6); - final String retentionLeaseSource = randomAlphaOfLength(8); - retentionLeaseRequest.addParameter("id", retentionLeaseId); - retentionLeaseRequest.addParameter("source", retentionLeaseSource); - - final Response response = client().performRequest(retentionLeaseRequest); - assertOK(response); - - assertRetentionLeaseResponseContent(response, aliasName, indexName, retentionLeaseId, retentionLeaseSource); - assertRetentionLeaseExists(indexName, retentionLeaseId, retentionLeaseSource); - } - - public void testAddRetentionLeaseMissingIndex() throws IOException { - final String missingIndexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - assertFalse(indexExists(missingIndexName)); - - final Request retentionLeaseRequest = new Request( - "PUT", - String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, missingIndexName) - ); - final ResponseException exception = assertThrows(ResponseException.class, () -> client().performRequest(retentionLeaseRequest)); - assertResponseException(exception, RestStatus.BAD_REQUEST, "Error adding retention lease for [" + missingIndexName + "]"); - } - - public void testAddRetentionLeaseInvalidParameters() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex( - indexName, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() - ); - assertTrue(indexExists(indexName)); - assertOK(bulkIndex(indexName, randomIntBetween(10, 20))); - - final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, indexName)); - retentionLeaseRequest.addParameter("id", null); - retentionLeaseRequest.addParameter("source", randomBoolean() ? UUIDs.randomBase64UUID() : "test-source"); - - final ResponseException exception = assertThrows(ResponseException.class, () -> client().performRequest(retentionLeaseRequest)); - assertResponseException(exception, RestStatus.BAD_REQUEST, "retention lease ID can not be empty"); - } - - public void testAddMultipleRetentionLeasesForSameShard() throws IOException { - final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); - createIndex( - indexName, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() - ); - assertTrue(indexExists(indexName)); - assertOK(bulkIndex(indexName, randomIntBetween(10, 20))); - - int numberOfLeases = randomIntBetween(2, 5); - for (int i = 0; i < numberOfLeases; i++) { - final Request retentionLeaseRequest = new Request("PUT", String.format(Locale.ROOT, ADD_RETENTION_LEASE_ENDPOINT, indexName)); - retentionLeaseRequest.addParameter("id", "lease-" + i); - retentionLeaseRequest.addParameter("source", "test-source-" + i); - - final Response response = client().performRequest(retentionLeaseRequest); - assertOK(response); - - assertRetentionLeaseResponseContent(response, indexName, indexName, "lease-" + i, "test-source-" + i); - } - - for (int i = 0; i < numberOfLeases; i++) { - assertRetentionLeaseExists(indexName, "lease-" + i, "test-source-" + i); - } - } - - private static Response bulkIndex(final String indexName, int numberOfDocuments) throws IOException { - final StringBuilder sb = new StringBuilder(); - long timestamp = System.currentTimeMillis(); - - for (int i = 0; i < numberOfDocuments; i++) { - sb.append( - String.format( - Locale.ROOT, - "{ \"index\": {} }\n{ \"@timestamp\": \"%s\", \"name\": \"%s\" }\n", - Instant.ofEpochMilli(timestamp).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME), - randomFrom(DOCUMENT_NAMES) - ) - ); - timestamp += 1000; - } - - final Request request = new Request("POST", String.format(Locale.ROOT, BULK_INDEX_ENDPOINT, indexName)); - request.setJsonEntity(sb.toString()); - request.addParameter("refresh", "true"); - return client().performRequest(request); - } - - private void assertResponseException(final ResponseException exception, final RestStatus expectedStatus, final String expectedMessage) { - assertEquals(expectedStatus.getStatus(), exception.getResponse().getStatusLine().getStatusCode()); - assertTrue(exception.getMessage().contains(expectedMessage)); - } - - private Map getRetentionLeases(final String indexName) throws IOException { - final Request statsRequest = new Request("GET", "/" + indexName + "/_stats"); - statsRequest.addParameter("level", "shards"); - - final Response response = client().performRequest(statsRequest); - assertOK(response); - - final Map responseMap = XContentHelper.convertToMap( - JsonXContent.jsonXContent, - EntityUtils.toString(response.getEntity()), - false - ); - - @SuppressWarnings("unchecked") - final Map indices = (Map) responseMap.get("indices"); - if (indices == null || indices.containsKey(indexName) == false) { - throw new IllegalArgumentException("No shard stats found for: " + indexName); - } - - @SuppressWarnings("unchecked") - final Map shards = (Map) ((Map) indices.get(indexName)).get("shards"); - - @SuppressWarnings("unchecked") - final List> shardList = (List>) shards.get("0"); - - return getRetentionLeases(indexName, shardList); - } - - private static Map getRetentionLeases(final String indexName, final List> shardList) { - final Map shardStats = shardList.getFirst(); - - @SuppressWarnings("unchecked") - final Map retentionLeases = (Map) shardStats.get("retention_leases"); - if (retentionLeases == null) { - throw new IllegalArgumentException("No retention leases found for shard 0 of index: " + indexName); - } - return retentionLeases; - } - - private void assertRetentionLeaseExists( - final String indexAbstractionName, - final String expectedRetentionLeaseId, - final String expectedRetentionLeaseSource - ) throws IOException { - final Map retentionLeases = getRetentionLeases(indexAbstractionName); - - @SuppressWarnings("unchecked") - final List> leases = (List>) retentionLeases.get("leases"); - - boolean retentionLeaseExists = leases.stream().anyMatch(lease -> { - final String id = (String) lease.get("id"); - final String source = (String) lease.get("source"); - return expectedRetentionLeaseId.equals(id) && expectedRetentionLeaseSource.equals(source); - }); - - assertTrue( - "Retention lease with ID [" + expectedRetentionLeaseId + "] and source [" + expectedRetentionLeaseSource + "] does not exist.", - retentionLeaseExists - ); - } - - private Response createDataStream(final String dataStreamName) throws IOException { - return client().performRequest(new Request("PUT", "/_data_stream/" + dataStreamName)); - } - - private String getFirstBackingIndex(final String dataStreamName) throws IOException { - final Response response = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName)); - - final Map responseMap = XContentHelper.convertToMap( - JsonXContent.jsonXContent, - EntityUtils.toString(response.getEntity()), - false - ); - - @SuppressWarnings("unchecked") - final List> dataStreams = (List>) responseMap.get("data_streams"); - - if (dataStreams == null || dataStreams.isEmpty()) { - throw new IllegalArgumentException("No data stream found for name: " + dataStreamName); - } - - @SuppressWarnings("unchecked") - final List> backingIndices = (List>) dataStreams.get(0).get("indices"); - - if (backingIndices == null || backingIndices.isEmpty()) { - throw new IllegalArgumentException("No backing indices found for data stream: " + dataStreamName); - } - - return (String) backingIndices.getFirst().get("index_name"); - } - - private static Response createIndexTemplate(final String templateName, final String mappings) throws IOException { - final Request request = new Request("PUT", "/_index_template/" + templateName); - request.setJsonEntity(mappings); - return client().performRequest(request); - } - - private void assertRetentionLeaseResponseContent( - final Response response, - final String expectedIndexAbstraction, - final String expectedConcreteIndex, - final String expectedLeaseId, - final String expectedLeaseSource - ) throws IOException { - final Map responseBody = XContentHelper.convertToMap( - JsonXContent.jsonXContent, - EntityUtils.toString(response.getEntity()), - false - ); - - assertEquals("Unexpected index abstraction in response", expectedIndexAbstraction, responseBody.get("index_abstraction")); - assertEquals("Unexpected concrete index in response", expectedConcreteIndex, responseBody.get("index")); - assertNotNull("Shard ID missing in response", responseBody.get("shard_id")); - - if (expectedLeaseId != null) { - assertEquals("Unexpected lease ID in response", expectedLeaseId, responseBody.get("id")); - } - if (expectedLeaseSource != null) { - assertEquals("Unexpected lease source in response", expectedLeaseSource, responseBody.get("source")); - } - } -} diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java index 20f114389fe4e..1fb46f74a09b2 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java @@ -7,38 +7,24 @@ package org.elasticsearch.xpack.logsdb; -import org.elasticsearch.Build; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.SettingsFilter; -import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.IndexSettingProvider; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.rest.RestController; -import org.elasticsearch.rest.RestHandler; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; -import org.elasticsearch.xpack.logsdb.seqno.RestAddRetentionLeaseAction; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.function.Predicate; -import java.util.function.Supplier; import static org.elasticsearch.xpack.logsdb.LogsdbLicenseService.FALLBACK_SETTING; @@ -113,24 +99,6 @@ public List> getSettings() { return actions; } - @Override - public Collection getRestHandlers( - Settings settings, - NamedWriteableRegistry namedWriteableRegistry, - RestController restController, - ClusterSettings clusterSettings, - IndexScopedSettings indexScopedSettings, - SettingsFilter settingsFilter, - IndexNameExpressionResolver indexNameExpressionResolver, - Supplier nodesInCluster, - Predicate clusterSupportsFeature - ) { - if (Build.current().isSnapshot()) { - return List.of(new RestAddRetentionLeaseAction()); - } - return Collections.emptyList(); - } - protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); } diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/seqno/RestAddRetentionLeaseAction.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/seqno/RestAddRetentionLeaseAction.java deleted file mode 100644 index 1370b18a431bc..0000000000000 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/seqno/RestAddRetentionLeaseAction.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.logsdb.seqno; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; -import org.elasticsearch.client.internal.node.NodeClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexAbstraction; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.seqno.RetentionLeaseActions; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestResponse; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.action.RestActionListener; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentFactory; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Locale; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -import static org.elasticsearch.rest.RestRequest.Method.PUT; - -/** - * This class implements a REST API for adding a retention lease to a shard in Elasticsearch. - * Retention leases ensure that specific sequence numbers are retained, even as the global checkpoint - * advances during indexing. This guarantees seq_no values availability until the retention lease is - * removed. - * - * The API supports adding retention leases to indices, data streams, and index aliases. For data streams - * or aliases, the first backing index or underlying index is identified, and the retention lease is added - * to its shard. - * - * **Note:** This REST API is available only in Elasticsearch snapshot builds and is intended solely - * for benchmarking purposes, such as benchmarking operations like the shard changes API in Rally tracks. - * It is not intended for use in production environments. - * - * The response provides details about the added retention lease, including the target index, - * shard ID, retention lease ID, and source. - */ -public class RestAddRetentionLeaseAction extends BaseRestHandler { - - private static final int DEFAULT_TIMEOUT_SECONDS = 60; - private static final TimeValue SHARD_STATS_TIMEOUT = new TimeValue(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - private static final TimeValue GET_INDEX_TIMEOUT = new TimeValue(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - private static final String INDEX_PARAM = "index"; - private static final String ID_PARAM = "id"; - private static final String SOURCE_PARAM = "source"; - - @Override - public String getName() { - return "add_retention_lease_action"; - } - - @Override - public List routes() { - return List.of(new Route(PUT, "/{index}/seq_no/add_retention_lease")); - } - - /** - * Prepare a request to add a retention lease. When the target is an alias or data stream we just - * get the first shard of the first index using the shard stats api. - * - * @param request the request to execute - * @param client The NodeClient for executing the request. - * @return A RestChannelConsumer for handling the request. - * @throws IOException If an error occurs while preparing the request. - */ - @Override - protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - final String indexAbstractionName = request.param(INDEX_PARAM); - final String retentionLeaseId = request.param(ID_PARAM, UUIDs.randomBase64UUID()); - final String retentionLeaseSource = request.param(SOURCE_PARAM, UUIDs.randomBase64UUID()); - - return channel -> asyncGetIndexName(client, indexAbstractionName, client.threadPool().executor(ThreadPool.Names.GENERIC)) - .thenCompose( - concreteIndexName -> asyncShardStats(client, concreteIndexName, client.threadPool().executor(ThreadPool.Names.GENERIC)) - .thenCompose( - shardStats -> addRetentionLease( - channel, - client, - indexAbstractionName, - concreteIndexName, - shardStats, - retentionLeaseId, - retentionLeaseSource - ) - ) - ) - .exceptionally(ex -> { - final String message = ex.getCause() != null ? ex.getCause().getMessage() : ex.getMessage(); - channel.sendResponse( - new RestResponse(RestStatus.BAD_REQUEST, "Error adding retention lease for [" + indexAbstractionName + "]: " + message) - ); - return null; - }); - } - - private static XContentBuilder addRetentionLeaseResponseToXContent( - final String indexAbstractionName, - final String concreteIndexName, - final ShardId shardId, - final String retentionLeaseId, - final String retentionLeaseSource - ) { - try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - builder.startObject(); - builder.field("index_abstraction", indexAbstractionName); - builder.field("index", concreteIndexName); - builder.field("shard_id", shardId); - builder.field("id", retentionLeaseId); - builder.field("source", retentionLeaseSource); - builder.endObject(); - - return builder; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Adds a retention lease to a specific shard in an index, data stream, or alias. - * This operation is asynchronous and sends the response back to the client through the provided {@link RestChannel}. - * - * @param channel The {@link RestChannel} used to send the response back to the client. - * @param client The {@link NodeClient} used to execute the retention lease addition request. - * @param indexAbstractionName The name of the index, data stream, or alias for which the retention lease is being added. - * @param shardStats The {@link ShardStats} of the target shard where the retention lease will be added. - * @param retentionLeaseId A unique identifier for the retention lease being added. This identifies the lease in future operations. - * @param retentionLeaseSource A description or source of the retention lease request, often used for auditing or tracing purposes. - * @return A {@link CompletableFuture} that completes when the operation finishes. If the operation succeeds, the future completes - * successfully with {@code null}. If an error occurs, the future completes exceptionally with the corresponding exception. - * @throws ElasticsearchException If the request fails or encounters an unexpected error. - */ - private CompletableFuture addRetentionLease( - final RestChannel channel, - final NodeClient client, - final String indexAbstractionName, - final String concreteIndexName, - final ShardStats shardStats, - final String retentionLeaseId, - final String retentionLeaseSource - ) { - final CompletableFuture completableFuture = new CompletableFuture<>(); - try { - final ShardId shardId = shardStats.getShardRouting().shardId(); - final RetentionLeaseActions.AddRequest addRetentionLeaseRequest = new RetentionLeaseActions.AddRequest( - shardId, - retentionLeaseId, - RetentionLeaseActions.RETAIN_ALL, - retentionLeaseSource - ); - - client.execute(RetentionLeaseActions.ADD, addRetentionLeaseRequest, new RestActionListener<>(channel) { - - @Override - protected void processResponse(final ActionResponse.Empty empty) { - completableFuture.complete(null); - channel.sendResponse( - new RestResponse( - RestStatus.OK, - addRetentionLeaseResponseToXContent( - indexAbstractionName, - concreteIndexName, - shardId, - retentionLeaseId, - retentionLeaseSource - ) - ) - ); - } - }); - } catch (Exception e) { - completableFuture.completeExceptionally( - new ElasticsearchException("Failed to add retention lease for [" + indexAbstractionName + "]", e) - ); - } - return completableFuture; - } - - /** - * Execute an asynchronous task using a task supplier and an executor service. - * - * @param The type of data to be retrieved. - * @param task The supplier task that provides the data. - * @param executorService The {@link ExecutorService} for executing the asynchronous task. - * @param errorMessage The error message to be thrown if the task execution fails. - * @return A {@link CompletableFuture} that completes with the retrieved data. - */ - private static CompletableFuture supplyAsyncTask( - final Supplier task, - final ExecutorService executorService, - final String errorMessage - ) { - return CompletableFuture.supplyAsync(() -> { - try { - return task.get(); - } catch (Exception e) { - throw new ElasticsearchException(errorMessage, e); - } - }, executorService); - } - - /** - * Asynchronously retrieves the index name for a given index, alias or data stream. - * If the name represents a data stream, the name of the first backing index is returned. - * If the name represents an alias, the name of the first index that the alias points to is returned. - * - * @param client The {@link NodeClient} for executing the asynchronous request. - * @param indexAbstractionName The name of the index, alias or data stream. - * @return A {@link CompletableFuture} that completes with the retrieved index name. - */ - private static CompletableFuture asyncGetIndexName( - final NodeClient client, - final String indexAbstractionName, - final ExecutorService executorService - ) { - return supplyAsyncTask(() -> { - final ClusterState clusterState = client.admin() - .cluster() - .prepareState(new TimeValue(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) - .get(GET_INDEX_TIMEOUT) - .getState(); - final IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(indexAbstractionName); - if (indexAbstraction == null) { - throw new IllegalArgumentException( - String.format(Locale.ROOT, "Invalid index or data stream name [%s]", indexAbstractionName) - ); - } - if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM - || indexAbstraction.getType() == IndexAbstraction.Type.ALIAS) { - return indexAbstraction.getIndices().getFirst().getName(); - } - return indexAbstractionName; - }, executorService, "Error while retrieving index name for or data stream [" + indexAbstractionName + "]"); - } - - /** - * Asynchronously retrieves the shard stats for a given index using an executor service. - * - * @param client The {@link NodeClient} for executing the asynchronous request. - * @param concreteIndexName The name of the index for which to retrieve shard statistics. - * @param executorService The {@link ExecutorService} for executing the asynchronous task. - * @return A {@link CompletableFuture} that completes with the retrieved ShardStats. - * @throws ElasticsearchException If an error occurs while retrieving shard statistics. - */ - private static CompletableFuture asyncShardStats( - final NodeClient client, - final String concreteIndexName, - final ExecutorService executorService - ) { - return supplyAsyncTask( - () -> Arrays.stream(client.admin().indices().prepareStats(concreteIndexName).clear().get(SHARD_STATS_TIMEOUT).getShards()) - .max(Comparator.comparingLong(shardStats -> shardStats.getCommitStats().getGeneration())) - .orElseThrow(() -> new ElasticsearchException("Unable to retrieve shard stats for: " + concreteIndexName)), - executorService, - "Error while retrieving shard stats for [" + concreteIndexName + "]" - ); - } -}