Skip to content

Commit 0971f43

Browse files
authored
[Transform] Unblock after update (#120144)
If an update changes the destination index, optimistically assume the new destination index does not have a write block and try to run the transform. If the new destination index has a write block, the transform will drop the run and move back into a blocked state. Fix #120065
1 parent 1c13465 commit 0971f43

File tree

8 files changed

+160
-7
lines changed

8 files changed

+160
-7
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,11 @@ public boolean changesHeaders(TransformConfig config) {
239239
return isNullOrEqual(headers, config.getHeaders()) == false;
240240
}
241241

242+
public boolean changesDestIndex(TransformConfig config) {
243+
var updatedIndex = dest == null ? null : dest.getIndex();
244+
return isNullOrEqual(updatedIndex, config.getDestination().getIndex()) == false;
245+
}
246+
242247
private static boolean isNullOrEqual(Object lft, Object rgt) {
243248
return lft == null || lft.equals(rgt);
244249
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigTests.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,19 @@ public static TransformConfig randomTransformConfigWithoutHeaders(String id) {
7878
}
7979

8080
public static TransformConfig randomTransformConfigWithoutHeaders(String id, PivotConfig pivotConfig, LatestConfig latestConfig) {
81+
return randomTransformConfigWithoutHeaders(id, pivotConfig, latestConfig, randomDestConfig());
82+
}
83+
84+
public static TransformConfig randomTransformConfigWithoutHeaders(
85+
String id,
86+
PivotConfig pivotConfig,
87+
LatestConfig latestConfig,
88+
DestConfig destConfig
89+
) {
8190
return new TransformConfig(
8291
id,
8392
randomSourceConfig(),
84-
randomDestConfig(),
93+
destConfig,
8594
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
8695
randomBoolean() ? null : randomSyncConfig(),
8796
null,

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,20 @@ public void testChangesHeaders() {
126126
assertTrue("true update changes headers", update.changesHeaders(config));
127127
}
128128

129+
public void testChangesDestIndex() {
130+
TransformConfig config = randomTransformConfig();
131+
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null, null);
132+
assertFalse("null update does not change destination index", update.changesDestIndex(config));
133+
134+
var newDestWithSameIndex = new DestConfig(config.getDestination().getIndex(), null, null);
135+
update = new TransformConfigUpdate(null, newDestWithSameIndex, null, null, null, null, null, null);
136+
assertFalse("equal update does not change destination index", update.changesDestIndex(config));
137+
138+
var newDestWithNewIndex = new DestConfig(config.getDestination().getIndex() + "-new", null, null);
139+
update = new TransformConfigUpdate(null, newDestWithNewIndex, null, null, null, null, null, null);
140+
assertTrue("true update changes destination index", update.changesDestIndex(config));
141+
}
142+
129143
public void testApply() {
130144
TransformConfig config = new TransformConfig(
131145
"time-transform",

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.time.Instant;
4040
import java.util.HashMap;
4141
import java.util.Map;
42+
import java.util.Set;
4243
import java.util.concurrent.TimeUnit;
4344
import java.util.concurrent.atomic.AtomicInteger;
4445

@@ -50,6 +51,7 @@
5051
import static org.hamcrest.Matchers.greaterThan;
5152
import static org.hamcrest.Matchers.hasKey;
5253
import static org.hamcrest.Matchers.hasSize;
54+
import static org.hamcrest.Matchers.in;
5355
import static org.hamcrest.Matchers.is;
5456
import static org.hamcrest.Matchers.lessThan;
5557
import static org.hamcrest.Matchers.not;
@@ -245,10 +247,8 @@ public void testDestinationIndexBlocked() throws Exception {
245247
assertAcknowledged(adminClient().performRequest(request));
246248

247249
// index more docs so the checkpoint tries to run, wait until transform stops
248-
assertBusy(() -> {
249-
indexDoc(42, sourceIndexName);
250-
assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId));
251-
}, 30, TimeUnit.SECONDS);
250+
indexDoc(42, sourceIndexName);
251+
assertBusy(() -> { assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId)); }, 30, TimeUnit.SECONDS);
252252

253253
// unblock index
254254
request = new Request("PUT", destIndexName + "/_settings");
@@ -266,6 +266,46 @@ public void testDestinationIndexBlocked() throws Exception {
266266
deleteTransform(transformId);
267267
}
268268

269+
public void testUnblockWithNewDestinationIndex() throws Exception {
270+
var transformId = "transform-continuous-unblock-destination";
271+
var sourceIndexName = "source-reviews";
272+
var destIndexName = "destination-reviews-old";
273+
var newDestIndexName = "destination-reviews-new";
274+
275+
// create transform & indices, wait until 1st checkpoint is finished
276+
createReviewsIndex(newDestIndexName, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
277+
createContinuousTransform(sourceIndexName, transformId, destIndexName);
278+
279+
// block destination index
280+
Request request = new Request("PUT", destIndexName + "/_block/write");
281+
assertAcknowledged(adminClient().performRequest(request));
282+
283+
// index more docs so the checkpoint tries to run, wait until transform stops
284+
indexDoc(42, sourceIndexName);
285+
assertBusy(() -> { assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId)); }, 30, TimeUnit.SECONDS);
286+
287+
// change destination index
288+
var update = format("""
289+
{
290+
"description": "updated config",
291+
"dest": {
292+
"index": "%s"
293+
}
294+
}
295+
""", newDestIndexName);
296+
updateConfig(transformId, update, true, RequestOptions.DEFAULT);
297+
298+
assertBusy(() -> {
299+
assertThat(
300+
getTransformState(transformId),
301+
in(Set.of(TransformStats.State.STARTED.value(), TransformStats.State.INDEXING.value()))
302+
);
303+
}, 30, TimeUnit.SECONDS);
304+
305+
stopTransform(transformId);
306+
deleteTransform(transformId);
307+
}
308+
269309
public void testTransformLifecycleInALoop() throws Exception {
270310
String transformId = "lifecycle-in-a-loop";
271311
String indexName = transformId + "-src";

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ protected void createReviewsIndex(
437437
Request req = new Request("PUT", indexName);
438438
req.setEntity(indexMappings);
439439
req.setOptions(RequestOptions.DEFAULT);
440-
assertAcknowledged(adminClient().performRequest(req));
440+
assertOKAndConsume(adminClient().performRequest(req));
441441
}
442442

443443
// create index

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,8 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
171171

172172
boolean updateChangesSettings = update.changesSettings(originalConfig);
173173
boolean updateChangesHeaders = update.changesHeaders(originalConfig);
174-
if (updateChangesSettings || updateChangesHeaders) {
174+
boolean updateChangesDestIndex = update.changesDestIndex(originalConfig);
175+
if (updateChangesSettings || updateChangesHeaders || updateChangesDestIndex) {
175176
PersistentTasksCustomMetadata.PersistentTask<?> transformTask = TransformTask.getTransformTask(
176177
request.getId(),
177178
clusterState
@@ -256,6 +257,7 @@ protected void taskOperation(
256257
) {
257258
transformTask.applyNewSettings(request.getConfig().getSettings());
258259
transformTask.applyNewAuthState(request.getAuthState());
260+
transformTask.checkAndResetDestinationIndexBlock(request.getConfig());
259261
listener.onResponse(new Response(request.getConfig()));
260262
}
261263

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
3636
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
3737
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
38+
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
3839
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
3940
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
4041
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
@@ -415,6 +416,16 @@ public void applyNewAuthState(AuthorizationState authState) {
415416
}
416417
}
417418

419+
public void checkAndResetDestinationIndexBlock(TransformConfig config) {
420+
if (context.isWaitingForIndexToUnblock()) {
421+
var currentIndex = getIndexer() == null ? null : getIndexer().getConfig().getDestination().getIndex();
422+
var updatedIndex = config.getDestination().getIndex();
423+
if (updatedIndex.equals(currentIndex) == false) {
424+
context.setIsWaitingForIndexToUnblock(false);
425+
}
426+
}
427+
}
428+
418429
@Override
419430
protected void init(
420431
PersistentTasksService persistentTasksService,

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.xpack.core.indexing.IndexerState;
3737
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
3838
import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState;
39+
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
3940
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
4041
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
4142
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
@@ -45,6 +46,8 @@
4546
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
4647
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
4748
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
49+
import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfigTests;
50+
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
4851
import org.elasticsearch.xpack.transform.DefaultTransformExtension;
4952
import org.elasticsearch.xpack.transform.TransformNode;
5053
import org.elasticsearch.xpack.transform.TransformServices;
@@ -553,6 +556,75 @@ public void testDeriveBasicCheckpointingInfoWithNoIndexer() {
553556
assertThat(checkpointingInfo, sameInstance(TransformCheckpointingInfo.EMPTY));
554557
}
555558

559+
public void testCheckAndResetDestinationIndexBlock() {
560+
var currentConfig = randomConfigForDestIndex("oldDestination");
561+
var indexer = mock(ClientTransformIndexer.class);
562+
when(indexer.getConfig()).thenReturn(currentConfig);
563+
564+
var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor());
565+
transformTask.initializeIndexer(indexer);
566+
567+
transformTask.getContext().setIsWaitingForIndexToUnblock(true);
568+
var updatedConfig = randomConfigForDestIndex("newDestination");
569+
570+
transformTask.checkAndResetDestinationIndexBlock(updatedConfig);
571+
572+
assertFalse(transformTask.getContext().isWaitingForIndexToUnblock());
573+
}
574+
575+
public void testCheckAndResetDestinationIndexBlock_NoChangeToDest() {
576+
var currentConfig = randomConfigForDestIndex("oldDestination");
577+
var indexer = mock(ClientTransformIndexer.class);
578+
when(indexer.getConfig()).thenReturn(currentConfig);
579+
580+
var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor());
581+
transformTask.initializeIndexer(indexer);
582+
583+
transformTask.getContext().setIsWaitingForIndexToUnblock(true);
584+
var updatedConfig = randomConfigForDestIndex("oldDestination");
585+
586+
transformTask.checkAndResetDestinationIndexBlock(updatedConfig);
587+
588+
assertTrue(transformTask.getContext().isWaitingForIndexToUnblock());
589+
}
590+
591+
public void testCheckAndResetDestinationIndexBlock_NotBlocked() {
592+
var currentConfig = randomConfigForDestIndex("oldDestination");
593+
var indexer = mock(ClientTransformIndexer.class);
594+
when(indexer.getConfig()).thenReturn(currentConfig);
595+
596+
var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor());
597+
transformTask.initializeIndexer(indexer);
598+
599+
var updatedConfig = randomConfigForDestIndex("newDestination");
600+
601+
transformTask.checkAndResetDestinationIndexBlock(updatedConfig);
602+
603+
assertFalse(transformTask.getContext().isWaitingForIndexToUnblock());
604+
}
605+
606+
public void testCheckAndResetDestinationIndexBlock_NullIndexer() {
607+
var currentConfig = randomConfigForDestIndex("oldDestination");
608+
var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor());
609+
transformTask.getContext().setIsWaitingForIndexToUnblock(true);
610+
611+
var updatedConfig = randomConfigForDestIndex("oldDestination");
612+
613+
transformTask.checkAndResetDestinationIndexBlock(updatedConfig);
614+
615+
assertFalse(transformTask.getContext().isWaitingForIndexToUnblock());
616+
}
617+
618+
private TransformConfig randomConfigForDestIndex(String indexName) {
619+
var pivotOrLatest = randomBoolean();
620+
return TransformConfigTests.randomTransformConfigWithoutHeaders(
621+
randomAlphaOfLengthBetween(1, 10),
622+
pivotOrLatest ? null : PivotConfigTests.randomPivotConfig(),
623+
pivotOrLatest ? LatestConfigTests.randomLatestConfig() : null,
624+
new DestConfig(indexName, null, null)
625+
);
626+
}
627+
556628
private TransformTask createTransformTask(TransformConfig transformConfig, MockTransformAuditor auditor) {
557629
var threadPool = mock(ThreadPool.class);
558630

0 commit comments

Comments
 (0)