Skip to content

Commit af83c2e

Browse files
authored
[Transform] Use seqno when migrating max_page_search_size (elastic#120639) (elastic#120679)
Refetch TransformConfig with SeqNo before migrating max_page_search_size, then include the SeqNo in the index request to ensure we are not overwriting a user's intentional config change. If we are, just drop the migration and try again next time.
1 parent 670faa8 commit af83c2e

File tree

3 files changed

+72
-21
lines changed

3 files changed

+72
-21
lines changed

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,9 @@ public Collection<?> createComponents(PluginServices services) {
322322
transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler, transformNode));
323323

324324
var transformMeterRegistry = TransformMeterRegistry.create(services.telemetryProvider().getMeterRegistry());
325-
transformConfigAutoMigration.set(new TransformConfigAutoMigration(configManager, auditor, transformMeterRegistry));
325+
transformConfigAutoMigration.set(
326+
new TransformConfigAutoMigration(configManager, auditor, transformMeterRegistry, services.threadPool())
327+
);
326328

327329
return List.of(
328330
transformServices.get(),

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformConfigAutoMigration.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,16 @@
1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.support.SubscribableListener;
1314
import org.elasticsearch.common.logging.DeprecationCategory;
1415
import org.elasticsearch.common.logging.DeprecationLogger;
16+
import org.elasticsearch.core.Tuple;
17+
import org.elasticsearch.threadpool.ThreadPool;
1518
import org.elasticsearch.xpack.core.transform.TransformField;
1619
import org.elasticsearch.xpack.core.transform.TransformMessages;
1720
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
1821
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
22+
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
1923
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
2024
import org.elasticsearch.xpack.transform.telemetry.TransformMeterRegistry;
2125

@@ -34,15 +38,18 @@ public class TransformConfigAutoMigration {
3438
private final TransformConfigManager transformConfigManager;
3539
private final TransformAuditor auditor;
3640
private final TransformMeterRegistry transformMeterRegistry;
41+
private final ThreadPool threadPool;
3742

3843
TransformConfigAutoMigration(
3944
TransformConfigManager transformConfigManager,
4045
TransformAuditor auditor,
41-
TransformMeterRegistry transformMeterRegistry
46+
TransformMeterRegistry transformMeterRegistry,
47+
ThreadPool threadPool
4248
) {
4349
this.transformConfigManager = transformConfigManager;
4450
this.auditor = auditor;
4551
this.transformMeterRegistry = transformMeterRegistry;
52+
this.threadPool = threadPool;
4653
}
4754

4855
public TransformConfig migrate(TransformConfig currentConfig) {
@@ -75,18 +82,26 @@ public TransformConfig migrate(TransformConfig currentConfig) {
7582
}
7683

7784
public void migrateAndSave(TransformConfig currentConfig, ActionListener<TransformConfig> listener) {
78-
var updatedConfig = migrate(currentConfig);
79-
if (currentConfig == updatedConfig) {
85+
// most transforms shouldn't need to migrate, so let's exit early
86+
if (currentConfig.shouldAutoMigrateMaxPageSearchSize() == false) {
8087
listener.onResponse(currentConfig);
81-
} else {
82-
ActionListener<Boolean> putConfigListener = ActionListener.wrap(r -> listener.onResponse(updatedConfig), e -> {
83-
var errorMessage = "Failed to persist auto-migrated Config. Please see Elasticsearch logs. Continuing with old config.";
84-
logger.atWarn().withThrowable(e).log(errorMessage);
85-
auditor.warning(currentConfig.getId(), errorMessage);
86-
listener.onResponse(currentConfig);
87-
});
88-
89-
transformConfigManager.putTransformConfiguration(updatedConfig, putConfigListener);
88+
return;
9089
}
90+
91+
SubscribableListener.<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>>newForked(l -> {
92+
transformConfigManager.getTransformConfigurationForUpdate(currentConfig.getId(), l);
93+
}).<TransformConfig>andThen(threadPool.generic(), threadPool.getThreadContext(), (l, configAndVersion) -> {
94+
var updatedConfig = migrate(configAndVersion.v1());
95+
if (configAndVersion.v1() == updatedConfig) {
96+
l.onResponse(currentConfig);
97+
} else {
98+
transformConfigManager.updateTransformConfiguration(updatedConfig, configAndVersion.v2(), l.map(ignored -> updatedConfig));
99+
}
100+
}).addListener(listener.delegateResponse((l, e) -> {
101+
var errorMessage = "Failed to auto-migrate Config. Please see Elasticsearch logs. Continuing with old config.";
102+
logger.atWarn().withThrowable(e).log(errorMessage);
103+
auditor.warning(currentConfig.getId(), errorMessage);
104+
l.onResponse(currentConfig);
105+
}), threadPool.generic(), threadPool.getThreadContext());
91106
}
92107
}

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformConfigAutoMigrationTests.java

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88
package org.elasticsearch.xpack.transform;
99

1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.common.util.concurrent.EsExecutors;
1112
import org.elasticsearch.core.CheckedConsumer;
13+
import org.elasticsearch.core.Tuple;
1214
import org.elasticsearch.test.ESTestCase;
15+
import org.elasticsearch.threadpool.ThreadPool;
1316
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
1417
import org.elasticsearch.xpack.core.transform.TransformMessages;
1518
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
@@ -18,6 +21,7 @@
1821
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
1922
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
2023
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
24+
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
2125
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
2226
import org.elasticsearch.xpack.transform.telemetry.TransformMeterRegistry;
2327
import org.junit.Before;
@@ -40,6 +44,7 @@
4044
import static org.mockito.Mockito.only;
4145
import static org.mockito.Mockito.verify;
4246
import static org.mockito.Mockito.verifyNoInteractions;
47+
import static org.mockito.Mockito.when;
4348

4449
public class TransformConfigAutoMigrationTests extends ESTestCase {
4550
private TransformConfigManager transformConfigManager;
@@ -51,8 +56,10 @@ public void setUp() throws Exception {
5156
super.setUp();
5257
transformConfigManager = mock();
5358
auditor = mock();
59+
ThreadPool threadPool = mock();
60+
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
5461

55-
autoMigration = new TransformConfigAutoMigration(transformConfigManager, auditor, TransformMeterRegistry.noOp());
62+
autoMigration = new TransformConfigAutoMigration(transformConfigManager, auditor, TransformMeterRegistry.noOp(), threadPool);
5663
}
5764

5865
protected boolean enableWarningsCheck() {
@@ -127,32 +134,59 @@ private void testMigration(TransformConfig config, CheckedConsumer<TransformConf
127134

128135
public void testMigrateAndSaveSuccess() throws InterruptedException {
129136
doAnswer(ans -> {
130-
ActionListener<Boolean> listener = ans.getArgument(1);
137+
ActionListener<Boolean> listener = ans.getArgument(2);
131138
listener.onResponse(true);
132139
return null;
133-
}).when(transformConfigManager).putTransformConfiguration(any(), any());
140+
}).when(transformConfigManager).updateTransformConfiguration(any(), any(), any());
134141
var originalConfig = randomTransformConfigWithDeprecatedSettings();
142+
doAnswer(ans -> {
143+
ActionListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> listener = ans.getArgument(1);
144+
listener.onResponse(Tuple.tuple(originalConfig, mock()));
145+
return null;
146+
}).when(transformConfigManager).getTransformConfigurationForUpdate(any(), any());
135147

136148
testMigration(originalConfig, updatedConfig -> {
137149
assertThatMaxPageSearchSizeMigrated(updatedConfig, originalConfig);
138150
verify(auditor, only()).info(eq(updatedConfig.getId()), eq(TransformMessages.MAX_PAGE_SEARCH_SIZE_MIGRATION));
139151
});
140152
}
141153

142-
public void testMigrateAndSaveWithErrors() throws InterruptedException {
154+
public void testMigrateAndSaveWithGetSequenceError() throws InterruptedException {
155+
var originalConfig = randomTransformConfigWithDeprecatedSettings();
143156
doAnswer(ans -> {
144-
ActionListener<Boolean> listener = ans.getArgument(1);
145-
listener.onFailure(new IllegalStateException("This is a failure"));
157+
ActionListener<?> listener = ans.getArgument(1);
158+
listener.onFailure(new IllegalStateException("This is a getSequence failure."));
146159
return null;
147-
}).when(transformConfigManager).putTransformConfiguration(any(), any());
160+
}).when(transformConfigManager).getTransformConfigurationForUpdate(any(), any());
161+
162+
testMigration(originalConfig, updatedConfig -> {
163+
assertThat(updatedConfig, sameInstance(originalConfig));
164+
verify(auditor, only()).warning(
165+
eq(originalConfig.getId()),
166+
eq("Failed to auto-migrate Config. Please see Elasticsearch logs. Continuing with old config.")
167+
);
168+
});
169+
}
170+
171+
public void testMigrateAndSaveWithUpdateError() throws InterruptedException {
172+
doAnswer(ans -> {
173+
ActionListener<Boolean> listener = ans.getArgument(2);
174+
listener.onFailure(new IllegalStateException("This is an update failure"));
175+
return null;
176+
}).when(transformConfigManager).updateTransformConfiguration(any(), any(), any());
148177
var originalConfig = randomTransformConfigWithDeprecatedSettings();
178+
doAnswer(ans -> {
179+
ActionListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> listener = ans.getArgument(1);
180+
listener.onResponse(Tuple.tuple(originalConfig, mock()));
181+
return null;
182+
}).when(transformConfigManager).getTransformConfigurationForUpdate(any(), any());
149183

150184
testMigration(originalConfig, updatedConfig -> {
151185
assertThat(updatedConfig, sameInstance(originalConfig));
152186
verify(auditor).info(eq(updatedConfig.getId()), eq(TransformMessages.MAX_PAGE_SEARCH_SIZE_MIGRATION));
153187
verify(auditor).warning(
154188
eq(originalConfig.getId()),
155-
eq("Failed to persist auto-migrated Config. Please see Elasticsearch logs. Continuing with old config.")
189+
eq("Failed to auto-migrate Config. Please see Elasticsearch logs. Continuing with old config.")
156190
);
157191
});
158192
}

0 commit comments

Comments
 (0)