Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.utils.TransportVersionUtils;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.transform.TransformMetadata;
import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction;
Expand Down Expand Up @@ -107,7 +106,7 @@ protected void masterOperation(Task ignoredTask, Request request, ClusterState s
}

// do not allow in mixed clusters
if (TransportVersionUtils.isMinTransportVersionSameAsCurrent(state) == false) {
if (state.nodes().isMixedVersionCluster()) {
listener.onFailure(
new ElasticsearchStatusException("Cannot upgrade transforms while cluster upgrade is in progress.", RestStatus.CONFLICT)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,19 @@ public void deleteOldIndices(ActionListener<Boolean> listener) {

// use the transform context as we access system indexes
try (ThreadContext.StoredContext ctx = client.threadPool().getThreadContext().stashWithOrigin(TRANSFORM_ORIGIN)) {
indicesToDelete.addAll(
Arrays.asList(
indexNameExpressionResolver.concreteIndexNames(
state,
IndicesOptions.lenientExpandHidden(),
TransformInternalIndexConstants.INDEX_NAME_PATTERN
)
)
var matchingIndexes = indexNameExpressionResolver.concreteIndices(
state,
IndicesOptions.lenientExpandHidden(),
TransformInternalIndexConstants.INDEX_NAME_PATTERN
);

for (var index : matchingIndexes) {
var meta = state.getMetadata().getIndexSafe(index);
if (meta.isSystem() == false) { // ignore system indices as these are automatically managed
indicesToDelete.add(meta.getIndex().getName());
}
}

indicesToDelete.addAll(
Arrays.asList(
indexNameExpressionResolver.concreteIndexNames(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ public void setUpLogging() throws IOException {
request.setJsonEntity("""
{
"persistent": {
"logger.org.elasticsearch.xpack.ml.inference": "TRACE",
"logger.org.elasticsearch.xpack.ml.inference": "DEBUG",
"logger.org.elasticsearch.xpack.ml.inference.assignments": "DEBUG",
"logger.org.elasticsearch.xpack.ml.process": "DEBUG",
"logger.org.elasticsearch.xpack.ml.action": "TRACE"
"logger.org.elasticsearch.xpack.ml.action": "DEBUG"
}
}
""");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_INTERNAL_INDEX_PREFIX;
import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED;
import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_TASK_NAME;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -96,7 +95,6 @@ public void testTransformRollingUpgrade() throws Exception {
lastCheckpoint = 2;
}
verifyContinuousTransformHandlesData(lastCheckpoint);
verifyUpgradeFailsIfMixedCluster();
}
case UPGRADED -> {
client().performRequest(waitForYellow);
Expand Down Expand Up @@ -131,11 +129,11 @@ private void createAndStartContinuousTransform() throws Exception {

assertBusy(() -> {
var stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID);
assertThat((Integer) XContentMapValues.extractValue("stats.documents_indexed", stateAndStats), equalTo(ENTITIES.size()));
assertThat(
((Integer) XContentMapValues.extractValue("stats.documents_indexed", stateAndStats)).longValue(),
equalTo(ENTITIES.size())
((Integer) XContentMapValues.extractValue("stats.documents_processed", stateAndStats)).longValue(),
equalTo(totalDocsWritten)
);
assertThat((Integer) XContentMapValues.extractValue("stats.documents_processed", stateAndStats), equalTo(totalDocsWritten));
// Even if we get back to started, we may periodically get set back to `indexing` when triggered.
// Though short lived due to no changes on the source indices, it could result in flaky test behavior
assertThat(stateAndStats.get("state"), oneOf("started", "indexing"));
Expand Down Expand Up @@ -232,17 +230,6 @@ private void verifyContinuousTransformHandlesData(long expectedLastCheckpoint) t
});
}

private void verifyUpgradeFailsIfMixedCluster() {
// upgrade tests by design are also executed with the same version, this check must be skipped in this case, see gh#39102.
if (isOriginalClusterCurrent()) {
return;
}
final Request upgradeTransformRequest = new Request("POST", getTransformEndpoint() + "_upgrade");

Exception ex = expectThrows(Exception.class, () -> client().performRequest(upgradeTransformRequest));
assertThat(ex.getMessage(), containsString("All nodes must be the same version"));
}

private void verifyUpgrade() throws IOException {
final Request upgradeTransformRequest = new Request("POST", getTransformEndpoint() + "_upgrade");
Response response = client().performRequest(upgradeTransformRequest);
Expand Down