
Commit c97cccb

Using BulkProcessor2 in RollupShardIndexer (#94197)
In #91238 we rewrote BulkProcessor to avoid a deadlock that had been seen in the IlmHistoryStore. This PR ports TSDB downsampling over to the new BulkProcessor2 implementation.
1 parent 18bd1cd commit c97cccb
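
The core of the port is the builder wiring, shown in full in the RollupShardIndexer diff below. As a quick orientation, here is a minimal before/after sketch using only the builder calls visible in that diff; listener and client construction are elided, and the import paths are assumptions based on the 8.x codebase:

// Minimal before/after sketch of the builder change, using only calls that
// appear in the diff below. Listener and client setup are elided; import
// paths are assumed for the 8.x codebase.
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkProcessor2;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;

class BulkWiringSketch {

    // Before: bulk requests run on the calling thread, with retries driven by
    // an exponential backoff policy.
    static BulkProcessor before(Client client, BulkProcessor.Listener listener) {
        return BulkProcessor.builder(client::bulk, listener, "rollup-shard-indexer")
            .setBulkActions(10000)
            .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB))
            .setConcurrentRequests(0) // execute the bulk request on the same thread
            .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
            .build();
    }

    // After: BulkProcessor2 takes the node's thread pool for scheduling and
    // bounds memory with a bytes-in-flight budget plus a bounded retry count.
    static BulkProcessor2 after(Client client, BulkProcessor2.Listener listener) {
        return BulkProcessor2.builder(client::bulk, listener, client.threadPool())
            .setBulkActions(10000)
            .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB))
            .setMaxBytesInFlight(new ByteSizeValue(50, ByteSizeUnit.MB))
            .setMaxNumberOfRetries(3)
            .build();
    }
}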

File tree

3 files changed: +77 -15 lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor2.java

Lines changed: 2 additions & 1 deletion

@@ -23,6 +23,7 @@
 import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.io.Closeable;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -39,7 +40,7 @@
  * <p>
  * In order to create a new bulk processor, use the {@link Builder}.
  */
-public class BulkProcessor2 {
+public class BulkProcessor2 implements Closeable {
 
     /**
      * A listener for the execution.
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java

Lines changed: 26 additions & 14 deletions

@@ -13,9 +13,8 @@
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkProcessor2;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -75,6 +74,7 @@ class RollupShardIndexer {
     private static final Logger logger = LogManager.getLogger(RollupShardIndexer.class);
     public static final int ROLLUP_BULK_ACTIONS = 10000;
     public static final ByteSizeValue ROLLUP_BULK_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
+    public static final ByteSizeValue ROLLUP_MAX_BYTES_IN_FLIGHT = new ByteSizeValue(50, ByteSizeUnit.MB);
 
     private final IndexShard indexShard;
     private final Client client;
@@ -87,6 +87,8 @@ class RollupShardIndexer {
     private final List<FieldValueFetcher> fieldValueFetchers;
     private final RollupShardTask task;
     private volatile boolean abort = false;
+    ByteSizeValue rollupBulkSize = ROLLUP_BULK_SIZE;
+    ByteSizeValue rollupMaxBytesInFlight = ROLLUP_MAX_BYTES_IN_FLIGHT;
 
     RollupShardIndexer(
         RollupShardTask task,
@@ -129,7 +131,7 @@ class RollupShardIndexer {
 
     public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOException {
         long startTime = System.currentTimeMillis();
-        BulkProcessor bulkProcessor = createBulkProcessor();
+        BulkProcessor2 bulkProcessor = createBulkProcessor();
         try (searcher; bulkProcessor) {
             final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of(this::checkCancelled));
             TimeSeriesBucketCollector bucketCollector = new TimeSeriesBucketCollector(bulkProcessor);
@@ -160,6 +162,18 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOException {
             );
         }
 
+        if (task.getNumFailed() > 0) {
+            throw new ElasticsearchException(
+                "Shard ["
+                    + indexShard.shardId()
+                    + "] failed to index all rollup documents. Sent ["
+                    + task.getNumSent()
+                    + "], failed ["
+                    + task.getNumFailed()
+                    + "]."
+            );
+        }
+
         return new DownsampleIndexerAction.ShardDownsampleResponse(indexShard.shardId(), task.getNumIndexed());
     }
 
@@ -176,8 +190,8 @@ private void checkCancelled() {
         }
     }
 
-    private BulkProcessor createBulkProcessor() {
-        final BulkProcessor.Listener listener = new BulkProcessor.Listener() {
+    private BulkProcessor2 createBulkProcessor() {
+        final BulkProcessor2.Listener listener = new BulkProcessor2.Listener() {
             @Override
             public void beforeBulk(long executionId, BulkRequest request) {
                 task.addNumSent(request.numberOfActions());
@@ -206,7 +220,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
             }
 
             @Override
-            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+            public void afterBulk(long executionId, BulkRequest request, Exception failure) {
                 if (failure != null) {
                     long items = request.numberOfActions();
                     task.addNumFailed(items);
@@ -218,24 +232,23 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
             }
         };
 
-        return BulkProcessor.builder(client::bulk, listener, "rollup-shard-indexer")
+        return BulkProcessor2.builder(client::bulk, listener, client.threadPool())
            .setBulkActions(ROLLUP_BULK_ACTIONS)
            .setBulkSize(ROLLUP_BULK_SIZE)
-           // execute the bulk request on the same thread
-           .setConcurrentRequests(0)
-           .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
+           .setMaxBytesInFlight(rollupMaxBytesInFlight)
+           .setMaxNumberOfRetries(3)
            .build();
     }
 
     private class TimeSeriesBucketCollector extends BucketCollector {
-        private final BulkProcessor bulkProcessor;
+        private final BulkProcessor2 bulkProcessor;
         private final RollupBucketBuilder rollupBucketBuilder;
         private long docsProcessed;
         private long bucketsCreated;
         long lastTimestamp = Long.MAX_VALUE;
         long lastHistoTimestamp = Long.MAX_VALUE;
 
-        TimeSeriesBucketCollector(BulkProcessor bulkProcessor) {
+        TimeSeriesBucketCollector(BulkProcessor2 bulkProcessor) {
            this.bulkProcessor = bulkProcessor;
            List<AbstractDownsampleFieldProducer> rollupFieldProducers = fieldValueFetchers.stream()
                .map(FieldValueFetcher::rollupFieldProducer)
@@ -336,7 +349,7 @@ private void indexBucket(XContentBuilder doc) {
            if (logger.isTraceEnabled()) {
                logger.trace("Indexing rollup doc: [{}]", Strings.toString(doc));
            }
-           bulkProcessor.add(request.request());
+           bulkProcessor.addWithBackpressure(request.request(), () -> abort);
        }
 
        @Override
@@ -352,7 +365,6 @@ public void postCollection() throws IOException {
                XContentBuilder doc = rollupBucketBuilder.buildRollupDocument();
                indexBucket(doc);
            }
-           bulkProcessor.flush();
 
            // check cancel after the flush all data
            checkCancelled();
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java

Lines changed: 49 additions & 0 deletions

@@ -35,6 +35,7 @@
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.datastreams.DataStreamsPlugin;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
@@ -650,6 +651,54 @@ public void testRollupBulkFailed() throws IOException {
         assertThat(exception.getMessage(), equalTo("Unable to rollup index [" + sourceIndex + "]"));
     }
 
+    public void testTooManyBytesInFlight() throws IOException {
+        // create rollup config and index documents into source index
+        DownsampleConfig config = new DownsampleConfig(randomInterval());
+        SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder()
+            .startObject()
+            .field(FIELD_TIMESTAMP, randomDateForInterval(config.getInterval()))
+            .field(FIELD_DIMENSION_1, randomAlphaOfLength(1))
+            .field(FIELD_NUMERIC_1, randomDouble())
+            .endObject();
+        bulkIndex(sourceSupplier);
+        prepareSourceIndex(sourceIndex);
+
+        IndicesService indexServices = getInstanceFromNode(IndicesService.class);
+        Index srcIndex = resolveIndex(sourceIndex);
+        IndexService indexService = indexServices.indexServiceSafe(srcIndex);
+        int shardNum = randomIntBetween(0, numOfShards - 1);
+        IndexShard shard = indexService.getShard(shardNum);
+        RollupShardTask task = new RollupShardTask(
+            randomLong(),
+            "rollup",
+            "action",
+            TaskId.EMPTY_TASK_ID,
+            rollupIndex,
+            config,
+            emptyMap(),
+            shard.shardId()
+        );
+
+        // re-use source index as temp index for test
+        RollupShardIndexer indexer = new RollupShardIndexer(
+            task,
+            client(),
+            indexService,
+            shard.shardId(),
+            rollupIndex,
+            config,
+            new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 },
+            new String[] {}
+        );
+        /*
+         * Here we set the batch size and the total bytes in flight size to tiny numbers so that we are guaranteed to trigger the bulk
+         * processor to reject some calls to add(), so that we can make sure RollupShardIndexer keeps trying until success.
+         */
+        indexer.rollupMaxBytesInFlight = ByteSizeValue.ofBytes(1024);
+        indexer.rollupBulkSize = ByteSizeValue.ofBytes(512);
+        indexer.execute();
+    }
+
     private DateHistogramInterval randomInterval() {
         return ConfigTestHelpers.randomInterval();
     }
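
The sizes in testTooManyBytesInFlight do the work: with the bulk flush threshold at 512 bytes and the in-flight budget at 1024 bytes, even a handful of buffered rollup documents overruns the budget, so the rejection-and-retry path is exercised deterministically. The absence of a thrown exception from execute(), which per the RollupShardIndexer change above throws when any documents fail, is the assertion that addWithBackpressure() eventually got every document through.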
