Skip to content

Commit a3e65e7

Browse files
authored
[Transform] Wait while index is blocked (#119542) (#119803)
When the destination index is blocked, the bulk request is ignored. Each subsequent trigger will check the cluster state if the index is unblocked or else drop the trigger. Users can call the ScheduleNow API to skip the index block check and let the trigger start the next checkpoint - if the index is blocked then the bulk request will fail again. While the Transform is skipping triggers, the state will display as `waiting` from the Get Stats API and on the Transform UI in Kibana.
1 parent bba1fff commit a3e65e7

File tree

10 files changed

+225
-10
lines changed

10 files changed

+225
-10
lines changed

docs/changelog/119542.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 119542
2+
summary: Wait while index is blocked
3+
area: Transform
4+
type: enhancement
5+
issues: []

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig;
2929
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
3030
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
31+
import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
3132
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
3233
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
3334
import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource;
@@ -152,7 +153,7 @@ public void testBasicTransformStats() throws Exception {
152153
public void testContinuousTransformCrud() throws Exception {
153154
var transformId = "transform-continuous-crud";
154155
var indexName = "continuous-crud-reviews";
155-
createContinuousTransform(indexName, transformId);
156+
createContinuousTransform(indexName, transformId, "reviews-by-user-business-day");
156157
var transformStats = getBasicTransformStats(transformId);
157158
assertThat(transformStats.get("state"), equalTo("started"));
158159

@@ -181,7 +182,7 @@ public void testContinuousTransformCrud() throws Exception {
181182
deleteTransform(transformId);
182183
}
183184

184-
private void createContinuousTransform(String indexName, String transformId) throws Exception {
185+
private void createContinuousTransform(String indexName, String transformId, String destinationIndex) throws Exception {
185186
createReviewsIndex(indexName, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
186187

187188
var groups = Map.of(
@@ -197,8 +198,9 @@ private void createContinuousTransform(String indexName, String transformId) thr
197198
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
198199
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
199200

200-
var config = createTransformConfigBuilder(transformId, "reviews-by-user-business-day", QueryConfig.matchAll(), indexName)
201-
.setPivotConfig(createPivotConfig(groups, aggs))
201+
var config = createTransformConfigBuilder(transformId, destinationIndex, QueryConfig.matchAll(), indexName).setPivotConfig(
202+
createPivotConfig(groups, aggs)
203+
)
202204
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
203205
.setSettings(new SettingsConfig.Builder().setAlignCheckpoints(false).build())
204206
.build();
@@ -216,7 +218,7 @@ private void createContinuousTransform(String indexName, String transformId) thr
216218
@SuppressWarnings("unchecked")
217219
public void testBasicContinuousTransformStats() throws Exception {
218220
var transformId = "transform-continuous-basic-stats";
219-
createContinuousTransform("continuous-basic-stats-reviews", transformId);
221+
createContinuousTransform("continuous-basic-stats-reviews", transformId, "reviews-by-user-business-day");
220222
var transformStats = getBasicTransformStats(transformId);
221223

222224
assertEquals("started", XContentMapValues.extractValue("state", transformStats));
@@ -230,6 +232,40 @@ public void testBasicContinuousTransformStats() throws Exception {
230232
deleteTransform(transformId);
231233
}
232234

235+
public void testDestinationIndexBlocked() throws Exception {
236+
var transformId = "transform-continuous-blocked-destination";
237+
var sourceIndexName = "source-reviews";
238+
var destIndexName = "destination-reviews";
239+
240+
// create transform & indices, wait until 1st checkpoint is finished
241+
createContinuousTransform(sourceIndexName, transformId, destIndexName);
242+
243+
// block destination index
244+
Request request = new Request("PUT", destIndexName + "/_block/write");
245+
assertAcknowledged(adminClient().performRequest(request));
246+
247+
// index more docs so the checkpoint tries to run, wait until transform stops
248+
assertBusy(() -> {
249+
indexDoc(42, sourceIndexName);
250+
assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId));
251+
}, 30, TimeUnit.SECONDS);
252+
253+
// unblock index
254+
request = new Request("PUT", destIndexName + "/_settings");
255+
request.setJsonEntity("""
256+
{ "blocks.write": false }
257+
""");
258+
assertAcknowledged(adminClient().performRequest(request));
259+
260+
assertBusy(() -> {
261+
indexDoc(42, sourceIndexName);
262+
assertEquals(TransformStats.State.STARTED.value(), getTransformState(transformId));
263+
}, 30, TimeUnit.SECONDS);
264+
265+
stopTransform(transformId);
266+
deleteTransform(transformId);
267+
}
268+
233269
public void testTransformLifecycleInALoop() throws Exception {
234270
String transformId = "lifecycle-in-a-loop";
235271
String indexName = transformId + "-src";
@@ -652,4 +688,17 @@ private void indexMoreDocs(long timestamp, long userId, String index) throws Exc
652688
bulkBuilder.append("\r\n");
653689
doBulk(bulkBuilder.toString(), true);
654690
}
691+
692+
private void indexDoc(long userId, String index) throws Exception {
693+
StringBuilder bulkBuilder = new StringBuilder();
694+
bulkBuilder.append(format("""
695+
{"create":{"_index":"%s"}}
696+
""", index));
697+
String source = format("""
698+
{"user_id":"user_%s","count":%s,"business_id":"business_%s","stars":%s,"timestamp":%s}
699+
""", userId, 1, 2, 5, Instant.now().toEpochMilli());
700+
bulkBuilder.append(source);
701+
bulkBuilder.append("\r\n");
702+
doBulk(bulkBuilder.toString(), true);
703+
}
655704
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,9 @@ static TransformStats deriveStats(TransformTask transformTask, @Nullable Transfo
270270
&& derivedState.equals(TransformStats.State.FAILED) == false) {
271271
derivedState = TransformStats.State.STOPPING;
272272
reason = Strings.isNullOrEmpty(reason) ? "transform is set to stop at the next checkpoint" : reason;
273+
} else if (derivedState.equals(TransformStats.State.STARTED) && transformTask.getContext().isWaitingForIndexToUnblock()) {
274+
derivedState = TransformStats.State.WAITING;
275+
reason = Strings.isNullOrEmpty(reason) ? "transform is paused while destination index is blocked" : reason;
273276
}
274277
return new TransformStats(
275278
transformTask.getTransformId(),

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportScheduleNowTransformAction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ protected void taskOperation(
128128
TransformTask transformTask,
129129
ActionListener<Response> listener
130130
) {
131+
if (transformTask.getContext().isWaitingForIndexToUnblock()) {
132+
logger.debug("[{}] Destination index is blocked. User requested a retry.", transformTask.getTransformId());
133+
transformTask.getContext().setIsWaitingForIndexToUnblock(false);
134+
}
131135
transformScheduler.scheduleNow(request.getId());
132136
listener.onResponse(Response.TRUE);
133137
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
2626
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
2727
import org.elasticsearch.action.search.TransportSearchAction;
28+
import org.elasticsearch.action.support.IndicesOptions;
2829
import org.elasticsearch.action.support.master.AcknowledgedRequest;
2930
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
31+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3032
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3133
import org.elasticsearch.cluster.service.ClusterService;
3234
import org.elasticsearch.common.bytes.BytesReference;
@@ -443,9 +445,36 @@ public boolean maybeTriggerAsyncJob(long now) {
443445
logger.debug("[{}] schedule was triggered but the Transform is upgrading. Ignoring trigger.", getJobId());
444446
return false;
445447
}
448+
if (context.isWaitingForIndexToUnblock()) {
449+
if (destinationIndexHasWriteBlock()) {
450+
logger.debug("[{}] schedule was triggered but the destination index has a write block. Ignoring trigger.", getJobId());
451+
return false;
452+
}
453+
logger.debug("[{}] destination index is no longer blocked.", getJobId());
454+
context.setIsWaitingForIndexToUnblock(false);
455+
}
456+
446457
return super.maybeTriggerAsyncJob(now);
447458
}
448459

460+
private boolean destinationIndexHasWriteBlock() {
461+
var clusterState = clusterService.state();
462+
if (clusterState == null) {
463+
// if we can't determine if the index is blocked, we assume it isn't, even though the bulk request may fail again
464+
return false;
465+
}
466+
467+
var destinationIndexName = transformConfig.getDestination().getIndex();
468+
var destinationIndex = indexNameExpressionResolver.concreteWriteIndex(
469+
clusterState,
470+
IndicesOptions.lenientExpandOpen(),
471+
destinationIndexName,
472+
true,
473+
false
474+
);
475+
return destinationIndex != null && clusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, destinationIndex.getName());
476+
}
477+
449478
@Override
450479
protected void onStop() {
451480
closePointInTime();

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ public interface Listener {
4949
private volatile AuthorizationState authState;
5050
private volatile int pageSize = 0;
5151

52+
/**
53+
* If the destination index is blocked (e.g. during a reindex), the Transform will fail to write to it.
54+
* {@link TransformFailureHandler} will silence the error so the Transform automatically retries.
55+
* Every time the Transform runs, it will check if the index is unblocked and reset this to false.
56+
* Users can override this via the `_schedule_now` API.
57+
*/
58+
private volatile boolean isWaitingForIndexToUnblock = false;
59+
5260
// the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_
5361
// Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished
5462
private final AtomicLong currentCheckpoint;
@@ -183,6 +191,14 @@ public void setShouldRecreateDestinationIndex(boolean shouldRecreateDestinationI
183191
this.shouldRecreateDestinationIndex = shouldRecreateDestinationIndex;
184192
}
185193

194+
public boolean isWaitingForIndexToUnblock() {
195+
return isWaitingForIndexToUnblock;
196+
}
197+
198+
public void setIsWaitingForIndexToUnblock(boolean isWaitingForIndexToUnblock) {
199+
this.isWaitingForIndexToUnblock = isWaitingForIndexToUnblock;
200+
}
201+
186202
public AuthorizationState getAuthState() {
187203
return authState;
188204
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ private void handleScriptException(ScriptException scriptException, boolean unat
170170
*/
171171
private void handleBulkIndexingException(BulkIndexingException bulkIndexingException, boolean unattended, int numFailureRetries) {
172172
if (bulkIndexingException.getCause() instanceof ClusterBlockException) {
173+
context.setIsWaitingForIndexToUnblock(true);
173174
retryWithoutIncrementingFailureCount(
174175
bulkIndexingException,
175176
bulkIndexingException.getDetailedMessage(),

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,46 @@ public void testDeriveStats() {
343343
);
344344
}
345345

346+
public void testDeriveStatsWithIndexBlock() {
347+
String transformId = "transform-with-stats";
348+
String reason = "transform is paused while destination index is blocked";
349+
TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
350+
TransformState runningState = new TransformState(
351+
TransformTaskState.STARTED,
352+
IndexerState.STARTED,
353+
null,
354+
0,
355+
null,
356+
null,
357+
null,
358+
false,
359+
null
360+
);
361+
362+
var context = new TransformContext(TransformTaskState.STARTED, "", 0, mock());
363+
context.setIsWaitingForIndexToUnblock(true);
364+
var task = mock(TransformTask.class);
365+
when(task.getContext()).thenReturn(context);
366+
when(task.getTransformId()).thenReturn(transformId);
367+
when(task.getState()).thenReturn(runningState);
368+
when(task.getStats()).thenReturn(stats);
369+
370+
assertThat(
371+
TransportGetTransformStatsAction.deriveStats(task, null),
372+
equalTo(
373+
new TransformStats(
374+
transformId,
375+
TransformStats.State.WAITING,
376+
reason,
377+
null,
378+
stats,
379+
TransformCheckpointingInfo.EMPTY,
380+
TransformHealth.GREEN
381+
)
382+
)
383+
);
384+
}
385+
346386
private void withIdStateAndStats(String transformId, TransformState state, TransformIndexerStats stats) {
347387
when(task.getTransformId()).thenReturn(transformId);
348388
when(task.getState()).thenReturn(state);

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,18 @@
2222
import org.elasticsearch.action.search.ShardSearchFailure;
2323
import org.elasticsearch.action.support.ActionTestUtils;
2424
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
25+
import org.elasticsearch.cluster.ClusterState;
26+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
27+
import org.elasticsearch.cluster.block.ClusterBlocks;
2528
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
29+
import org.elasticsearch.cluster.metadata.Metadata;
2630
import org.elasticsearch.cluster.service.ClusterService;
2731
import org.elasticsearch.common.bytes.BytesArray;
2832
import org.elasticsearch.common.bytes.CompositeBytesReference;
2933
import org.elasticsearch.common.settings.Settings;
3034
import org.elasticsearch.core.TimeValue;
3135
import org.elasticsearch.core.Tuple;
36+
import org.elasticsearch.index.Index;
3237
import org.elasticsearch.index.IndexNotFoundException;
3338
import org.elasticsearch.search.SearchContextMissingException;
3439
import org.elasticsearch.search.SearchHit;
@@ -44,6 +49,7 @@
4449
import org.elasticsearch.threadpool.ThreadPool;
4550
import org.elasticsearch.transport.ActionNotFoundTransportException;
4651
import org.elasticsearch.xpack.core.indexing.IndexerState;
52+
import org.elasticsearch.xpack.core.transform.TransformMetadata;
4753
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
4854
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
4955
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
@@ -53,6 +59,7 @@
5359
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
5460
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
5561
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
62+
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
5663
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
5764
import org.elasticsearch.xpack.transform.TransformExtension;
5865
import org.elasticsearch.xpack.transform.TransformNode;
@@ -77,6 +84,10 @@
7784
import java.util.function.Consumer;
7885
import java.util.stream.IntStream;
7986

87+
import static org.mockito.ArgumentMatchers.any;
88+
import static org.mockito.ArgumentMatchers.anyBoolean;
89+
import static org.mockito.ArgumentMatchers.anyString;
90+
import static org.mockito.ArgumentMatchers.eq;
8091
import static org.mockito.Mockito.mock;
8192
import static org.mockito.Mockito.when;
8293

@@ -467,6 +478,53 @@ public void testHandlePitIndexNotFound() throws InterruptedException {
467478
}
468479
}
469480

481+
public void testIndexBlocked() {
482+
var service = serviceWithBlockCheck(true);
483+
var context = new TransformContext(TransformTaskState.STARTED, "", 0, mock());
484+
485+
var indexer = createTestIndexer(mock(), service, resolver(), context);
486+
context.setIsWaitingForIndexToUnblock(true);
487+
488+
assertFalse(indexer.maybeTriggerAsyncJob(Instant.now().toEpochMilli()));
489+
assertTrue(context.isWaitingForIndexToUnblock());
490+
}
491+
492+
public void testIndexUnblocked() {
493+
var service = serviceWithBlockCheck(false);
494+
// set state to failed so that TransformIndexer returns false
495+
var context = new TransformContext(TransformTaskState.FAILED, "", 0, mock());
496+
497+
var indexer = createTestIndexer(mock(), service, resolver(), context);
498+
context.setIsWaitingForIndexToUnblock(true);
499+
500+
assertFalse(indexer.maybeTriggerAsyncJob(Instant.now().toEpochMilli()));
501+
// ClientTransformIndexer's maybeTriggerAsyncJob should reset isWaitingForIndexToUnblock to false
502+
assertFalse(context.isWaitingForIndexToUnblock());
503+
}
504+
505+
private ClusterService serviceWithBlockCheck(boolean checkResponse) {
506+
var clusterBlocks = mock(ClusterBlocks.class);
507+
when(clusterBlocks.indexBlocked(eq(ClusterBlockLevel.WRITE), anyString())).thenReturn(checkResponse);
508+
var metadata = mock(Metadata.class);
509+
when(metadata.custom(eq(TransformMetadata.TYPE))).thenReturn(TransformMetadata.EMPTY_METADATA);
510+
var clusterState = mock(ClusterState.class);
511+
when(clusterState.blocks()).thenReturn(clusterBlocks);
512+
when(clusterState.getMetadata()).thenReturn(metadata);
513+
var clusterService = mock(ClusterService.class);
514+
when(clusterService.state()).thenReturn(clusterState);
515+
return clusterService;
516+
}
517+
518+
private IndexNameExpressionResolver resolver() {
519+
var resolver = mock(IndexNameExpressionResolver.class);
520+
when(resolver.concreteWriteIndex(any(), any(), any(), anyBoolean(), anyBoolean())).thenAnswer(ans -> {
521+
Index destIndex = mock();
522+
when(destIndex.getName()).thenReturn(ans.getArgument(2));
523+
return destIndex;
524+
});
525+
return resolver;
526+
}
527+
470528
private static class MockClientTransformIndexer extends ClientTransformIndexer {
471529

472530
MockClientTransformIndexer(
@@ -627,13 +685,22 @@ private ClientTransformIndexer createTestIndexer() {
627685
}
628686

629687
private ClientTransformIndexer createTestIndexer(ParentTaskAssigningClient client) {
688+
return createTestIndexer(client, mock(), mock(), mock(TransformContext.class));
689+
}
690+
691+
private ClientTransformIndexer createTestIndexer(
692+
ParentTaskAssigningClient client,
693+
ClusterService service,
694+
IndexNameExpressionResolver resolver,
695+
TransformContext context
696+
) {
630697
ThreadPool threadPool = mock(ThreadPool.class);
631698
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
632699

633700
return new ClientTransformIndexer(
634701
mock(ThreadPool.class),
635-
mock(ClusterService.class),
636-
mock(IndexNameExpressionResolver.class),
702+
service,
703+
resolver,
637704
mock(TransformExtension.class),
638705
new TransformServices(
639706
mock(IndexBasedTransformConfigManager.class),
@@ -652,7 +719,7 @@ private ClientTransformIndexer createTestIndexer(ParentTaskAssigningClient clien
652719
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
653720
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
654721
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
655-
mock(TransformContext.class),
722+
context,
656723
false
657724
);
658725
}

0 commit comments

Comments
 (0)