|
| 1 | +/* |
| 2 | + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one |
| 3 | + * or more contributor license agreements. Licensed under the Elastic License |
| 4 | + * 2.0 and the Server Side Public License, v 1; you may not use this file except |
| 5 | + * in compliance with, at your election, the Elastic License 2.0 or the Server |
| 6 | + * Side Public License, v 1. |
| 7 | + */ |
| 8 | + |
| 9 | +package org.elasticsearch.common.util.concurrent; |
| 10 | + |
| 11 | +import org.elasticsearch.core.AbstractRefCounted; |
| 12 | +import org.elasticsearch.core.RefCounted; |
| 13 | +import org.elasticsearch.core.Releasable; |
| 14 | +import org.elasticsearch.core.Releasables; |
| 15 | +import org.elasticsearch.core.Strings; |
| 16 | +import org.elasticsearch.logging.LogManager; |
| 17 | +import org.elasticsearch.logging.Logger; |
| 18 | + |
| 19 | +import java.util.Iterator; |
| 20 | +import java.util.Objects; |
| 21 | +import java.util.concurrent.Semaphore; |
| 22 | +import java.util.function.BiConsumer; |
| 23 | + |
| 24 | +public class ThrottledIterator<T> implements Releasable { |
| 25 | + |
| 26 | + private static final Logger logger = LogManager.getLogger(ThrottledIterator.class); |
| 27 | + |
| 28 | + /** |
| 29 | + * Iterate through the given collection, performing an operation on each item which may fork background tasks, but with a limit on the |
| 30 | + * number of such background tasks running concurrently to avoid overwhelming the rest of the system (e.g. starving other work of access |
| 31 | + * to an executor). |
| 32 | + * |
| 33 | + * @param iterator The items to iterate. May be accessed by multiple threads, but accesses are all protected by synchronizing on itself. |
| 34 | + * @param itemConsumer The operation to perform on each item. Each operation receives a {@link RefCounted} which can be used to track |
| 35 | + * the execution of any background tasks spawned for this item. This operation may run on the thread which |
| 36 | + * originally called {@link #run}, if this method has not yet returned. Otherwise it will run on a thread on which a |
| 37 | + * background task previously called {@link RefCounted#decRef()} on its ref count. This operation should not throw |
| 38 | + * any exceptions. |
| 39 | + * @param maxConcurrency The maximum number of ongoing operations at any time. |
| 40 | + * @param onItemCompletion Executed when each item is completed, which can be used for instance to report on progress. Must not throw |
| 41 | + * exceptions. |
| 42 | + * @param onCompletion Executed when all items are completed. |
| 43 | + */ |
| 44 | + public static <T> void run( |
| 45 | + Iterator<T> iterator, |
| 46 | + BiConsumer<Releasable, T> itemConsumer, |
| 47 | + int maxConcurrency, |
| 48 | + Runnable onItemCompletion, |
| 49 | + Runnable onCompletion |
| 50 | + ) { |
| 51 | + try (var throttledIterator = new ThrottledIterator<>(iterator, itemConsumer, maxConcurrency, onItemCompletion, onCompletion)) { |
| 52 | + throttledIterator.run(); |
| 53 | + } |
| 54 | + } |
| 55 | + |
| 56 | + private final RefCounted refs; // one ref for each running item, plus one for the iterator if incomplete |
| 57 | + private final Iterator<T> iterator; |
| 58 | + private final BiConsumer<Releasable, T> itemConsumer; |
| 59 | + private final Semaphore permits; |
| 60 | + private final Runnable onItemCompletion; |
| 61 | + |
| 62 | + private ThrottledIterator( |
| 63 | + Iterator<T> iterator, |
| 64 | + BiConsumer<Releasable, T> itemConsumer, |
| 65 | + int maxConcurrency, |
| 66 | + Runnable onItemCompletion, |
| 67 | + Runnable onCompletion |
| 68 | + ) { |
| 69 | + this.iterator = Objects.requireNonNull(iterator); |
| 70 | + this.itemConsumer = Objects.requireNonNull(itemConsumer); |
| 71 | + if (maxConcurrency <= 0) { |
| 72 | + throw new IllegalArgumentException("maxConcurrency must be positive"); |
| 73 | + } |
| 74 | + this.permits = new Semaphore(maxConcurrency); |
| 75 | + this.onItemCompletion = Objects.requireNonNull(onItemCompletion); |
| 76 | + this.refs = AbstractRefCounted.of(onCompletion); |
| 77 | + } |
| 78 | + |
| 79 | + private void run() { |
| 80 | + while (permits.tryAcquire()) { |
| 81 | + final T item; |
| 82 | + synchronized (iterator) { |
| 83 | + if (iterator.hasNext()) { |
| 84 | + item = iterator.next(); |
| 85 | + } else { |
| 86 | + permits.release(); |
| 87 | + return; |
| 88 | + } |
| 89 | + } |
| 90 | + try (var itemRefs = new ItemRefCounted()) { |
| 91 | + itemRefs.incRef(); |
| 92 | + itemConsumer.accept(Releasables.releaseOnce(itemRefs::decRef), item); |
| 93 | + } catch (Exception e) { |
| 94 | + logger.error(Strings.format("exception when processing [%s] with [%s]", item, itemConsumer), e); |
| 95 | + assert false : e; |
| 96 | + } |
| 97 | + } |
| 98 | + } |
| 99 | + |
| 100 | + @Override |
| 101 | + public void close() { |
| 102 | + refs.decRef(); |
| 103 | + } |
| 104 | + |
| 105 | + // A RefCounted for a single item, including protection against calling back into run() if it's created and closed within a single |
| 106 | + // invocation of run(). |
| 107 | + private class ItemRefCounted extends AbstractRefCounted implements Releasable { |
| 108 | + private boolean isRecursive = true; |
| 109 | + |
| 110 | + ItemRefCounted() { |
| 111 | + refs.incRef(); |
| 112 | + } |
| 113 | + |
| 114 | + @Override |
| 115 | + protected void closeInternal() { |
| 116 | + try { |
| 117 | + onItemCompletion.run(); |
| 118 | + } catch (Exception e) { |
| 119 | + logger.error("exception in onItemCompletion", e); |
| 120 | + assert false : e; |
| 121 | + } finally { |
| 122 | + permits.release(); |
| 123 | + try { |
| 124 | + // Someone must now pick up the next item. Here we might be called from the run() invocation which started processing |
| 125 | + // the just-completed item (via close() -> decRef()) if that item's processing didn't fork or all its forked tasks |
| 126 | + // finished first. If so, there's no need to call run() here, we can just return and the next iteration of the run() |
| 127 | + // loop will continue the processing; moreover calling run() in this situation could lead to a stack overflow. However |
| 128 | + // if we're not within that run() invocation then ... |
| 129 | + if (isRecursive() == false) { |
| 130 | + // ... we're not within any other run() invocation either, so it's safe (and necessary) to call run() here. |
| 131 | + run(); |
| 132 | + } |
| 133 | + } finally { |
| 134 | + refs.decRef(); |
| 135 | + } |
| 136 | + } |
| 137 | + } |
| 138 | + |
| 139 | + // Note on blocking: we call both of these synchronized methods exactly once (and must enter close() before calling isRecursive()). |
| 140 | + // If close() releases the last ref and calls closeInternal(), and hence isRecursive(), then there's no other threads involved and |
| 141 | + // hence no blocking. In contrast if close() doesn't release the last ref then it exits immediately, so the call to isRecursive() |
| 142 | + // will proceed without delay in this case too. |
| 143 | + |
| 144 | + private synchronized boolean isRecursive() { |
| 145 | + return isRecursive; |
| 146 | + } |
| 147 | + |
| 148 | + @Override |
| 149 | + public synchronized void close() { |
| 150 | + decRef(); |
| 151 | + isRecursive = false; |
| 152 | + } |
| 153 | + } |
| 154 | +} |
0 commit comments