1616import co .elastic .clients .transport .BackoffPolicy ;
1717import co .elastic .clients .transport .ElasticsearchTransport ;
1818import co .elastic .clients .transport .Endpoint ;
19+ import co .elastic .clients .transport .TransportException ;
1920import co .elastic .clients .transport .TransportOptions ;
21+ import co .elastic .clients .transport .http .RepeatableBodyResponse ;
22+ import co .elastic .clients .transport .http .TransportHttpClient ;
2023import org .jetbrains .annotations .Nullable ;
2124import org .junit .jupiter .api .BeforeAll ;
2225import org .junit .jupiter .api .Test ;
3639public class BulkIngesterRetryPolicyTest {
3740
3841 protected static ElasticsearchClient client ;
42+ protected static TestTransport transport ;
43+ protected static CountingListener listener ;
3944
4045 private BulkOperation create = BulkOperation .of (b -> b .create (c -> c .index ("foo" ).id ("1" ).document ("1" )));
4146 private BulkOperation index = BulkOperation .of (b -> b .index (c -> c .index ("fooo" ).id ("2" ).document ("2" )));
47+ private BulkOperation indexFail = BulkOperation .of (b -> b .index (c -> c .index ("fail" ).id ("2" ).document ("2" )));
4248 private BulkOperation delete = BulkOperation .of (b -> b .delete (c -> c .index ("foooo" ).id ("3" )));
49+ private BulkOperation deleteFail = BulkOperation .of (b -> b .delete (c -> c .index ("fail" ).id ("3" )));
4350
4451 @ BeforeAll
45- public static void beforeAll () {
46- TestTransport transport = new TestTransport ();
47- ElasticsearchAsyncClient client = new ElasticsearchAsyncClient (transport );
52+ public static void setup (){
53+ transport = new TestTransport ();
54+ client = new ElasticsearchClient (transport );
55+ listener = new CountingListener ();
4856 }
4957
5058 @ Test
5159 public void retryTestNoScheduledFlushNoContext () throws Exception {
52- TestTransport transport = new TestTransport ();
53- ElasticsearchClient client = new ElasticsearchClient (transport );
54- CountingListener listener = new CountingListener ();
55-
56-
57- BulkIngester <Void > ingester = BulkIngester .of (b -> b
58- .client (client )
59- .maxOperations (3 )
60- .maxConcurrentRequests (3 )
61- .listener (listener )
62- .backoffPolicy (BackoffPolicy .constantBackoff (50L ,8 ))
63- );
6460
65- // First test, partial success
61+ // First test, partial success, other will succeed after retrying
6662 {
63+ BulkIngester <Void > ingester = newBasicBulkIngester (listener );
64+
6765 ingester .add (create );
6866 ingester .add (index );
6967 ingester .add (index );
7068
7169 ingester .close ();
7270
73- // at most it should be 1 instant success + 2 retries, at minimum just 3 instant successes
74- assertTrue (listener .requests .get () > 0 && listener .requests .get () < 4 );
71+ // 1 instant success, 2 retried, but succeeded. can be either 2 or 3 depending if the retries
72+ // get scheduled at the same exact time
73+ assertTrue (listener .requests .get () == 2 || listener .requests .get () == 3 );
7574 // eventually all 3 have to succeed
7675 assertTrue (listener .successOperations .get () == 3 );
7776 }
77+
78+ // Second test, all requests will succeed after retrying
79+ {
80+ transport .reset ();
81+ listener .reset ();
82+
83+ BulkIngester <Void > ingester = newBasicBulkIngester (listener );
84+
85+ ingester .add (index );
86+ ingester .add (index );
87+ ingester .add (index );
88+ ingester .add (index );
89+
90+ ingester .close ();
91+
92+ // between 1 and 4, depending on scheduler
93+ assertTrue (listener .requests .get () >= 1 && listener .requests .get () <= 4 );
94+ // eventually all 4 have to succeed
95+ assertTrue (listener .successOperations .get () == 4 );
96+ }
97+
98+ // Third test, only one retryable (will succeed), other permanent failures
99+ {
100+ transport .reset ();
101+ listener .reset ();
102+
103+ BulkIngester <Void > ingester = newBasicBulkIngester (listener );
104+
105+ ingester .add (index );
106+ ingester .add (delete );
107+ ingester .add (delete );
108+
109+ ingester .close ();
110+
111+ // 2 failed will be handled together, then 1 retry
112+ assertTrue (listener .requests .get () == 2 );
113+
114+ assertTrue (listener .successOperations .get () == 1 );
115+ assertTrue (listener .errorOperations .get () == 2 );
116+ }
117+
118+ // Fourth test, all requests will be retried until policy allows, then fail
119+ {
120+ transport .reset ();
121+ listener .reset ();
122+
123+ BulkIngester <Void > ingester = newBasicBulkIngester (listener );
124+
125+ ingester .add (indexFail );
126+ ingester .add (indexFail );
127+ ingester .add (indexFail );
128+
129+ ingester .close ();
130+
131+ // from 1 to 3 depending on scheduling
132+ assertTrue (listener .requests .get () >= 1 && listener .requests .get () <= 3 );
133+
134+ assertTrue (listener .successOperations .get () == 0 );
135+ assertTrue (listener .errorOperations .get () == 3 );
136+ }
137+
138+ // Fifth test, one exception that will make everything else fail, no retries triggered
139+ {
140+ transport .reset ();
141+ listener .reset ();
142+
143+ BulkIngester <Void > ingester = newBasicBulkIngester (listener );
144+
145+ ingester .add (index );
146+ ingester .add (create );
147+ ingester .add (deleteFail );
148+
149+ ingester .close ();
150+
151+ // just the one
152+ assertTrue (listener .requests .get () == 1 );
153+
154+ assertTrue (listener .successOperations .get () == 0 );
155+ assertTrue (listener .errorOperations .get () == 3 );
156+ }
157+
158+ // Sixth test, a mix of everything
159+ {
160+ transport .reset ();
161+ listener .reset ();
162+
163+ BulkIngester <Void > ingester = newBasicBulkIngester (listener );
164+
165+ ingester .add (create );
166+ ingester .add (index );
167+ ingester .add (indexFail );
168+ ingester .add (delete );
169+ ingester .add (create );
170+ ingester .add (index );
171+ ingester .add (indexFail );
172+ ingester .add (delete );
173+
174+ ingester .close ();
175+
176+ // from 2 to 4 depending on scheduling
177+ assertTrue (listener .requests .get () >= 1 && listener .successOperations .get () <= 4 );
178+
179+ assertTrue (listener .successOperations .get () == 4 );
180+ assertTrue (listener .errorOperations .get () == 4 );
181+ }
182+
183+ transport .close ();
184+ }
185+
186+ @ Test
187+ public void retryTestFlushAndContextExponentialBackoff () throws Exception {
188+
189+ TestTransport transport = new TestTransport ();
190+ ElasticsearchClient client = new ElasticsearchClient (transport );
191+ CountingListener listener = new CountingListener ();
192+
193+ // TODO
194+
78195 transport .close ();
79196 }
80197
@@ -112,53 +229,67 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest
112229
113230 // For testing purposes, different result depending on the operation type.
114231 // Create will always succeed
115- // Index will always 429 for 3 times, then 200
116- // Delete will always return 404
117-
118- List <BulkResponseItem > items = new ArrayList <>();
119- for (BulkOperation op : bulk .operations ()) {
120- OperationType operationType = OperationType .Create ;
121- ErrorCause error = null ;
122- int status = 200 ;
123- String index = null ;
124- switch (op ._kind ()) {
125- case Index :
126- index = ((IndexOperation ) op ._get ()).index ();
127- operationType = OperationType .Index ;
128- boolean isStillRetrying = retryFailures .incrementAndGet () > 2 ;
129- error = isStillRetrying ? null : ErrorCause .of (e -> e .reason ("some error" ));
130- status = isStillRetrying ? 200 : 429 ;
131- break ;
132- case Delete :
133- index = ((DeleteOperation ) op ._get ()).index ();
134- operationType = OperationType .Delete ;
135- error = ErrorCause .of (e -> e .reason ("some error" ));
136- status = 404 ;
137- break ;
138- default :
139- index = ((CreateOperation ) op ._get ()).index ();
140- break ;
232+ // Index returns 429 for the first 2 index operations counted since the last reset (shared counter), then 200. Index with index name "fail" will only 429.
233+ // Delete will always return 404. Delete with index name "fail" will throw transport exception.
234+
235+ try {
236+
237+ List <BulkResponseItem > items = new ArrayList <>();
238+ for (BulkOperation op : bulk .operations ()) {
239+ OperationType operationType = OperationType .Create ;
240+ ErrorCause error = null ;
241+ int status = 200 ;
242+ String index = null ;
243+ switch (op ._kind ()) {
244+ case Index :
245+ index = ((IndexOperation ) op ._get ()).index ();
246+ operationType = OperationType .Index ;
247+ boolean isStillRetrying = retryFailures .incrementAndGet () > 2 ;
248+ error = isStillRetrying && !index .equals ("fail" ) ? null : ErrorCause .of (e -> e .reason ("some error" ));
249+ status = isStillRetrying && !index .equals ("fail" ) ? 200 : 429 ;
250+ break ;
251+ case Delete :
252+ index = ((DeleteOperation ) op ._get ()).index ();
253+ if (index .equals ("fail" )) {
254+ throw new RuntimeException ("error" );
255+ }
256+ operationType = OperationType .Delete ;
257+ error = ErrorCause .of (e -> e .reason ("some error" ));
258+ status = 404 ;
259+ break ;
260+ default :
261+ index = ((CreateOperation ) op ._get ()).index ();
262+ break ;
263+ }
264+ ErrorCause finalError = error ;
265+ int finalStatus = status ;
266+ OperationType finalOperationType = operationType ;
267+ String finalIndex = index ;
268+ items .add (BulkResponseItem .of (b -> b
269+ .index (finalIndex )
270+ .operationType (finalOperationType )
271+ .status (finalStatus )
272+ .error (finalError )));
141273 }
142- ErrorCause finalError = error ;
143- int finalStatus = status ;
144- OperationType finalOperationType = operationType ;
145- String finalIndex = index ;
146- items .add (BulkResponseItem .of (b -> b
147- .index (finalIndex )
148- .operationType (finalOperationType )
149- .status (finalStatus )
150- .error (finalError )));
151- }
152274
153- CompletableFuture <BulkResponse > response = new CompletableFuture <>();
154- executor .submit (() -> {
155- requestsCompleted .incrementAndGet ();
156- response .complete (BulkResponse .of (r -> r .errors (false ).items (items ).took (3 )));
157- });
275+ CompletableFuture <BulkResponse > response = new CompletableFuture <>();
276+ executor .submit (() -> {
277+ requestsCompleted .incrementAndGet ();
278+ response .complete (BulkResponse .of (r -> r .errors (false ).items (items ).took (3 )));
279+ });
158280
159- @ SuppressWarnings ("unchecked" )
160- CompletableFuture <ResponseT > result = (CompletableFuture <ResponseT >) response ;
161- return result ;
281+ @ SuppressWarnings ("unchecked" )
282+ CompletableFuture <ResponseT > result = (CompletableFuture <ResponseT >) response ;
283+ return result ;
284+ }
285+ catch (RuntimeException e ){
286+ CompletableFuture <ResponseT > future = new CompletableFuture <>();
287+ future .completeExceptionally (e );
288+ executor .submit (() -> {
289+ future .completeExceptionally (e );
290+ });
291+ return future ;
292+ }
162293 }
163294
164295 @ Override
@@ -180,6 +311,13 @@ public void close() throws IOException {
180311 throw new RuntimeException (e );
181312 }
182313 }
314+
315+ public void reset () {
316+ requestsStarted .set (0 );
317+ requestsCompleted .set (0 );
318+ operations .set (0 );
319+ retryFailures .set (0 );
320+ }
183321 }
184322
185323 private static class CountingListener implements BulkListener <Void > {
@@ -212,5 +350,21 @@ public void afterBulk(long executionId, BulkRequest request, List<Void> contexts
212350 errorOperations .addAndGet (request .operations ().size ());
213351 requests .incrementAndGet ();
214352 }
353+
354+ public void reset () {
355+ successOperations .set (0 );
356+ errorOperations .set (0 );
357+ requests .set (0 );
358+ }
359+ }
360+
361+ private BulkIngester <Void > newBasicBulkIngester (BulkListener <Void > listener ){
362+ return BulkIngester .of (b -> b
363+ .client (client )
364+ .maxOperations (10 )
365+ .maxConcurrentRequests (10 )
366+ .listener (listener )
367+ .backoffPolicy (BackoffPolicy .constantBackoff (50L ,8 ))
368+ );
215369 }
216370}
0 commit comments