Skip to content

Commit 1169ca8

Browse files
committed
working unit test, bugfixes
1 parent 15c2a92 commit 1169ca8

File tree

2 files changed

+228
-9
lines changed

2 files changed

+228
-9
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import java.util.Collections;
3939
import java.util.Iterator;
4040
import java.util.List;
41-
import java.util.Objects;
4241
import java.util.Optional;
4342
import java.util.concurrent.CompletionStage;
4443
import java.util.concurrent.Executors;
@@ -301,7 +300,8 @@ public void flush() {
301300
RequestExecution<Context> exec = sendRequestCondition.whenReadyIf(
302301
() -> {
303302
// May happen on manual and periodic flushes
304-
return !operations.isEmpty();
303+
return !operations.isEmpty() && operations.stream()
304+
.anyMatch(BulkOperationRepeatable::isSendable);
305305
},
306306
() -> {
307307
// Selecting operations that can be sent immediately
@@ -319,9 +319,10 @@ public void flush() {
319319
.collect(Collectors.toList());
320320

321321
// If all contexts are null, no need for the list
322-
if (contexts.stream().allMatch(Objects::isNull)) {
323-
contexts = null;
324-
}
322+
// TODO want to keep?
323+
// if (contexts.stream().allMatch(Objects::isNull)) {
324+
// contexts = new ArrayList<>();
325+
// }
325326

326327
// Build the request
327328
BulkRequest request = newRequest().operations(immediateOps).build();
@@ -379,6 +380,7 @@ public void flush() {
379380
// Partial success, retrying failed requests if policy allows it
380381
// Keeping list of retryables, to exclude them for calling listener later
381382
List<BulkOperationRepeatable<Context>> retryableReq = new ArrayList<>();
383+
List<BulkOperationRepeatable<Context>> refires = new ArrayList<>();
382384
List<BulkResponseItem> retryableResp = new ArrayList<>();
383385
for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) {
384386
int index = resp.items().indexOf(bulkItemResponse);
@@ -392,7 +394,8 @@ public void flush() {
392394
BulkOperationRepeatable<Context> refire =
393395
new BulkOperationRepeatable<>(original.getOperation(),
394396
original.getContext(), retries);
395-
retryableReq.add(refire);
397+
retryableReq.add(original);
398+
refires.add(refire);
396399
addRetry(refire);
397400
logger.warn("Added failed request back in queue, retrying in : " + refire.getCurrentRetryTimeDelay() + " ms");
398401
// TODO remove after checking
@@ -402,10 +405,10 @@ public void flush() {
402405
}
403406
}
404407
// Scheduling flushes for just sent out retryable requests
405-
if (!retryableReq.isEmpty()) {
408+
if (!refires.isEmpty()) {
406409
// if size <= 3, all times
407410
// if size > 3, schedule just first, last and median
408-
scheduleRetries(retryableReq);
411+
scheduleRetries(refires);
409412
}
410413
// Retrieving list of remaining successful or not retryable requests
411414
sentRequests.removeAll(retryableReq);
@@ -422,7 +425,7 @@ public void flush() {
422425
.map(BulkOperationRepeatable::getContext)
423426
.collect(Collectors.toList());
424427
// Filtering response
425-
List<BulkResponseItem> partialItems = resp.items();
428+
List<BulkResponseItem> partialItems = new ArrayList<>(resp.items());
426429
partialItems.removeAll(retryableResp);
427430

428431
BulkResponse partialResp = BulkResponse.of(br -> br
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
package co.elastic.clients.elasticsearch._helpers.bulk;
2+
3+
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
4+
import co.elastic.clients.elasticsearch.ElasticsearchClient;
5+
import co.elastic.clients.elasticsearch._types.ErrorCause;
6+
import co.elastic.clients.elasticsearch.core.BulkRequest;
7+
import co.elastic.clients.elasticsearch.core.BulkResponse;
8+
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
9+
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
10+
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
11+
import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
12+
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
13+
import co.elastic.clients.elasticsearch.core.bulk.OperationType;
14+
import co.elastic.clients.json.JsonpMapper;
15+
import co.elastic.clients.json.SimpleJsonpMapper;
16+
import co.elastic.clients.transport.BackoffPolicy;
17+
import co.elastic.clients.transport.ElasticsearchTransport;
18+
import co.elastic.clients.transport.Endpoint;
19+
import co.elastic.clients.transport.TransportOptions;
20+
import org.jetbrains.annotations.Nullable;
21+
import org.junit.jupiter.api.BeforeAll;
22+
import org.junit.jupiter.api.Test;
23+
24+
import java.io.IOException;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
34+
import static org.junit.jupiter.api.Assertions.assertTrue;
35+
36+
public class BulkIngesterRetryPolicyTest {
37+
38+
protected static ElasticsearchClient client;
39+
40+
private BulkOperation create = BulkOperation.of(b -> b.create(c -> c.index("foo").id("1").document("1")));
41+
private BulkOperation index = BulkOperation.of(b -> b.index(c -> c.index("fooo").id("2").document("2")));
42+
private BulkOperation delete = BulkOperation.of(b -> b.delete(c -> c.index("foooo").id("3")));
43+
44+
@BeforeAll
45+
public static void beforeAll() {
46+
TestTransport transport = new TestTransport();
47+
ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);
48+
}
49+
50+
@Test
51+
public void retryTestNoScheduledFlushNoContext() throws Exception {
52+
TestTransport transport = new TestTransport();
53+
ElasticsearchClient client = new ElasticsearchClient(transport);
54+
CountingListener listener = new CountingListener();
55+
56+
57+
BulkIngester<Void> ingester = BulkIngester.of(b -> b
58+
.client(client)
59+
.maxOperations(3)
60+
.maxConcurrentRequests(3)
61+
.listener(listener)
62+
.backoffPolicy(BackoffPolicy.constantBackoff(50L,8))
63+
);
64+
65+
// First test, partial success
66+
{
67+
ingester.add(create);
68+
ingester.add(index);
69+
ingester.add(index);
70+
71+
ingester.close();
72+
73+
// at most it should be 1 instant success + 2 retries, at minimum just 3 instant successes
74+
assertTrue(listener.requests.get() > 0 && listener.requests.get() < 4);
75+
// eventually all 3 have to succeed
76+
assertTrue(listener.successOperations.get() == 3);
77+
}
78+
transport.close();
79+
}
80+
81+
82+
private static class TestTransport implements ElasticsearchTransport {
83+
public final AtomicInteger requestsStarted = new AtomicInteger();
84+
public final AtomicInteger requestsCompleted = new AtomicInteger();
85+
public final AtomicInteger operations = new AtomicInteger();
86+
87+
public final AtomicInteger retryFailures = new AtomicInteger();
88+
89+
90+
private final ExecutorService executor = Executors.newCachedThreadPool();
91+
92+
@Override
93+
public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
94+
RequestT request,
95+
Endpoint<RequestT, ResponseT, ErrorT> endpoint,
96+
@Nullable TransportOptions options
97+
) throws IOException {
98+
throw new UnsupportedOperationException();
99+
}
100+
101+
@Override
102+
public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequestAsync(RequestT request, Endpoint<RequestT,
103+
ResponseT, ErrorT> endpoint, @Nullable TransportOptions options) {
104+
105+
BulkRequest bulk = (BulkRequest) request;
106+
requestsStarted.incrementAndGet();
107+
operations.addAndGet(bulk.operations().size());
108+
109+
if (bulk.operations().isEmpty()) {
110+
System.out.println("No operations!");
111+
}
112+
113+
// For testing purposes, different result depending on the operation type.
114+
// Create will always succeed
115+
// Index will always 429 for 3 times, then 200
116+
// Delete will always return 404
117+
118+
List<BulkResponseItem> items = new ArrayList<>();
119+
for (BulkOperation op : bulk.operations()) {
120+
OperationType operationType = OperationType.Create;
121+
ErrorCause error = null;
122+
int status = 200;
123+
String index = null;
124+
switch (op._kind()) {
125+
case Index:
126+
index = ((IndexOperation) op._get()).index();
127+
operationType = OperationType.Index;
128+
boolean isStillRetrying = retryFailures.incrementAndGet() > 2;
129+
error = isStillRetrying ? null : ErrorCause.of(e -> e.reason("some error"));
130+
status = isStillRetrying ? 200 : 429;
131+
break;
132+
case Delete:
133+
index = ((DeleteOperation) op._get()).index();
134+
operationType = OperationType.Delete;
135+
error = ErrorCause.of(e -> e.reason("some error"));
136+
status = 404;
137+
break;
138+
default:
139+
index = ((CreateOperation) op._get()).index();
140+
break;
141+
}
142+
ErrorCause finalError = error;
143+
int finalStatus = status;
144+
OperationType finalOperationType = operationType;
145+
String finalIndex = index;
146+
items.add(BulkResponseItem.of(b -> b
147+
.index(finalIndex)
148+
.operationType(finalOperationType)
149+
.status(finalStatus)
150+
.error(finalError)));
151+
}
152+
153+
CompletableFuture<BulkResponse> response = new CompletableFuture<>();
154+
executor.submit(() -> {
155+
requestsCompleted.incrementAndGet();
156+
response.complete(BulkResponse.of(r -> r.errors(false).items(items).took(3)));
157+
});
158+
159+
@SuppressWarnings("unchecked")
160+
CompletableFuture<ResponseT> result = (CompletableFuture<ResponseT>) response;
161+
return result;
162+
}
163+
164+
@Override
165+
public JsonpMapper jsonpMapper() {
166+
return SimpleJsonpMapper.INSTANCE;
167+
}
168+
169+
@Override
170+
public TransportOptions options() {
171+
return null;
172+
}
173+
174+
@Override
175+
public void close() throws IOException {
176+
executor.shutdown();
177+
try {
178+
executor.awaitTermination(1, TimeUnit.SECONDS);
179+
} catch (InterruptedException e) {
180+
throw new RuntimeException(e);
181+
}
182+
}
183+
}
184+
185+
private static class CountingListener implements BulkListener<Void> {
186+
public final AtomicInteger successOperations = new AtomicInteger();
187+
public final AtomicInteger errorOperations = new AtomicInteger();
188+
public final AtomicInteger requests = new AtomicInteger();
189+
190+
@Override
191+
public void beforeBulk(long executionId, BulkRequest request, List<Void> contexts) {
192+
193+
}
194+
195+
@Override
196+
public void afterBulk(long executionId, BulkRequest request, List<Void> contexts,
197+
BulkResponse response) {
198+
for (BulkResponseItem item : response.items()) {
199+
if(item.error() != null) {
200+
errorOperations.incrementAndGet();
201+
}
202+
else{
203+
successOperations.incrementAndGet();
204+
}
205+
}
206+
requests.incrementAndGet();
207+
}
208+
209+
@Override
210+
public void afterBulk(long executionId, BulkRequest request, List<Void> contexts, Throwable failure) {
211+
failure.printStackTrace();
212+
errorOperations.addAndGet(request.operations().size());
213+
requests.incrementAndGet();
214+
}
215+
}
216+
}

0 commit comments

Comments
 (0)