Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1120,25 +1120,31 @@ private void bulkIndex(SourceSupplier sourceSupplier) throws IOException {
}

private void bulkIndex(final String indexName, final SourceSupplier sourceSupplier, int docCount) throws IOException {
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < docCount; i++) {
IndexRequest indexRequest = new IndexRequest(indexName).opType(DocWriteRequest.OpType.CREATE);
XContentBuilder source = sourceSupplier.get();
indexRequest.source(source);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
// Index in such a way that we always have multiple segments, so that we test DownsampleShardIndexer in a more realistic scenario:
// (also makes failures more reproducible)
int duplicates = 0;
for (BulkItemResponse response : bulkResponse.getItems()) {
if (response.isFailed()) {
if (response.getFailure().getCause() instanceof VersionConflictEngineException) {
// A duplicate event was created by random generator. We should not fail for this
// reason.
logger.debug("We tried to insert a duplicate: [{}]", response.getFailureMessage());
duplicates++;
} else {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
for (int i = 0; i < docCount;) {
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
int max = Math.min(i + 100, docCount);
for (int j = i; j < max; j++) {
IndexRequest indexRequest = new IndexRequest(indexName).opType(DocWriteRequest.OpType.CREATE);
XContentBuilder source = sourceSupplier.get();
indexRequest.source(source);
bulkRequestBuilder.add(indexRequest);
}
i = max;
BulkResponse bulkResponse = bulkRequestBuilder.get();
for (BulkItemResponse response : bulkResponse.getItems()) {
if (response.isFailed()) {
if (response.getFailure().getCause() instanceof VersionConflictEngineException) {
// A duplicate event was created by random generator. We should not fail for this
// reason.
logger.debug("We tried to insert a duplicate: [{}]", response.getFailureMessage());
duplicates++;
} else {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
}
}
}
Expand Down