diff --git a/docs/changelog/134963.yaml b/docs/changelog/134963.yaml new file mode 100644 index 0000000000000..9dc36675fab77 --- /dev/null +++ b/docs/changelog/134963.yaml @@ -0,0 +1,6 @@ +pr: 134963 +summary: Fix a bug in the GET _transform API that incorrectly claims some Transform configurations are missing +area: Transform +type: bug +issues: + - 134263 diff --git a/server/src/main/resources/transport/definitions/referable/transform_check_for_dangling_tasks.csv b/server/src/main/resources/transport/definitions/referable/transform_check_for_dangling_tasks.csv new file mode 100644 index 0000000000000..4e2559c7e9b0a --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/transform_check_for_dangling_tasks.csv @@ -0,0 +1 @@ +9170000,9112009,9000018,8841070,8840011 diff --git a/server/src/main/resources/transport/upper_bounds/8.18.csv b/server/src/main/resources/transport/upper_bounds/8.18.csv index ffc592e1809ee..266bfbbd3bf78 100644 --- a/server/src/main/resources/transport/upper_bounds/8.18.csv +++ b/server/src/main/resources/transport/upper_bounds/8.18.csv @@ -1 +1 @@ -initial_elasticsearch_8_18_8,8840010 +transform_check_for_dangling_tasks,8840011 diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv index 3cc6f439c5ea5..3600b3f8c633a 100644 --- a/server/src/main/resources/transport/upper_bounds/8.19.csv +++ b/server/src/main/resources/transport/upper_bounds/8.19.csv @@ -1 +1 @@ -initial_elasticsearch_8_19_5,8841069 +transform_check_for_dangling_tasks,8841070 diff --git a/server/src/main/resources/transport/upper_bounds/9.0.csv b/server/src/main/resources/transport/upper_bounds/9.0.csv index 8ad2ed1a4cacf..c11e6837bb813 100644 --- a/server/src/main/resources/transport/upper_bounds/9.0.csv +++ b/server/src/main/resources/transport/upper_bounds/9.0.csv @@ -1 +1 @@ -initial_elasticsearch_9_0_8,9000017 +transform_check_for_dangling_tasks,9000018 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv new file mode 100644 index 0000000000000..80b97d85f7511 --- /dev/null +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -0,0 +1 @@ +transform_check_for_dangling_tasks,9112009 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv new file mode 100644 index 0000000000000..2c15e0254cbe8 --- /dev/null +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -0,0 +1 @@ +transform_check_for_dangling_tasks,9170000 diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java index b0ba81d8eeb37..d7f6a46eada3a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java @@ -59,6 +59,7 @@ public final class TransformField { public static final ParseField MAX_AGE = new ParseField("max_age"); public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match"); + public static final ParseField CHECK_FOR_DANGLING_TASKS = new ParseField("check_dangling_tasks"); /** * Fields for checkpointing */ diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformAction.java index d1b9578dec0bb..56faa2f80128f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; @@ -16,6 +17,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -39,6 +41,8 @@ public class GetTransformAction extends ActionType public static final GetTransformAction INSTANCE = new GetTransformAction(); public static final String NAME = "cluster:monitor/transform/get"; + static final TransportVersion DANGLING_TASKS = TransportVersion.fromName("transform_check_for_dangling_tasks"); + private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(GetTransformAction.class); private GetTransformAction() { @@ -47,24 +51,49 @@ private GetTransformAction() { public static class Request extends AbstractGetResourcesRequest { + // for legacy purposes, this transport action previously had no timeout + private static final TimeValue LEGACY_TIMEOUT_VALUE = TimeValue.MAX_VALUE; private static final int MAX_SIZE_RETURN = 1000; + private final boolean checkForDanglingTasks; + private final TimeValue timeout; public Request(String id) { - super(id, PageParams.defaultParams(), true); + this(id, false, LEGACY_TIMEOUT_VALUE); } - public Request() { - super(null, PageParams.defaultParams(), true); + public Request(String id, boolean checkForDanglingTasks, TimeValue timeout) { + super(id, PageParams.defaultParams(), true); + this.checkForDanglingTasks = checkForDanglingTasks; + this.timeout = timeout; } public Request(StreamInput in) throws IOException { super(in); + this.checkForDanglingTasks = in.getTransportVersion().onOrAfter(DANGLING_TASKS) ? in.readBoolean() : true; + this.timeout = in.getTransportVersion().onOrAfter(DANGLING_TASKS) ? in.readTimeValue() : LEGACY_TIMEOUT_VALUE; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getTransportVersion().onOrAfter(DANGLING_TASKS)) { + out.writeBoolean(checkForDanglingTasks); + out.writeTimeValue(timeout); + } } public String getId() { return getResourceId(); } + public boolean checkForDanglingTasks() { + return checkForDanglingTasks; + } + + public TimeValue timeout() { + return timeout; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException exception = null; @@ -86,6 +115,20 @@ public String getCancelableTaskDescription() { public String getResourceIdField() { return TransformField.ID.getPreferredName(); } + + @Override + public boolean equals(Object obj) { + return this == obj + || (obj instanceof Request other + && super.equals(obj) + && (checkForDanglingTasks == other.checkForDanglingTasks) + && Objects.equals(timeout, other.timeout)); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), checkForDanglingTasks, timeout); + } } public static class Response extends AbstractGetResourcesResponse implements ToXContentObject { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformActionRequestTests.java index 3d564f0cf2fa7..ce81cf940e4a6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformActionRequestTests.java @@ -7,28 +7,37 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; import org.elasticsearch.xpack.core.transform.action.GetTransformAction.Request; -public class GetTransformActionRequestTests extends AbstractWireSerializingTestCase { +import static org.elasticsearch.xpack.core.transform.action.GetTransformAction.DANGLING_TASKS; + +public class GetTransformActionRequestTests extends AbstractBWCWireSerializationTestCase { @Override protected Request createTestInstance() { if (randomBoolean()) { return new Request(Metadata.ALL); } - return new Request(randomAlphaOfLengthBetween(1, 20)); + return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean(), randomPositiveTimeValue()); } @Override protected Request mutateInstance(Request instance) { - return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 + return randomValueOtherThan(instance, this::createTestInstance); } @Override protected Writeable.Reader instanceReader() { return Request::new; } + + @Override + protected Request mutateInstanceForVersion(Request instance, TransportVersion version) { + return version.onOrAfter(DANGLING_TASKS) ? instance : new Request(instance.getId(), true, TimeValue.MAX_VALUE); + } } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java index 570feaf28cc58..4c7ab877cbc65 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java @@ -27,6 +27,7 @@ import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.core.transform.TransformField.BASIC_STATS; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasEntry; @@ -460,45 +461,7 @@ public void testGetStatsWithContinuous() throws Exception { String transformDest = transformId + "_idx"; String transformSrc = "reviews_cont_pivot_test"; createReviewsIndex(transformSrc); - final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null); - String config = Strings.format(""" - { - "dest": { - "index": "%s" - }, - "source": { - "index": "%s" - }, - "frequency": "1s", - "sync": { - "time": { - "field": "timestamp", - "delay": "1s" - } - }, - "pivot": { - "group_by": { - "reviewer": { - "terms": { - "field": "user_id" - } - } - }, - "aggregations": { - "avg_rating": { - "avg": { - "field": "stars" - } - } - } - } - }""", transformDest, transformSrc); - - createTransformRequest.setJsonEntity(config); - - Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); - assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - startAndWaitForContinuousTransform(transformId, transformDest, null); + createAndStartTransform(transformId, transformSrc, transformDest); Request getRequest = createRequestWithAuthAndTimeout( "GET", @@ -577,6 +540,99 @@ public void testGetStatsWithContinuous() throws Exception { }, 120, TimeUnit.SECONDS); } + private void createAndStartTransform(String transformId, String transformSrc, String transformDest) throws Exception { + var createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null); + var config = Strings.format(""" + { + "dest": { + "index": "%s" + }, + "source": { + "index": "%s" + }, + "frequency": "1s", + "sync": { + "time": { + "field": "timestamp", + "delay": "1s" + } + }, + "pivot": { + "group_by": { + "reviewer": { + "terms": { + "field": "user_id" + } + } + }, + "aggregations": { + "avg_rating": { + "avg": { + "field": "stars" + } + } + } + } + }""", transformDest, transformSrc); + + createTransformRequest.setJsonEntity(config); + + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + startAndWaitForContinuousTransform(transformId, transformDest, null); + } + + /** + * For Github Issue #134263 + * https://github.com/elastic/elasticsearch/issues/134263 + */ + public void testGetTransformsDoesNotErrorOnPageSize() throws Exception { + var transformId1 = "multiple-transforms-1"; + var transformSrc = "reviews_multiple_transforms_test"; + createReviewsIndex(transformSrc); + createAndStartTransform(transformId1, transformSrc, transformId1 + "_idx"); + + var transformId2 = "multiple-transforms-2"; + createAndStartTransform(transformId2, transformSrc, transformId2 + "_idx"); + + // getting transform 1 on page 1 will not have any errors for transform 2 + assertThat(getTransformIdFromAll(0, 1), equalTo(transformId1)); + // getting transform 2 on page 2 will not have any errors for transform 1 + assertThat(getTransformIdFromAll(1, 1), equalTo(transformId2)); + + // getting transform 1 by id will not have any errors for transform 2 + getTransformConfig(transformId1, null, null); + // getting transform 2 by id will not have any errors for transform 1 + getTransformConfig(transformId2, null, null); + + // getting all transform will not have any errors + assertThat(getAllTransformIds(), containsInAnyOrder(transformId1, transformId2)); + + // getting a stopped transform 1 will not have any errors for transform 2 + stopTransform(transformId1, false); + assertThat(getTransformIdFromAll(0, 1), equalTo(transformId1)); + } + + @SuppressWarnings("unchecked") + private String getTransformIdFromAll(int from, int size) throws IOException { + var params = Strings.format("?from=%d&size=%d", from, size); + var request = new Request("GET", getTransformEndpoint() + "_all" + params); + var response = adminClient().performRequest(request); + var transforms = entityAsMap(response); + var transformConfigs = (List>) XContentMapValues.extractValue("transforms", transforms); + var errors = (List>) XContentMapValues.extractValue("errors", transforms); + assertThat(errors, is(nullValue())); + assertThat(transformConfigs, hasSize(1)); + return (String) transformConfigs.get(0).get("id"); + } + + @SuppressWarnings("unchecked") + private List getAllTransformIds() throws IOException { + var transforms = getTransforms(0, 1_000); + var configs = (List>) transforms.get("transforms"); + return configs.stream().map(transform -> transform.get("id")).map(Object::toString).toList(); + } + @SuppressWarnings("unchecked") public void testManyTransforms() throws IOException { String config = transformConfig(); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index 20ec649f74811..14438d59626ce 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -590,6 +590,17 @@ protected Map getTransformConfig(String transformId, String auth return transformConfig; } + @SuppressWarnings("unchecked") + protected Map getTransformConfig(String transformId, String authHeader, List> expectedErrors) + throws IOException { + Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, authHeader); + Map transforms = entityAsMap(client().performRequest(getRequest)); + assertEquals(1, XContentMapValues.extractValue("count", transforms)); + List> errors = (List>) XContentMapValues.extractValue("errors", transforms); + assertThat(errors, is(equalTo(expectedErrors))); + return ((List>) transforms.get("transforms")).get(0); + } + protected static String getTransformState(String transformId) throws IOException { Map transformStatsAsMap = getTransformStateAndStats(transformId); return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java index cb2985b5b1b3a..871098bebd354 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java @@ -9,11 +9,15 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.core.Strings; +import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.injection.guice.Inject; @@ -26,9 +30,10 @@ import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.action.AbstractTransportGetResourcesAction; +import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher; import org.elasticsearch.xpack.core.action.util.QueryPage; +import org.elasticsearch.xpack.core.common.time.RemainingTime; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.GetTransformAction; @@ -39,13 +44,17 @@ import org.elasticsearch.xpack.transform.transforms.TransformNodes; import org.elasticsearch.xpack.transform.transforms.TransformTask; -import java.util.Collection; -import java.util.List; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Stream; import static java.util.function.Predicate.not; -import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; +import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.transform.TransformField.INDEX_DOC_TYPE; public class TransportGetTransformAction extends AbstractTransportGetResourcesAction { @@ -54,6 +63,7 @@ public class TransportGetTransformAction extends AbstractTransportGetResourcesAc "Found task for transform [%s], but no configuration for it. To delete this transform use DELETE with force=true."; private final ClusterService clusterService; + private final Client client; @Inject public TransportGetTransformAction( @@ -65,6 +75,7 @@ public TransportGetTransformAction( ) { super(GetTransformAction.NAME, transportService, actionFilters, Request::new, client, xContentRegistry); this.clusterService = clusterService; + this.client = client; } @Override @@ -73,20 +84,29 @@ protected void doExecute(Task task, Request request, ActionListener li final ClusterState clusterState = clusterService.state(); TransformNodes.warnIfNoTransformNodes(clusterState); + RemainingTime remainingTime = RemainingTime.from(Instant::now, request.timeout()); + // Step 2: Search for all the transform tasks (matching the request) that *do not* have corresponding transform config. - ActionListener> searchTransformConfigsListener = ActionListener.wrap(r -> { - Set transformConfigIds = r.results().stream().map(TransformConfig::getId).collect(toSet()); - Collection> transformTasks = TransformTask.findTransformTasks( - request.getId(), - clusterState - ); - List errors = transformTasks.stream() - .map(PersistentTasksCustomMetadata.PersistentTask::getId) - .filter(not(transformConfigIds::contains)) - .map(transformId -> new Response.Error("dangling_task", Strings.format(DANGLING_TASK_ERROR_MESSAGE_FORMAT, transformId))) - .collect(toList()); - listener.onResponse(new Response(r.results(), r.count(), errors.isEmpty() ? null : errors)); - }, listener::onFailure); + ActionListener> searchTransformConfigsListener = listener.delegateFailureAndWrap((l, r) -> { + if (request.checkForDanglingTasks()) { + getAllTransformIds(request, r, remainingTime, l.delegateFailureAndWrap((ll, transformConfigIds) -> { + var errors = TransformTask.findTransformTasks(request.getId(), clusterState) + .stream() + .map(PersistentTasksCustomMetadata.PersistentTask::getId) + .filter(not(transformConfigIds::contains)) + .map( + transformId -> new Response.Error( + "dangling_task", + Strings.format(DANGLING_TASK_ERROR_MESSAGE_FORMAT, transformId) + ) + ) + .toList(); + ll.onResponse(new Response(r.results(), r.count(), errors.isEmpty() ? null : errors)); + })); + } else { + l.onResponse(new Response(r.results(), r.count(), null)); + } + }); // Step 1: Search for all the transform configs matching the request. searchResources(request, parentTaskId, searchTransformConfigsListener); @@ -116,7 +136,7 @@ protected ResourceNotFoundException notFoundException(String resourceId) { @Override protected String executionOrigin() { - return ClientHelper.TRANSFORM_ORIGIN; + return TRANSFORM_ORIGIN; } @Override @@ -131,7 +151,119 @@ protected QueryBuilder additionalQuery() { @Override protected SearchSourceBuilder customSearchOptions(SearchSourceBuilder searchSourceBuilder) { - return searchSourceBuilder.sort("_index", SortOrder.DESC); + return searchSourceBuilder.sort("_index", SortOrder.DESC).sort(TransformField.ID.getPreferredName(), SortOrder.ASC); + } + + private void getAllTransformIds( + Request request, + QueryPage initialResults, + RemainingTime remainingTime, + ActionListener> listener + ) { + ActionListener> transformIdListener = listener.map(stream -> stream.collect(toSet())); + var requestedPage = initialResults.results().stream().map(TransformConfig::getId); + + if (initialResults.count() == initialResults.results().size()) { + transformIdListener.onResponse(requestedPage); + } else { + // if we do not have all of our transform ids already, we have to go get them + // we'll read everything after our current page, then we'll reverse and read everything before our current page + var from = request.getPageParams().getFrom(); + var size = request.getPageParams().getSize(); + var idTokens = ExpandedIdsMatcher.tokenizeExpression(request.getResourceId()); + + getAllTransformIds(idTokens, false, from, size, remainingTime, transformIdListener.delegateFailureAndWrap((l, nextPages) -> { + var currentPages = Stream.concat(requestedPage, nextPages); + getAllTransformIds(idTokens, true, from, size, remainingTime, l.map(firstPages -> Stream.concat(firstPages, currentPages))); + })); + } + } + + private void getAllTransformIds( + String[] idTokens, + boolean reverse, + int from, + int size, + RemainingTime remainingTime, + ActionListener> listener + ) { + if (reverse && from <= 0) { + listener.onResponse(Stream.empty()); + return; + } + + var thisPage = reverse ? from - size : from + size; + var thisPageFrom = Math.max(0, thisPage); + var thisPageSize = thisPage < 0 ? from : size; + + SearchRequest request = client.prepareSearch( + TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED + ) + .addSort(TransformField.ID.getPreferredName(), SortOrder.ASC) + .addSort("_index", SortOrder.DESC) + .setFrom(thisPageFrom) + .setSize(thisPageSize) + .setTimeout(remainingTime.get()) + .setFetchSource(false) + .setTrackTotalHits(true) + .addDocValueField(TransformField.ID.getPreferredName()) + .setQuery(query(idTokens)) + .request(); + + executeAsyncWithOrigin( + client.threadPool().getThreadContext(), + TRANSFORM_ORIGIN, + request, + listener.delegateFailureAndWrap((l, searchResponse) -> { + var transformIds = Arrays.stream(searchResponse.getHits().getHits()) + .map(hit -> (String) hit.field(TransformField.ID.getPreferredName()).getValue()) + .filter(Predicate.not(org.elasticsearch.common.Strings::isNullOrEmpty)) + .toList() + .stream(); + + if (searchResponse.getHits().getHits().length == size) { + getAllTransformIds( + idTokens, + reverse, + thisPageFrom, + thisPageSize, + remainingTime, + l.map(nextTransformIds -> Stream.concat(transformIds, nextTransformIds)) + ); + } else { + l.onResponse(transformIds); + } + }), + client::search + ); + } + + private static QueryBuilder query(String[] idTokens) { + var queryBuilder = QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformConfig.NAME)); + + if (org.elasticsearch.common.Strings.isAllOrWildcard(idTokens) == false) { + var shouldQueries = new BoolQueryBuilder(); + var terms = new ArrayList(); + for (String token : idTokens) { + if (Regex.isSimpleMatchPattern(token)) { + shouldQueries.should(QueryBuilders.wildcardQuery(TransformField.ID.getPreferredName(), token)); + } else { + terms.add(token); + } + } + + if (terms.isEmpty() == false) { + shouldQueries.should(QueryBuilders.termsQuery(TransformField.ID.getPreferredName(), terms)); + } + + if (shouldQueries.should().isEmpty() == false) { + queryBuilder.filter(shouldQueries); + } + } + + return QueryBuilders.constantScoreQuery(queryBuilder); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformAction.java index 1eeafc54098ae..cb11e8e05f1dc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform.rest.action; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; @@ -39,10 +40,11 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) { - GetTransformAction.Request request = new GetTransformAction.Request(); + var id = restRequest.param(TransformField.ID.getPreferredName()); + var checkForDanglingTasks = restRequest.paramAsBoolean(TransformField.CHECK_FOR_DANGLING_TASKS.getPreferredName(), true); + var timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); - String id = restRequest.param(TransformField.ID.getPreferredName()); - request.setResourceId(id); + var request = new GetTransformAction.Request(id, checkForDanglingTasks, timeout); request.setAllowNoResources(restRequest.paramAsBoolean(ALLOW_NO_MATCH.getPreferredName(), true)); if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) { request.setPageParams(