|
27 | 27 | import java.time.Duration;
|
28 | 28 | import java.util.ArrayList;
|
29 | 29 | import java.util.Collection;
|
| 30 | +import java.util.Deque; |
30 | 31 | import java.util.HashSet;
|
| 32 | +import java.util.Iterator; |
31 | 33 | import java.util.LinkedList;
|
32 | 34 | import java.util.List;
|
33 | 35 | import java.util.Objects;
|
34 | 36 | import java.util.Set;
|
| 37 | +import java.util.concurrent.ConcurrentLinkedDeque; |
35 | 38 | import java.util.concurrent.Semaphore;
|
36 | 39 | import java.util.concurrent.ThreadLocalRandom;
|
37 | 40 | import java.util.concurrent.atomic.AtomicInteger;
|
@@ -69,14 +72,14 @@ public final class SearchIndexingPublisher<T> {
|
69 | 72 | private final Function<Integer, Integer> scaleDownFunction = size -> size / 2;
|
70 | 73 |
|
71 | 74 | private final Object actionsMutex = new Object();
|
72 |
| - private final LinkedList<TryTrackingIndexAction<T>> actions = new LinkedList<>(); |
| 75 | + private final Deque<TryTrackingIndexAction<T>> actions = new ConcurrentLinkedDeque<>(); |
73 | 76 |
|
74 | 77 | /*
|
75 | 78 | * This queue keeps track of documents that are currently being sent to the service for indexing. This queue is
|
76 | 79 | * resilient against cases where the request timeouts or is cancelled by an external operation, preventing the
|
77 | 80 | * documents from being lost.
|
78 | 81 | */
|
79 |
| - private final LinkedList<TryTrackingIndexAction<T>> inFlightActions = new LinkedList<>(); |
| 82 | + private final Deque<TryTrackingIndexAction<T>> inFlightActions = new ConcurrentLinkedDeque<>(); |
80 | 83 |
|
81 | 84 | private final Semaphore processingSemaphore = new Semaphore(1);
|
82 | 85 |
|
@@ -157,11 +160,9 @@ public synchronized Mono<Void> addActions(Collection<IndexAction<T>> actions, Co
|
157 | 160 | public Mono<Void> flush(boolean awaitLock, boolean isClose, Context context) {
|
158 | 161 | if (awaitLock) {
|
159 | 162 | processingSemaphore.acquireUninterruptibly();
|
160 |
| - return flushLoop(isClose, context) |
161 |
| - .doFinally(ignored -> processingSemaphore.release()); |
| 163 | + return Mono.using(() -> processingSemaphore, ignored -> flushLoop(isClose, context), Semaphore::release); |
162 | 164 | } else if (processingSemaphore.tryAcquire()) {
|
163 |
| - return flushLoop(isClose, context) |
164 |
| - .doFinally(ignored -> processingSemaphore.release()); |
| 165 | + return Mono.using(() -> processingSemaphore, ignored -> flushLoop(isClose, context), Semaphore::release); |
165 | 166 | } else {
|
166 | 167 | LOGGER.verbose("Batch already in-flight and not waiting for completion. Performing no-op.");
|
167 | 168 | return Mono.empty();
|
@@ -224,21 +225,21 @@ private List<TryTrackingIndexAction<T>> createBatch() {
|
224 | 225 | return batchActions;
|
225 | 226 | }
|
226 | 227 |
|
227 |
| - private int fillFromQueue(List<TryTrackingIndexAction<T>> batch, List<TryTrackingIndexAction<T>> queue, |
| 228 | + private static <T> int fillFromQueue(List<TryTrackingIndexAction<T>> batch, Deque<TryTrackingIndexAction<T>> queue, |
228 | 229 | int requested, Set<String> duplicateKeyTracker) {
|
229 |
| - int offset = 0; |
230 | 230 | int actionsAdded = 0;
|
231 |
| - int queueSize = queue.size(); |
232 | 231 |
|
233 |
| - while (actionsAdded < requested && offset < queueSize) { |
234 |
| - TryTrackingIndexAction<T> potentialDocumentToAdd = queue.get(offset++ - actionsAdded); |
| 232 | + Iterator<TryTrackingIndexAction<T>> iterator = queue.iterator(); |
| 233 | + while (actionsAdded < requested && iterator.hasNext()) { |
| 234 | + TryTrackingIndexAction<T> potentialDocumentToAdd = iterator.next(); |
235 | 235 |
|
236 | 236 | if (duplicateKeyTracker.contains(potentialDocumentToAdd.getKey())) {
|
237 | 237 | continue;
|
238 | 238 | }
|
239 | 239 |
|
240 | 240 | duplicateKeyTracker.add(potentialDocumentToAdd.getKey());
|
241 |
| - batch.add(queue.remove(offset - 1 - actionsAdded)); |
| 241 | + batch.add(potentialDocumentToAdd); |
| 242 | + iterator.remove(); |
242 | 243 | actionsAdded += 1;
|
243 | 244 | }
|
244 | 245 |
|
@@ -330,7 +331,7 @@ private void handleResponse(List<TryTrackingIndexAction<T>> actions, IndexBatchR
|
330 | 331 | return;
|
331 | 332 | }
|
332 | 333 |
|
333 |
| - List<TryTrackingIndexAction<T>> actionsToRetry = new ArrayList<>(); |
| 334 | + Deque<TryTrackingIndexAction<T>> actionsToRetry = new LinkedList<>(); |
334 | 335 | boolean has503 = batchResponse.getStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE;
|
335 | 336 | if (batchResponse.getResults() == null) {
|
336 | 337 | /*
|
@@ -391,6 +392,13 @@ private void handleResponse(List<TryTrackingIndexAction<T>> actions, IndexBatchR
|
391 | 392 | }
|
392 | 393 | }
|
393 | 394 |
|
| 395 | + private void reinsertFailedActions(Deque<TryTrackingIndexAction<T>> actionsToRetry) { |
| 396 | + synchronized (actionsMutex) { |
| 397 | + // Push all actions that need to be retried back into the queue. |
| 398 | + actionsToRetry.descendingIterator().forEachRemaining(actions::add); |
| 399 | + } |
| 400 | + } |
| 401 | + |
394 | 402 | private void reinsertFailedActions(List<TryTrackingIndexAction<T>> actionsToRetry) {
|
395 | 403 | synchronized (actionsMutex) {
|
396 | 404 | // Push all actions that need to be retried back into the queue.
|
|
0 commit comments