Skip to content

Commit 37bccfb

Browse files
committed
[Transform] Reduce task match load
Move `TransportGetTransformStatsAction` to the generic threadpool, and match `Task` based on java type first before comparing the description to prevent iterating over a large abount of transform ids for tasks that are not `TransformTask`. Resolve elastic#139252
1 parent 3b3cc47 commit 37bccfb

File tree

12 files changed

+81
-35
lines changed

12 files changed

+81
-35
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformStatsAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ public Request(StreamInput in) throws IOException {
8282
@Override
8383
public boolean match(Task task) {
8484
// Only get tasks that we have expanded to
85-
return expandedIds.stream()
86-
.anyMatch(transformId -> task.getDescription().equals(TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transformId));
85+
return PutTransformAction.TransformTaskMatcher.match(task, expandedIds);
8786
}
8887

8988
public String getId() {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.xpack.core.transform.utils.TransformStrings;
2626

2727
import java.io.IOException;
28+
import java.util.Collection;
2829
import java.util.Map;
2930
import java.util.Objects;
3031

@@ -165,4 +166,17 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
165166
}
166167
}
167168

169+
public interface TransformTaskMatcher {
170+
static boolean match(Task task, String expectedTransformId) {
171+
return task instanceof TransformTaskMatcher
172+
&& task.getDescription().equals(TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + expectedTransformId);
173+
}
174+
175+
static boolean match(Task task, Collection<String> expectedTransformIds) {
176+
return task instanceof PutTransformAction.TransformTaskMatcher
177+
&& expectedTransformIds.stream()
178+
.anyMatch(transformId -> task.getDescription().equals(TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transformId));
179+
}
180+
}
181+
168182
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformAction.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,7 @@ public boolean equals(Object obj) {
9898

9999
@Override
100100
public boolean match(Task task) {
101-
if (task.getDescription().startsWith(TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX)) {
102-
String taskId = task.getDescription().substring(TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX.length());
103-
return taskId.equals(this.id);
104-
}
105-
return false;
101+
return PutTransformAction.TransformTaskMatcher.match(task, id);
106102
}
107103
}
108104

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,7 @@ public boolean equals(Object obj) {
170170

171171
@Override
172172
public boolean match(Task task) {
173-
if (task.getDescription().startsWith(TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX)) {
174-
String taskId = task.getDescription().substring(TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX.length());
175-
if (expandedIds != null) {
176-
return expandedIds.contains(taskId);
177-
}
178-
}
179-
180-
return false;
173+
return PutTransformAction.TransformTaskMatcher.match(task, expandedIds);
181174
}
182175
}
183176

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,7 @@ public boolean equals(Object obj) {
185185

186186
@Override
187187
public boolean match(Task task) {
188-
if (task.getDescription().startsWith(TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX)) {
189-
String taskId = task.getDescription().substring(TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX.length());
190-
return taskId.equals(this.id);
191-
}
192-
return false;
188+
return PutTransformAction.TransformTaskMatcher.match(task, id);
193189
}
194190
}
195191

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.transform.action;
9+
10+
import org.elasticsearch.tasks.Task;
11+
import org.elasticsearch.tasks.TaskId;
12+
13+
import java.util.Map;
14+
15+
class FakeTransformTask extends Task implements PutTransformAction.TransformTaskMatcher {
16+
FakeTransformTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
17+
super(id, type, action, description, parentTask, headers);
18+
}
19+
20+
FakeTransformTask(
21+
long id,
22+
String type,
23+
String action,
24+
String description,
25+
TaskId parentTask,
26+
long startTime,
27+
long startTimeNanos,
28+
Map<String, String> headers
29+
) {
30+
super(id, type, action, description, parentTask, startTime, startTimeNanos, headers);
31+
}
32+
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformStatsActionRequestTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,11 @@ public void testCreateTask() {
4747
assertThat(task, is(instanceOf(CancellableTask.class)));
4848
assertThat(task.getDescription(), is(equalTo("get_transform_stats[some-transform]")));
4949
}
50+
51+
public void testMatch() {
52+
Request request = new Request("some-transform", null, false);
53+
assertTrue(request.match(new FakeTransformTask(123, "", "", "data_frame_some-transform", null, null)));
54+
assertFalse(request.match(new FakeTransformTask(123, "", "", "some-transform", null, null)));
55+
assertFalse(request.match(new Task(123, "", "", "data_frame_some-transform", null, null)));
56+
}
5057
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ScheduleNowTransformActionRequestTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.elasticsearch.action.ActionRequestValidationException;
1111
import org.elasticsearch.common.io.stream.Writeable;
1212
import org.elasticsearch.core.TimeValue;
13-
import org.elasticsearch.persistent.AllocatedPersistentTask;
13+
import org.elasticsearch.tasks.Task;
1414
import org.elasticsearch.test.AbstractWireSerializingTestCase;
1515
import org.elasticsearch.xpack.core.transform.action.ScheduleNowTransformAction.Request;
1616

@@ -59,9 +59,10 @@ public void testValidationFailure() {
5959

6060
public void testMatch() {
6161
Request request = new Request("my-transform-7", TimeValue.timeValueSeconds(5));
62-
assertTrue(request.match(new AllocatedPersistentTask(123, "", "", "data_frame_my-transform-7", null, null)));
63-
assertFalse(request.match(new AllocatedPersistentTask(123, "", "", "data_frame_my-transform-", null, null)));
64-
assertFalse(request.match(new AllocatedPersistentTask(123, "", "", "data_frame_my-transform-77", null, null)));
65-
assertFalse(request.match(new AllocatedPersistentTask(123, "", "", "my-transform-7", null, null)));
62+
assertTrue(request.match(new FakeTransformTask(123, "", "", "data_frame_my-transform-7", null, null)));
63+
assertFalse(request.match(new FakeTransformTask(123, "", "", "data_frame_my-transform-", null, null)));
64+
assertFalse(request.match(new FakeTransformTask(123, "", "", "data_frame_my-transform-77", null, null)));
65+
assertFalse(request.match(new FakeTransformTask(123, "", "", "my-transform-7", null, null)));
66+
assertFalse(request.match(new Task(123, "", "", "data_frame_my-transform-7", null, null)));
6667
}
6768
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StopTransformActionRequestTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,15 @@ public void testSameButDifferentTimeout() {
6666
public void testMatch() {
6767
String transformId = "transform-id";
6868

69-
Task transformTask = new Task(
69+
Task transformTask = new FakeTransformTask(
7070
1L,
7171
"persistent",
7272
"action",
7373
TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transformId,
7474
TaskId.EMPTY_TASK_ID,
7575
Collections.emptyMap()
76-
);
76+
) {
77+
};
7778

7879
Request request = new Request("unrelated", false, false, null, false, false);
7980
request.setExpandedIds(Set.of("foo", "bar"));

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformActionRequestTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import org.elasticsearch.common.io.stream.Writeable;
1111
import org.elasticsearch.core.TimeValue;
12-
import org.elasticsearch.persistent.AllocatedPersistentTask;
12+
import org.elasticsearch.tasks.Task;
1313
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Request;
1414
import org.elasticsearch.xpack.core.transform.transforms.AuthorizationStateTests;
1515
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
@@ -73,9 +73,10 @@ protected Request mutateInstance(Request instance) {
7373

7474
public void testMatch() {
7575
Request request = new Request(randomTransformConfigUpdate(), "my-transform-7", false, null);
76-
assertTrue(request.match(new AllocatedPersistentTask(123, "", "", "data_frame_my-transform-7", null, null)));
77-
assertFalse(request.match(new AllocatedPersistentTask(123, "", "", "data_frame_my-transform-", null, null)));
78-
assertFalse(request.match(new AllocatedPersistentTask(123, "", "", "data_frame_my-transform-77", null, null)));
79-
assertFalse(request.match(new AllocatedPersistentTask(123, "", "", "my-transform-7", null, null)));
76+
assertTrue(request.match(new FakeTransformTask(123, "", "", "data_frame_my-transform-7", null, null)));
77+
assertFalse(request.match(new FakeTransformTask(123, "", "", "data_frame_my-transform-", null, null)));
78+
assertFalse(request.match(new FakeTransformTask(123, "", "", "data_frame_my-transform-77", null, null)));
79+
assertFalse(request.match(new FakeTransformTask(123, "", "", "my-transform-7", null, null)));
80+
assertFalse(request.match(new Task(123, "", "", "data_frame_my-transform-7", null, null)));
8081
}
8182
}

0 commit comments

Comments
 (0)