Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/122074.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 122074
summary: Delete Alias Write Index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make the summary explain what is the visible impact of this PR for the customer.
Maybe sth like "Fix Delete API with delete_dest_index parameter when the destination index alias is used"?

area: Transform
type: bug
issues:
- 121913
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testDeleteWithParamDeletesAutoCreatedDestinationIndex() throws Excep

deleteTransform(transformId, false, true);
assertFalse(indexExists(transformDest));
assertFalse(aliasExists(transformDest));
assertFalse(aliasExists(transformDestAlias));
}

public void testDeleteWithParamDeletesManuallyCreatedDestinationIndex() throws Exception {
Expand Down Expand Up @@ -139,7 +139,7 @@ public void testDeleteWithParamDeletesManuallyCreatedDestinationIndex() throws E
assertFalse(aliasExists(transformDestAlias));
}

public void testDeleteWithParamDoesNotDeleteManuallySetUpAlias() throws Exception {
public void testDeleteWithManuallyCreatedIndexAndManuallyCreatedAlias() throws Exception {
String transformId = "transform-4";
String transformDest = transformId + "_idx";
String transformDestAlias = transformId + "_alias";
Expand All @@ -158,31 +158,106 @@ public void testDeleteWithParamDoesNotDeleteManuallySetUpAlias() throws Exceptio
assertTrue(indexExists(transformDest));
assertTrue(aliasExists(transformDestAlias));

deleteTransform(transformId, false, true);
assertFalse(indexExists(transformDest));
assertFalse(aliasExists(transformDestAlias));
}

public void testDeleteDestinationIndexIsNoOpWhenNoDestinationIndexExists() throws Exception {
String transformId = "transform-5";
String transformDest = transformId + "_idx";
String transformDestAlias = transformId + "_alias";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, transformDestAlias);

createTransform(transformId, transformDest, transformDestAlias);
assertFalse(indexExists(transformDest));
assertFalse(aliasExists(transformDestAlias));

deleteTransform(transformId, false, true);
assertFalse(indexExists(transformDest));
assertFalse(aliasExists(transformDestAlias));
}

public void testDeleteWithAliasPointingToManyIndices() throws Exception {
var transformId = "transform-6";
var transformDest = transformId + "_idx";
var otherIndex = "some-other-index-6";
String transformDestAlias = transformId + "_alias";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, otherIndex, transformDestAlias);

createIndex(transformDest, null, null, "\"" + transformDestAlias + "\": { \"is_write_index\": true }");
createIndex(otherIndex, null, null, "\"" + transformDestAlias + "\": {}");

assertTrue(indexExists(transformDest));
assertTrue(indexExists(otherIndex));
assertTrue(aliasExists(transformDestAlias));

createTransform(transformId, transformDestAlias, null);

startTransform(transformId);
waitForTransformCheckpoint(transformId, 1);

stopTransform(transformId, false);

assertTrue(indexExists(transformDest));
assertTrue(indexExists(otherIndex));
assertTrue(aliasExists(transformDestAlias));

deleteTransform(transformId, false, true);

assertFalse(indexExists(transformDest));
assertTrue(indexExists(otherIndex));
assertTrue(aliasExists(transformDestAlias));
}

public void testDeleteWithNoWriteIndexThrowsException() throws Exception {
var transformId = "transform-7";
var transformDest = transformId + "_idx";
var otherIndex = "some-other-index-7";
String transformDestAlias = transformId + "_alias";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, otherIndex, transformDestAlias);

createIndex(transformDest, null, null, "\"" + transformDestAlias + "\": {}");

assertTrue(indexExists(transformDest));
assertTrue(aliasExists(transformDestAlias));

createTransform(transformId, transformDestAlias, null);

createIndex(otherIndex, null, null, "\"" + transformDestAlias + "\": {}");
assertTrue(indexExists(otherIndex));

ResponseException e = expectThrows(ResponseException.class, () -> deleteTransform(transformId, false, true));
assertThat(
e.getMessage(),
containsString(
Strings.format(
"The provided expression [%s] matches an alias, specify the corresponding concrete indices instead.",
"Cannot disambiguate destination index alias [%s]. Alias points to many indices with no clear write alias."
+ " Retry with delete_dest_index=false and manually clean up destination index.",
transformDestAlias
)
)
);
}

public void testDeleteDestinationIndexIsNoOpWhenNoDestinationIndexExists() throws Exception {
String transformId = "transform-5";
String transformDest = transformId + "_idx";
String transformDestAlias = transformId + "_alias";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, transformDestAlias);
public void testDeleteWithAlreadyDeletedIndex() throws Exception {
var transformId = "transform-8";
var transformDest = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);

createIndex(transformDest);

assertTrue(indexExists(transformDest));

createTransform(transformId, transformDest, null);

deleteIndex(transformDest);

createTransform(transformId, transformDest, transformDestAlias);
assertFalse(indexExists(transformDest));
assertFalse(aliasExists(transformDestAlias));

deleteTransform(transformId, false, true);

assertFalse(indexExists(transformDest));
assertFalse(aliasExists(transformDestAlias));
}

private void createTransform(String transformId, String destIndex, String destAlias) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ protected void updateTransform(String transformId, String update, boolean deferV
}
updateTransformRequest.setJsonEntity(update);

client().performRequest(updateTransformRequest);
assertOKAndConsume(client().performRequest(updateTransformRequest));
}

protected void startTransform(String transformId) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.client.internal.Client;
Expand All @@ -27,6 +31,7 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -42,6 +47,8 @@
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.TransformTask;

import java.util.Objects;

import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ClientHelper.executeWithHeadersAsync;
Expand Down Expand Up @@ -146,20 +153,31 @@ private void deleteDestinationIndex(
TimeValue timeout,
ActionListener<AcknowledgedResponse> listener
) {
// <3> Check if the error is "index not found" error. If so, just move on. The index is already deleted.
ActionListener<AcknowledgedResponse> deleteDestIndexListener = ActionListener.wrap(listener::onResponse, e -> {
if (e instanceof IndexNotFoundException) {
listener.onResponse(AcknowledgedResponse.TRUE);
} else {
listener.onFailure(e);
}
});
getTransformConfig(transformId).<AcknowledgedResponse>andThen((l, r) -> deleteDestinationIndex(r.v1(), parentTaskId, timeout, l))
.addListener(listener.delegateResponse((l, e) -> {
if (e instanceof IndexNotFoundException) {
l.onResponse(AcknowledgedResponse.TRUE);
} else {
l.onFailure(e);
}
}));
}

// <2> Delete destination index
ActionListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> getTransformConfigurationListener = ActionListener.wrap(
transformConfigAndVersion -> {
TransformConfig config = transformConfigAndVersion.v1();
String destIndex = config.getDestination().getIndex();
private SubscribableListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> getTransformConfig(String transformId) {
return SubscribableListener.newForked(l -> transformConfigManager.getTransformConfigurationForUpdate(transformId, l));
}

/**
* Delete the destination index. If the Transform is configured to write to an alias, then follow that alias to the concrete index.
*/
private void deleteDestinationIndex(
TransformConfig config,
TaskId parentTaskId,
TimeValue timeout,
ActionListener<AcknowledgedResponse> listener
) {
SubscribableListener.<String>newForked(l -> resolveDestinationIndex(config, parentTaskId, timeout, l))
.<AcknowledgedResponse>andThen((l, destIndex) -> {
DeleteIndexRequest deleteDestIndexRequest = new DeleteIndexRequest(destIndex);
deleteDestIndexRequest.ackTimeout(timeout);
deleteDestIndexRequest.setParentTask(parentTaskId);
Expand All @@ -169,14 +187,57 @@ private void deleteDestinationIndex(
client,
TransportDeleteIndexAction.TYPE,
deleteDestIndexRequest,
deleteDestIndexListener
l
);
},
listener::onFailure
);
})
.addListener(listener);
}

private void resolveDestinationIndex(TransformConfig config, TaskId parentTaskId, TimeValue timeout, ActionListener<String> listener) {
var destIndex = config.getDestination().getIndex();
var responseListener = ActionListener.<GetAliasesResponse>wrap(r -> findDestinationIndexInAliases(r, destIndex, listener), e -> {
if (e instanceof AliasesNotFoundException) {
// no alias == the destIndex is our concrete index
listener.onResponse(destIndex);
} else {
listener.onFailure(e);
}
});

GetAliasesRequest request = new GetAliasesRequest(timeout, config.getDestination().getIndex());
request.setParentTask(parentTaskId);
executeWithHeadersAsync(config.getHeaders(), TRANSFORM_ORIGIN, client, GetAliasesAction.INSTANCE, request, responseListener);
}

// <1> Fetch transform configuration
transformConfigManager.getTransformConfigurationForUpdate(transformId, getTransformConfigurationListener);
private void findDestinationIndexInAliases(GetAliasesResponse aliases, String destIndex, ActionListener<String> listener) {
var indexToAliases = aliases.getAliases();
if (indexToAliases.isEmpty()) {
// if the alias list is empty, that means the index is a concrete index
listener.onResponse(destIndex);
} else if (indexToAliases.size() == 1) {
// if there is one value, the alias will treat it as the write index, so it's our destination index
listener.onResponse(indexToAliases.keySet().iterator().next());
} else {
// if there is more than one index, there may be more than one alias for each index
// we have to search for the alias that matches our destination index name AND is declared the write index for that alias
indexToAliases.entrySet().stream().map(entry -> {
if (entry.getValue().stream().anyMatch(md -> destIndex.equals(md.getAlias()) && Boolean.TRUE.equals(md.writeIndex()))) {
return entry.getKey();
} else {
return null;
}
}).filter(Objects::nonNull).findFirst().ifPresentOrElse(listener::onResponse, () -> {
listener.onFailure(
new ElasticsearchStatusException(
"Cannot disambiguate destination index alias ["
+ destIndex
+ "]. Alias points to many indices with no clear write alias. Retry with delete_dest_index=false and manually"
+ " clean up destination index.",
RestStatus.CONFLICT
)
);
});
}
}

@Override
Expand Down