Skip to content

Commit cde98aa

Browse files
authored
[Transform] ResolveIndex from TransformUpdate (#144208)
Both CCS and CPS need to call ResolveIndexAction in order to get source indices from a remote cluster or project.
1 parent b9fd27c commit cde98aa

File tree

3 files changed

+323
-19
lines changed

3 files changed

+323
-19
lines changed

x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCCSCanMatchIT.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.ActionListener;
1515
import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateRequest;
1616
import org.elasticsearch.action.admin.cluster.snapshots.features.TransportResetFeatureStateAction;
17+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
1718
import org.elasticsearch.action.search.SearchRequest;
1819
import org.elasticsearch.action.support.IndicesOptions;
1920
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -37,6 +38,7 @@
3738
import org.elasticsearch.node.NodeRoleSettings;
3839
import org.elasticsearch.plugins.EnginePlugin;
3940
import org.elasticsearch.plugins.Plugin;
41+
import org.elasticsearch.reindex.ReindexPlugin;
4042
import org.elasticsearch.search.SearchModule;
4143
import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
4244
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -52,14 +54,19 @@
5254
import org.elasticsearch.xpack.core.transform.MockDeprecatedAggregationBuilder;
5355
import org.elasticsearch.xpack.core.transform.MockDeprecatedQueryBuilder;
5456
import org.elasticsearch.xpack.core.transform.TransformNamedXContentProvider;
57+
import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction;
5558
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
5659
import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction;
5760
import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
5861
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
62+
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
63+
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
5964
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
6065
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
6166
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
67+
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
6268
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
69+
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
6370
import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
6471
import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig;
6572
import org.elasticsearch.xpack.transform.LocalStateTransform;
@@ -84,6 +91,7 @@
8491
import static org.hamcrest.Matchers.equalTo;
8592
import static org.hamcrest.Matchers.hasSize;
8693
import static org.hamcrest.Matchers.is;
94+
import static org.hamcrest.Matchers.notNullValue;
8795
import static org.hamcrest.Matchers.nullValue;
8896

8997
public class TransformCCSCanMatchIT extends AbstractMultiClustersTestCase {
@@ -392,6 +400,72 @@ private void testTransformLifecycle(QueryBuilder query, long expectedHitCount) t
392400
});
393401
}
394402

403+
public void testUpdateTransformCreatesDestIndexWhenRunningAndDestMissingWithRemoteSource() throws Exception {
404+
String transformId = "test-ccs-transform-update-dest";
405+
String destIndex = transformId + "-dest";
406+
407+
// Create a continuous transform sourced from both local and remote indices (CCS).
408+
// ZERO delay ensures the already-indexed documents are included in the first checkpoint.
409+
TransformConfig transformConfig = TransformConfig.builder()
410+
.setId(transformId)
411+
.setSource(new SourceConfig(new String[] { "local_*", "*:remote_*" }, QueryConfig.matchAll(), Map.of(), null))
412+
.setDest(new DestConfig(destIndex, null, null))
413+
.setFrequency(TimeValue.timeValueMinutes(1))
414+
.setSyncConfig(new TimeSyncConfig("@timestamp", TimeValue.ZERO))
415+
.setLatestConfig(new LatestConfig(List.of("position"), "@timestamp"))
416+
.build();
417+
418+
client().execute(PutTransformAction.INSTANCE, new PutTransformAction.Request(transformConfig, false, TIMEOUT)).actionGet(TIMEOUT);
419+
client().execute(StartTransformAction.INSTANCE, new StartTransformAction.Request(transformId, null, TIMEOUT)).actionGet(TIMEOUT);
420+
421+
// Wait for the first checkpoint to create the destination index.
422+
assertBusy(
423+
() -> assertThat(
424+
"dest index should be created after the first checkpoint",
425+
cluster(LOCAL_CLUSTER).clusterService().state().metadata().getProject().index(destIndex),
426+
is(notNullValue())
427+
)
428+
);
429+
430+
// Delete the destination index to simulate it going missing while the transform is still running.
431+
client(LOCAL_CLUSTER).admin().indices().delete(new DeleteIndexRequest(destIndex)).actionGet(TIMEOUT);
432+
assertThat(cluster(LOCAL_CLUSTER).clusterService().state().metadata().getProject().index(destIndex), is(nullValue()));
433+
434+
// Update the transform. A description change ensures we go through updateTransformConfiguration
435+
// (an EMPTY update against a current-version config returns NONE early without touching the dest index).
436+
client().execute(
437+
UpdateTransformAction.INSTANCE,
438+
new UpdateTransformAction.Request(
439+
new TransformConfigUpdate(null, null, null, null, "post-delete-update", null, null, null),
440+
transformId,
441+
false,
442+
TIMEOUT
443+
)
444+
).actionGet(TIMEOUT);
445+
446+
// The update should have invoked resolveSourceIndicesAndCreateDestIfNeeded, which calls ResolveIndexAction
447+
// to resolve the remote CCS source indices and then recreates the destination index.
448+
assertThat(
449+
"update should have recreated the destination index using ResolveIndexAction for CCS source resolution",
450+
cluster(LOCAL_CLUSTER).clusterService().state().metadata().getProject().index(destIndex),
451+
is(notNullValue())
452+
);
453+
454+
// Cleanup
455+
stopTransform(transformId);
456+
deleteTransform(transformId);
457+
}
458+
459+
private void stopTransform(String transformId) {
460+
client().execute(StopTransformAction.INSTANCE, new StopTransformAction.Request(transformId, true, true, TIMEOUT, false, false))
461+
.actionGet(TIMEOUT);
462+
}
463+
464+
private void deleteTransform(String transformId) {
465+
client().execute(DeleteTransformAction.INSTANCE, new DeleteTransformAction.Request(transformId, true, false, TIMEOUT))
466+
.actionGet(TIMEOUT);
467+
}
468+
395469
@Override
396470
protected NamedXContentRegistry xContentRegistry() {
397471
return namedXContentRegistry;
@@ -405,7 +479,10 @@ protected List<String> remoteClusterAlias() {
405479
@Override
406480
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
407481
return CollectionUtils.appendToCopy(
408-
CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), LocalStateTransform.class),
482+
CollectionUtils.appendToCopy(
483+
CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), LocalStateTransform.class),
484+
ReindexPlugin.class
485+
),
409486
ExposingTimestampEnginePlugin.class
410487
);
411488
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.apache.lucene.util.SetOnce;
1313
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction;
1415
import org.elasticsearch.action.support.IndicesOptions;
1516
import org.elasticsearch.client.internal.Client;
1617
import org.elasticsearch.cluster.ClusterState;
@@ -21,7 +22,6 @@
2122
import org.elasticsearch.core.TimeValue;
2223
import org.elasticsearch.index.engine.VersionConflictEngineException;
2324
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
24-
import org.elasticsearch.transport.RemoteClusterAware;
2525
import org.elasticsearch.xpack.core.ClientHelper;
2626
import org.elasticsearch.xpack.core.XPackSettings;
2727
import org.elasticsearch.xpack.core.security.SecurityContext;
@@ -37,7 +37,6 @@
3737
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
3838
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
3939

40-
import java.util.Arrays;
4140
import java.util.Map;
4241

4342
/**
@@ -337,21 +336,13 @@ private static void updateTransformConfiguration(
337336
final String destinationIndex = config.getDestination().getIndex();
338337
String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destinationIndex);
339338

340-
// FIXME: what do we do with remote indices?
341-
String[] sourceIndices = Arrays.stream(config.getSource().getIndex())
342-
.filter(ind -> RemoteClusterAware.isRemoteIndexName(ind) == false)
343-
.toArray(String[]::new);
344-
String[] src = sourceIndices.length > 0
345-
? indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), true, sourceIndices)
346-
: null;
347339
// If we are running, we should verify that the destination index exists and create it if it does not
348-
if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, config.getId()) != null && dest.length == 0
349-
// Verify we have source indices. The user could defer_validations and if the task is already running
350-
// we allow source indices to disappear. If the source and destination indices do not exist, don't do anything
351-
// the transform will just have to dynamically create the destination index without special mapping.
352-
&& src != null
353-
&& src.length > 0) {
354-
TransformIndex.createDestinationIndex(
340+
if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, config.getId()) != null && dest.length == 0) {
341+
// Resolve source indices (including remote) to verify they exist before creating the dest index.
342+
// The user could defer_validations and if the task is already running we allow source indices to
343+
// disappear. If the source and destination indices do not exist, don't do anything -- the transform
344+
// will just have to dynamically create the destination index without special mapping.
345+
resolveSourceIndicesAndCreateDestIfNeeded(
355346
client,
356347
auditor,
357348
indexNameExpressionResolver,
@@ -366,5 +357,52 @@ private static void updateTransformConfiguration(
366357
}
367358
}
368359

360+
private static void resolveSourceIndicesAndCreateDestIfNeeded(
361+
Client client,
362+
TransformAuditor auditor,
363+
IndexNameExpressionResolver indexNameExpressionResolver,
364+
ClusterState clusterState,
365+
TransformConfig config,
366+
Settings destIndexSettings,
367+
Map<String, String> destIndexMappings,
368+
ActionListener<Boolean> listener
369+
) {
370+
ResolveIndexAction.Request resolveRequest = new ResolveIndexAction.Request(
371+
config.getSource().getIndex(),
372+
config.getSource().indicesOptions()
373+
);
374+
ClientHelper.executeAsyncWithOrigin(
375+
client,
376+
ClientHelper.TRANSFORM_ORIGIN,
377+
ResolveIndexAction.INSTANCE,
378+
resolveRequest,
379+
ActionListener.wrap(resolveResponse -> {
380+
boolean hasSourceIndices = resolveResponse.getIndices().isEmpty() == false
381+
|| resolveResponse.getAliases().isEmpty() == false
382+
|| resolveResponse.getDataStreams().isEmpty() == false;
383+
if (hasSourceIndices) {
384+
TransformIndex.createDestinationIndex(
385+
client,
386+
auditor,
387+
indexNameExpressionResolver,
388+
clusterState,
389+
config,
390+
destIndexSettings,
391+
destIndexMappings,
392+
listener
393+
);
394+
} else {
395+
listener.onResponse(null);
396+
}
397+
}, e -> {
398+
logger.debug(
399+
() -> "[" + config.getId() + "] failed to resolve source indices during update, skipping dest index creation",
400+
e
401+
);
402+
listener.onResponse(null);
403+
})
404+
);
405+
}
406+
369407
private TransformUpdater() {}
370408
}

0 commit comments

Comments
 (0)