Skip to content

Commit b3708dd

Browse files
[7.16] [ML] Parent datafeed actions to the datafeed's persistent task (#81143) (#81153)
* [ML] Parent datafeed actions to the datafeed's persistent task (#81143) The vast majority of a datafeed's actions are executed from the data extractor. This includes the heaviest actions which are the searches. This commit passes a `ParentTaskAssigningClient` to `DataExtractorFactory.create` which ensures the client used by any extractor will be setting the corresponding task id: the action task id for preview datafeed and the master operation stage of the start datafeed action, and the persistent task id for the datafeed operations after it has started. * Can't parent on the start datafeed master op as we don't have task
1 parent deb0d4c commit b3708dd

File tree

3 files changed

+45
-8
lines changed

3 files changed

+45
-8
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.elasticsearch.action.support.ActionFilters;
1414
import org.elasticsearch.action.support.HandledTransportAction;
1515
import org.elasticsearch.client.Client;
16+
import org.elasticsearch.client.ParentTaskAssigningClient;
17+
import org.elasticsearch.cluster.service.ClusterService;
1618
import org.elasticsearch.common.bytes.BytesArray;
1719
import org.elasticsearch.common.inject.Inject;
1820
import org.elasticsearch.common.settings.Settings;
@@ -53,6 +55,7 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
5355

5456
private final ThreadPool threadPool;
5557
private final Client client;
58+
private final ClusterService clusterService;
5659
private final JobConfigProvider jobConfigProvider;
5760
private final DatafeedConfigProvider datafeedConfigProvider;
5861
private final NamedXContentRegistry xContentRegistry;
@@ -65,13 +68,15 @@ public TransportPreviewDatafeedAction(
6568
TransportService transportService,
6669
ActionFilters actionFilters,
6770
Client client,
71+
ClusterService clusterService,
6872
JobConfigProvider jobConfigProvider,
6973
DatafeedConfigProvider datafeedConfigProvider,
7074
NamedXContentRegistry xContentRegistry
7175
) {
7276
super(PreviewDatafeedAction.NAME, transportService, actionFilters, PreviewDatafeedAction.Request::new);
7377
this.threadPool = threadPool;
7478
this.client = client;
79+
this.clusterService = clusterService;
7580
this.jobConfigProvider = jobConfigProvider;
7681
this.datafeedConfigProvider = datafeedConfigProvider;
7782
this.xContentRegistry = xContentRegistry;
@@ -84,12 +89,12 @@ public TransportPreviewDatafeedAction(
8489
protected void doExecute(Task task, PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {
8590
ActionListener<DatafeedConfig> datafeedConfigActionListener = ActionListener.wrap(datafeedConfig -> {
8691
if (request.getJobConfig() != null) {
87-
previewDatafeed(datafeedConfig, request.getJobConfig().build(new Date()), listener);
92+
previewDatafeed(task, datafeedConfig, request.getJobConfig().build(new Date()), listener);
8893
return;
8994
}
9095
jobConfigProvider.getJob(
9196
datafeedConfig.getJobId(),
92-
ActionListener.wrap(jobBuilder -> previewDatafeed(datafeedConfig, jobBuilder.build(), listener), listener::onFailure)
97+
ActionListener.wrap(jobBuilder -> previewDatafeed(task, datafeedConfig, jobBuilder.build(), listener), listener::onFailure)
9398
);
9499
}, listener::onFailure);
95100
if (request.getDatafeedConfig() != null) {
@@ -102,7 +107,12 @@ protected void doExecute(Task task, PreviewDatafeedAction.Request request, Actio
102107
}
103108
}
104109

105-
private void previewDatafeed(DatafeedConfig datafeedConfig, Job job, ActionListener<PreviewDatafeedAction.Response> listener) {
110+
private void previewDatafeed(
111+
Task task,
112+
DatafeedConfig datafeedConfig,
113+
Job job,
114+
ActionListener<PreviewDatafeedAction.Response> listener
115+
) {
106116
DatafeedConfig.Builder previewDatafeedBuilder = buildPreviewDatafeed(datafeedConfig);
107117
useSecondaryAuthIfAvailable(securityContext, () -> {
108118
previewDatafeedBuilder.setHeaders(filterSecurityHeaders(threadPool.getThreadContext().getHeaders()));
@@ -111,7 +121,7 @@ private void previewDatafeed(DatafeedConfig datafeedConfig, Job job, ActionListe
111121
// requesting the preview doesn't have permission to search the relevant indices.
112122
DatafeedConfig previewDatafeedConfig = previewDatafeedBuilder.build();
113123
DataExtractorFactory.create(
114-
client,
124+
new ParentTaskAssigningClient(client, clusterService.localNode(), task),
115125
previewDatafeedConfig,
116126
job,
117127
xContentRegistry,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class DatafeedJobBuilder {
4444
private final Supplier<Long> currentTimeSupplier;
4545
private final JobResultsPersister jobResultsPersister;
4646
private final boolean remoteClusterClient;
47-
private final String nodeName;
47+
private final ClusterService clusterService;
4848

4949
private volatile long delayedDataCheckFreq;
5050

@@ -65,8 +65,8 @@ public DatafeedJobBuilder(
6565
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
6666
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
6767
this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
68-
this.nodeName = clusterService.getNodeName();
6968
this.delayedDataCheckFreq = DELAYED_DATA_CHECK_FREQ.get(settings).millis();
69+
this.clusterService = Objects.requireNonNull(clusterService);
7070
clusterService.getClusterSettings().addSettingsUpdateConsumer(DELAYED_DATA_CHECK_FREQ, this::setDelayedDataCheckFreq);
7171
}
7272

@@ -75,7 +75,7 @@ private void setDelayedDataCheckFreq(TimeValue value) {
7575
}
7676

7777
void build(TransportStartDatafeedAction.DatafeedTask task, DatafeedContext context, ActionListener<DatafeedJob> listener) {
78-
final ParentTaskAssigningClient parentTaskAssigningClient = new ParentTaskAssigningClient(client, task.getParentTaskId());
78+
final ParentTaskAssigningClient parentTaskAssigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
7979
final DatafeedConfig datafeedConfig = context.getDatafeedConfig();
8080
final Job job = context.getJob();
8181
final long latestFinalBucketEndMs = context.getRestartTimeInfo().getLatestFinalBucketTimeMs() == null
@@ -155,7 +155,12 @@ private void checkRemoteIndicesAreAvailable(DatafeedConfig datafeedConfig) {
155155
List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfig.getIndices());
156156
if (remoteIndices.isEmpty() == false) {
157157
throw ExceptionsHelper.badRequestException(
158-
Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH, datafeedConfig.getId(), remoteIndices, nodeName)
158+
Messages.getMessage(
159+
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
160+
datafeedConfig.getId(),
161+
remoteIndices,
162+
clusterService.getNodeName()
163+
)
159164
);
160165
}
161166
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,14 @@
66
*/
77
package org.elasticsearch.xpack.ml.datafeed;
88

9+
import org.elasticsearch.Version;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.cluster.ClusterName;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.block.ClusterBlocks;
15+
import org.elasticsearch.cluster.node.DiscoveryNode;
16+
import org.elasticsearch.cluster.node.DiscoveryNodes;
1117
import org.elasticsearch.cluster.routing.OperationRouting;
1218
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
1319
import org.elasticsearch.cluster.service.ClusterApplierService;
@@ -39,6 +45,8 @@
3945
import java.util.HashSet;
4046
import java.util.concurrent.atomic.AtomicBoolean;
4147

48+
import static java.util.Collections.emptyMap;
49+
import static java.util.Collections.emptySet;
4250
import static org.elasticsearch.test.NodeRoles.nonRemoteClusterClientNode;
4351
import static org.hamcrest.Matchers.equalTo;
4452
import static org.hamcrest.Matchers.is;
@@ -80,11 +88,25 @@ public void init() {
8088
)
8189
)
8290
);
91+
final DiscoveryNode localNode = new DiscoveryNode(
92+
"test_node",
93+
buildNewFakeTransportAddress(),
94+
emptyMap(),
95+
emptySet(),
96+
Version.CURRENT
97+
);
8398
clusterService = new ClusterService(
8499
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(),
85100
clusterSettings,
86101
threadPool
87102
);
103+
clusterService.getClusterApplierService()
104+
.setInitialState(
105+
ClusterState.builder(new ClusterName("DatafeedJobBuilderTests"))
106+
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
107+
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
108+
.build()
109+
);
88110

89111
datafeedJobBuilder = new DatafeedJobBuilder(
90112
client,

0 commit comments

Comments
 (0)