Skip to content

Commit e3f4d48

Browse files
committed
Removing BulkProcessor
1 parent aec8fcb commit e3f4d48

File tree

18 files changed

+86
-1872
lines changed

18 files changed

+86
-1872
lines changed

qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import org.elasticsearch.action.LatchedActionListener;
1818
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
1919
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
20-
import org.elasticsearch.action.bulk.BulkProcessor;
20+
import org.elasticsearch.action.bulk.BulkProcessor2;
2121
import org.elasticsearch.action.bulk.BulkRequest;
2222
import org.elasticsearch.action.bulk.BulkResponse;
2323
import org.elasticsearch.action.index.IndexRequest;
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.Strings;
3333
import org.elasticsearch.common.bytes.BytesReference;
3434
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
3536
import org.elasticsearch.common.xcontent.XContentHelper;
3637
import org.elasticsearch.core.IOUtils;
3738
import org.elasticsearch.index.query.InnerHitBuilder;
@@ -194,9 +195,9 @@ private static void indexDocuments(String idPrefix) throws IOException, Interrup
194195
response = createIndex(INDEX_NAME, settings, mapping);
195196
assertTrue(response.isAcknowledged());
196197

197-
BulkProcessor bulkProcessor = BulkProcessor.builder(
198+
BulkProcessor2 bulkProcessor = BulkProcessor2.builder(
198199
(r, l) -> restHighLevelClient.bulkAsync(r, RequestOptions.DEFAULT, l),
199-
new BulkProcessor.Listener() {
200+
new BulkProcessor2.Listener() {
200201
@Override
201202
public void beforeBulk(long executionId, BulkRequest request) {}
202203

@@ -206,11 +207,11 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
206207
}
207208

208209
@Override
209-
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
210+
public void afterBulk(long executionId, BulkRequest request, Exception failure) {
210211
throw new AssertionError("Failed to execute bulk", failure);
211212
}
212213
},
213-
"CCSDuelIT"
214+
new DeterministicTaskQueue(random()).getThreadPool()
214215
).build();
215216

216217
int numQuestions = randomIntBetween(50, 100);

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorIT.java

Lines changed: 0 additions & 378 deletions
This file was deleted.

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java

Lines changed: 0 additions & 234 deletions
This file was deleted.

0 commit comments

Comments
 (0)