
Commit c0669ad

Merge remote-tracking branch 'origin/try1-limit-byte-sizes' into try1-limit-byte-sizes
2 parents: b222fc0 + fa13069

14 files changed: +277 -25 lines changed

docs/changelog/119542.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 119542
+summary: Wait while index is blocked
+area: Transform
+type: enhancement
+issues: []

server/src/test/java/org/elasticsearch/index/mapper/blockloader/KeywordFieldBlockLoaderTests.java

Lines changed: 24 additions & 13 deletions
@@ -13,10 +13,12 @@
 import org.elasticsearch.index.mapper.BlockLoaderTestCase;
 import org.elasticsearch.logsdb.datageneration.FieldType;

+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
+import java.util.function.Function;
+import java.util.stream.Stream;

 public class KeywordFieldBlockLoaderTests extends BlockLoaderTestCase {
     public KeywordFieldBlockLoaderTests() {
@@ -29,25 +31,30 @@ protected Object expected(Map<String, Object> fieldMapping, Object value, boolea
         if (value == null) {
             return null;
         }
+
+        var ignoreAbove = fieldMapping.get("ignore_above") == null
+            ? Integer.MAX_VALUE
+            : ((Number) fieldMapping.get("ignore_above")).intValue();
+
         if (value instanceof String s) {
-            return convert(s);
+            return convert(s, ignoreAbove);
         }

-        var nonNullStream = ((List<String>) value).stream().filter(Objects::nonNull);
+        Function<Stream<String>, Stream<BytesRef>> convertValues = s -> s.map(v -> convert(v, ignoreAbove)).filter(Objects::nonNull);

         if ((boolean) fieldMapping.getOrDefault("doc_values", false)) {
             // Sorted and no duplicates
-            return maybeFoldList(nonNullStream.collect(Collectors.toSet()).stream().sorted().map(this::convert).toList());
-        }

-        if ((boolean) fieldMapping.getOrDefault("store", false)) {
-            return maybeFoldList(nonNullStream.map(this::convert).toList());
+            var values = new HashSet<>((List<String>) value);
+            var resultList = convertValues.compose(s -> values.stream().filter(Objects::nonNull).sorted())
+                .andThen(Stream::toList)
+                .apply(values.stream());
+            return maybeFoldList(resultList);
         }

-        // Using source (either stored or synthetic).
-        // Original order is preserved and values longer than ignore_above are returned.
-        // TODO actual ignore_above support in data generation
-        return maybeFoldList(nonNullStream.map(this::convert).toList());
+        // store: "true" and source
+        var resultList = convertValues.andThen(Stream::toList).apply(((List<String>) value).stream());
+        return maybeFoldList(resultList);
     }

     private Object maybeFoldList(List<?> list) {
@@ -62,7 +69,11 @@ private Object maybeFoldList(List<?> list) {
         return list;
     }

-    private BytesRef convert(String value) {
-        return new BytesRef(value);
+    private BytesRef convert(String value, int ignoreAbove) {
+        if (value == null) {
+            return null;
+        }
+
+        return value.length() <= ignoreAbove ? new BytesRef(value) : null;
     }
 }
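
For reference, a minimal standalone sketch (not part of this commit; class and method names are illustrative) of the expected-value rule the updated test encodes: keyword values longer than ignore_above are dropped, and when the field has doc_values the surviving values are also deduplicated and sorted, while reads from stored fields or source keep the original order.

import java.util.List;
import java.util.Objects;

import org.apache.lucene.util.BytesRef;

class IgnoreAboveExpectationSketch {
    // Mirrors the test's convert(): values above the threshold map to null and are filtered out.
    static BytesRef convert(String value, int ignoreAbove) {
        if (value == null) {
            return null;
        }
        return value.length() <= ignoreAbove ? new BytesRef(value) : null;
    }

    // Expected block values when the field has doc_values: filtered, deduplicated, sorted.
    static List<BytesRef> expectedFromDocValues(List<String> values, int ignoreAbove) {
        return values.stream()
            .filter(Objects::nonNull)
            .distinct()
            .sorted()
            .map(v -> convert(v, ignoreAbove))
            .filter(Objects::nonNull)
            .toList();
    }

    // Expected block values when reading from stored fields or source: original order is kept.
    static List<BytesRef> expectedFromSource(List<String> values, int ignoreAbove) {
        return values.stream()
            .map(v -> convert(v, ignoreAbove))
            .filter(Objects::nonNull)
            .toList();
    }
}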

test/framework/src/main/java/org/elasticsearch/logsdb/datageneration/datasource/DefaultMappingParametersHandler.java

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ private Supplier<Map<String, Object>> keywordMapping(
         }

         if (ESTestCase.randomDouble() <= 0.2) {
-            injected.put("ignore_above", ESTestCase.randomIntBetween(1, 10000));
+            injected.put("ignore_above", ESTestCase.randomIntBetween(1, 100));
         }

         return injected;

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java

Lines changed: 9 additions & 0 deletions
@@ -16,6 +16,7 @@
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.IndexScopedSettings;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.common.settings.SettingsModule;
@@ -58,6 +59,7 @@
 import java.util.function.Supplier;

 import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;
+import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING;

 public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {

@@ -153,4 +155,11 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
             return List.of();
         }
     }
+
+    @Override
+    public List<Setting<?>> getSettings() {
+        List<Setting<?>> pluginSettings = new ArrayList<>();
+        pluginSettings.add(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING);
+        return pluginSettings;
+    }
 }

x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

Lines changed: 18 additions & 1 deletion
@@ -7,6 +7,8 @@

 package org.elasticsearch.xpack.migrate.task;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
@@ -19,6 +21,7 @@
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamAction;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
@@ -39,6 +42,19 @@
 import static org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate.getReindexRequiredPredicate;

 public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
+    /*
+     * This setting controls how many indices we reindex concurrently for a single data stream. This is not an overall limit -- if five
+     * data streams are being reindexed, then each of them can have this many indices being reindexed at once. This setting is dynamic,
+     * but changing it does not have an impact if the task is already running (unless the task is restarted or moves to another node).
+     */
+    public static final Setting<Integer> MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING = Setting.intSetting(
+        "migrate.max_concurrent_indices_reindexed_per_data_stream",
+        1,
+        1,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+    private static final Logger logger = LogManager.getLogger(ReindexDataStreamPersistentTaskExecutor.class);
     private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1);
     private final Client client;
     private final ClusterService clusterService;
@@ -165,8 +181,9 @@ private void reindexIndices(
         CountDownActionListener listener = new CountDownActionListener(indicesToBeReindexed.size() + 1, ActionListener.wrap(response1 -> {
             completeSuccessfulPersistentTask(reindexDataStreamTask, updatedState);
         }, exception -> { completeFailedPersistentTask(reindexDataStreamTask, updatedState, exception); }));
+        final int maxConcurrentIndices = clusterService.getClusterSettings().get(MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING);
         List<Index> indicesRemaining = Collections.synchronizedList(new ArrayList<>(indicesToBeReindexed));
-        final int maxConcurrentIndices = 1;
+        logger.debug("Reindexing {} indices, with up to {} handled concurrently", indicesRemaining.size(), maxConcurrentIndices);
         for (int i = 0; i < maxConcurrentIndices; i++) {
             maybeProcessNextIndex(indicesRemaining, reindexDataStreamTask, reindexClient, sourceDataStream, listener, parentTaskId);
         }
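
The new setting feeds the bounded fan-out in reindexIndices(): start up to maxConcurrentIndices workers and let each one chain to the next remaining index when it completes. Below is a minimal, self-contained sketch of that pattern (illustrative names only; it uses a ConcurrentLinkedQueue instead of the synchronized list and a println instead of a reindex call).

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

class BoundedConcurrencySketch {
    public static void main(String[] args) throws InterruptedException {
        var remaining = new ConcurrentLinkedQueue<>(List.of("idx-1", "idx-2", "idx-3", "idx-4", "idx-5"));
        int maxConcurrentIndices = 2; // stand-in for migrate.max_concurrent_indices_reindexed_per_data_stream
        var done = new CountDownLatch(remaining.size());

        // Start at most maxConcurrentIndices workers; no more than that many "reindex" operations
        // are ever in flight at once for this queue.
        for (int i = 0; i < maxConcurrentIndices; i++) {
            maybeProcessNext(remaining, done);
        }
        done.await();
    }

    static void maybeProcessNext(ConcurrentLinkedQueue<String> remaining, CountDownLatch done) {
        String index = remaining.poll();
        if (index == null) {
            return; // nothing left to do
        }
        CompletableFuture.runAsync(() -> System.out.println("reindexing " + index)).whenComplete((r, e) -> {
            done.countDown();
            maybeProcessNext(remaining, done); // pick up the next index once this one completes
        });
    }
}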

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java

Lines changed: 54 additions & 5 deletions
@@ -28,6 +28,7 @@
 import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource;
@@ -152,7 +153,7 @@ public void testBasicTransformStats() throws Exception {
     public void testContinuousTransformCrud() throws Exception {
         var transformId = "transform-continuous-crud";
         var indexName = "continuous-crud-reviews";
-        createContinuousTransform(indexName, transformId);
+        createContinuousTransform(indexName, transformId, "reviews-by-user-business-day");
         var transformStats = getBasicTransformStats(transformId);
         assertThat(transformStats.get("state"), equalTo("started"));

@@ -181,7 +182,7 @@ public void testContinuousTransformCrud() throws Exception {
         deleteTransform(transformId);
     }

-    private void createContinuousTransform(String indexName, String transformId) throws Exception {
+    private void createContinuousTransform(String indexName, String transformId, String destinationIndex) throws Exception {
         createReviewsIndex(indexName, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);

         var groups = Map.of(
@@ -197,8 +198,9 @@ private void createContinuousTransform(String indexName, String transformId) thr
             .addAggregator(AggregationBuilders.avg("review_score").field("stars"))
             .addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));

-        var config = createTransformConfigBuilder(transformId, "reviews-by-user-business-day", QueryConfig.matchAll(), indexName)
-            .setPivotConfig(createPivotConfig(groups, aggs))
+        var config = createTransformConfigBuilder(transformId, destinationIndex, QueryConfig.matchAll(), indexName).setPivotConfig(
+            createPivotConfig(groups, aggs)
+        )
             .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
             .setSettings(new SettingsConfig.Builder().setAlignCheckpoints(false).build())
             .build();
@@ -216,7 +218,7 @@ private void createContinuousTransform(String indexName, String transformId) thr
     @SuppressWarnings("unchecked")
     public void testBasicContinuousTransformStats() throws Exception {
         var transformId = "transform-continuous-basic-stats";
-        createContinuousTransform("continuous-basic-stats-reviews", transformId);
+        createContinuousTransform("continuous-basic-stats-reviews", transformId, "reviews-by-user-business-day");
         var transformStats = getBasicTransformStats(transformId);

         assertEquals("started", XContentMapValues.extractValue("state", transformStats));
@@ -230,6 +232,40 @@ public void testBasicContinuousTransformStats() throws Exception {
         deleteTransform(transformId);
     }

+    public void testDestinationIndexBlocked() throws Exception {
+        var transformId = "transform-continuous-blocked-destination";
+        var sourceIndexName = "source-reviews";
+        var destIndexName = "destination-reviews";
+
+        // create transform & indices, wait until 1st checkpoint is finished
+        createContinuousTransform(sourceIndexName, transformId, destIndexName);
+
+        // block destination index
+        Request request = new Request("PUT", destIndexName + "/_block/write");
+        assertAcknowledged(adminClient().performRequest(request));
+
+        // index more docs so the checkpoint tries to run, wait until transform stops
+        assertBusy(() -> {
+            indexDoc(42, sourceIndexName);
+            assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId));
+        }, 30, TimeUnit.SECONDS);
+
+        // unblock index
+        request = new Request("PUT", destIndexName + "/_settings");
+        request.setJsonEntity("""
+            { "blocks.write": false }
+            """);
+        assertAcknowledged(adminClient().performRequest(request));
+
+        assertBusy(() -> {
+            indexDoc(42, sourceIndexName);
+            assertEquals(TransformStats.State.STARTED.value(), getTransformState(transformId));
+        }, 30, TimeUnit.SECONDS);
+
+        stopTransform(transformId);
+        deleteTransform(transformId);
+    }
+
     public void testTransformLifecycleInALoop() throws Exception {
         String transformId = "lifecycle-in-a-loop";
         String indexName = transformId + "-src";
@@ -652,4 +688,17 @@ private void indexMoreDocs(long timestamp, long userId, String index) throws Exc
         bulkBuilder.append("\r\n");
         doBulk(bulkBuilder.toString(), true);
     }
+
+    private void indexDoc(long userId, String index) throws Exception {
+        StringBuilder bulkBuilder = new StringBuilder();
+        bulkBuilder.append(format("""
+            {"create":{"_index":"%s"}}
+            """, index));
+        String source = format("""
+            {"user_id":"user_%s","count":%s,"business_id":"business_%s","stars":%s,"timestamp":%s}
+            """, userId, 1, 2, 5, Instant.now().toEpochMilli());
+        bulkBuilder.append(source);
+        bulkBuilder.append("\r\n");
+        doBulk(bulkBuilder.toString(), true);
+    }
 }
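
For context, the block/unblock round trip above relies on two stock index APIs: PUT <index>/_block/write adds a write block (it sets index.blocks.write to true), and the test clears it again by flipping that setting back to false through the index settings API. A hedged sketch of the same two calls with the low-level REST client (the adminClient() and assertAcknowledged helpers used above come from the test framework and are not shown here):

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

class IndexWriteBlockSketch {
    // Adds a write block to the index.
    static Response blockWrites(RestClient client, String index) throws Exception {
        return client.performRequest(new Request("PUT", "/" + index + "/_block/write"));
    }

    // Clears the block by setting blocks.write back to false on the index.
    static Response unblockWrites(RestClient client, String index) throws Exception {
        Request request = new Request("PUT", "/" + index + "/_settings");
        request.setJsonEntity("{ \"blocks.write\": false }");
        return client.performRequest(request);
    }
}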

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java

Lines changed: 3 additions & 0 deletions
@@ -270,6 +270,9 @@ static TransformStats deriveStats(TransformTask transformTask, @Nullable Transfo
             && derivedState.equals(TransformStats.State.FAILED) == false) {
             derivedState = TransformStats.State.STOPPING;
             reason = Strings.isNullOrEmpty(reason) ? "transform is set to stop at the next checkpoint" : reason;
+        } else if (derivedState.equals(TransformStats.State.STARTED) && transformTask.getContext().isWaitingForIndexToUnblock()) {
+            derivedState = TransformStats.State.WAITING;
+            reason = Strings.isNullOrEmpty(reason) ? "transform is paused while destination index is blocked" : reason;
         }
         return new TransformStats(
             transformTask.getTransformId(),

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportScheduleNowTransformAction.java

Lines changed: 4 additions & 0 deletions
@@ -128,6 +128,10 @@ protected void taskOperation(
         TransformTask transformTask,
         ActionListener<Response> listener
     ) {
+        if (transformTask.getContext().isWaitingForIndexToUnblock()) {
+            logger.debug("[{}] Destination index is blocked. User requested a retry.", transformTask.getTransformId());
+            transformTask.getContext().setIsWaitingForIndexToUnblock(false);
+        }
         transformScheduler.scheduleNow(request.getId());
         listener.onResponse(Response.TRUE);
     }

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 29 additions & 0 deletions
@@ -25,8 +25,10 @@
 import org.elasticsearch.action.search.TransportClosePointInTimeAction;
 import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
 import org.elasticsearch.action.search.TransportSearchAction;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.client.internal.ParentTaskAssigningClient;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -443,9 +445,36 @@ public boolean maybeTriggerAsyncJob(long now) {
             logger.debug("[{}] schedule was triggered but the Transform is upgrading. Ignoring trigger.", getJobId());
             return false;
         }
+        if (context.isWaitingForIndexToUnblock()) {
+            if (destinationIndexHasWriteBlock()) {
+                logger.debug("[{}] schedule was triggered but the destination index has a write block. Ignoring trigger.", getJobId());
+                return false;
+            }
+            logger.debug("[{}] destination index is no longer blocked.", getJobId());
+            context.setIsWaitingForIndexToUnblock(false);
+        }
+
         return super.maybeTriggerAsyncJob(now);
     }

+    private boolean destinationIndexHasWriteBlock() {
+        var clusterState = clusterService.state();
+        if (clusterState == null) {
+            // if we can't determine if the index is blocked, we assume it isn't, even though the bulk request may fail again
+            return false;
+        }
+
+        var destinationIndexName = transformConfig.getDestination().getIndex();
+        var destinationIndex = indexNameExpressionResolver.concreteWriteIndex(
+            clusterState,
+            IndicesOptions.lenientExpandOpen(),
+            destinationIndexName,
+            true,
+            false
+        );
+        return destinationIndex != null && clusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, destinationIndex.getName());
+    }
+
     @Override
     protected void onStop() {
         closePointInTime();
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java

Lines changed: 16 additions & 0 deletions
@@ -49,6 +49,14 @@ public interface Listener {
     private volatile AuthorizationState authState;
     private volatile int pageSize = 0;

+    /**
+     * If the destination index is blocked (e.g. during a reindex), the Transform will fail to write to it.
+     * {@link TransformFailureHandler} will silence the error so the Transform automatically retries.
+     * Every time the Transform runs, it will check if the index is unblocked and reset this to false.
+     * Users can override this via the `_schedule_now` API.
+     */
+    private volatile boolean isWaitingForIndexToUnblock = false;
+
     // the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_
     // Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished
     private final AtomicLong currentCheckpoint;
@@ -183,6 +191,14 @@ public void setShouldRecreateDestinationIndex(boolean shouldRecreateDestinationI
         this.shouldRecreateDestinationIndex = shouldRecreateDestinationIndex;
     }

+    public boolean isWaitingForIndexToUnblock() {
+        return isWaitingForIndexToUnblock;
+    }
+
+    public void setIsWaitingForIndexToUnblock(boolean isWaitingForIndexToUnblock) {
+        this.isWaitingForIndexToUnblock = isWaitingForIndexToUnblock;
+    }
+
     public AuthorizationState getAuthState() {
         return authState;
     }
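
Taken together, these changes use the new flag as a retry gate: scheduled runs are skipped while the destination index is write-blocked, the first run after the block is lifted clears the flag, and a user-issued _schedule_now clears it immediately. The flag itself is presumably raised by TransformFailureHandler when bulk writes are rejected (that file is not among the hunks shown here). A standalone sketch of that lifecycle, with illustrative names:

class RetryGateSketch {
    private volatile boolean waitingForIndexToUnblock = false;

    // Mirrors maybeTriggerAsyncJob(): skip the run while the destination stays blocked.
    boolean maybeTrigger(boolean destinationBlocked) {
        if (waitingForIndexToUnblock) {
            if (destinationBlocked) {
                return false;                 // ignore this trigger; the scheduler will fire again later
            }
            waitingForIndexToUnblock = false; // block lifted: resume normal indexing
        }
        return true;
    }

    // Raised when a bulk request fails because of the write block (assumed to happen in the failure handler).
    void onBlockedBulkFailure() {
        waitingForIndexToUnblock = true;
    }

    // Mirrors the _schedule_now handling: a user-requested retry overrides the gate.
    void onScheduleNow() {
        waitingForIndexToUnblock = false;
    }
}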
