|
54 | 54 | import java.io.IOException;
|
55 | 55 | import java.util.Collections;
|
56 | 56 | import java.util.Map;
|
| 57 | +import java.util.concurrent.ExecutorService; |
57 | 58 |
|
58 | 59 | import static org.elasticsearch.ExceptionsHelper.unwrapCause;
|
59 | 60 | import static org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction.toSingleItemBulkRequest;
|
@@ -325,20 +326,28 @@ private void handleUpdateFailureWithRetry(
|
325 | 326 | int retryCount
|
326 | 327 | ) {
|
327 | 328 | final Throwable cause = unwrapCause(failure);
|
328 |
| - if (cause instanceof VersionConflictEngineException) { |
329 |
| - if (retryCount < request.retryOnConflict()) { |
330 |
| - logger.trace( |
331 |
| - "Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]", |
332 |
| - retryCount + 1, |
333 |
| - request.retryOnConflict(), |
334 |
| - request.index(), |
335 |
| - request.getShardId(), |
336 |
| - request.id() |
337 |
| - ); |
338 |
| - threadPool.executor(executor(request.getShardId())) |
339 |
| - .execute(ActionRunnable.wrap(listener, l -> shardOperation(request, l, retryCount + 1))); |
| 329 | + if (cause instanceof VersionConflictEngineException && retryCount < request.retryOnConflict()) { |
| 330 | + VersionConflictEngineException versionConflictEngineException = (VersionConflictEngineException) cause; |
| 331 | + logger.trace( |
| 332 | + "Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]", |
| 333 | + retryCount + 1, |
| 334 | + request.retryOnConflict(), |
| 335 | + request.index(), |
| 336 | + request.getShardId(), |
| 337 | + request.id() |
| 338 | + ); |
| 339 | + |
| 340 | + final ExecutorService executor; |
| 341 | + try { |
| 342 | + executor = threadPool.executor(executor(request.getShardId())); |
| 343 | + } catch (Exception e) { |
| 344 | + // might fail if shard no longer exists locally, in which case we cannot retry |
| 345 | + e.addSuppressed(versionConflictEngineException); |
| 346 | + listener.onFailure(e); |
340 | 347 | return;
|
341 | 348 | }
|
| 349 | + executor.execute(ActionRunnable.wrap(listener, l -> shardOperation(request, l, retryCount + 1))); |
| 350 | + return; |
342 | 351 | }
|
343 | 352 | listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
|
344 | 353 | }
|
|
0 commit comments