Skip to content

Commit e9656d9

Browse files
authored
Remove onItemCompletion param from ThrottledIterator (#113919)
This param is a no-op for all production callers but one, and it's easy to adapt the one exception to its removal, so it's effectively useless. This commit removes it.
1 parent 31d50ee commit e9656d9

File tree

9 files changed

+29
-54
lines changed

9 files changed

+29
-54
lines changed

server/src/internalClusterTest/java/org/elasticsearch/rest/ChunkedZipResponseIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,6 @@ private static void handleZipRestRequest(
269269
)
270270
),
271271
between(1, 10),
272-
() -> {},
273272
Releasables.wrap(refs.acquire(), chunkedZipResponse)::close
274273
);
275274
}

server/src/internalClusterTest/java/org/elasticsearch/rest/StreamingXContentResponseIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,6 @@ private static void handleStreamingXContentRestRequest(
192192
})
193193
),
194194
between(1, 10),
195-
() -> {},
196195
() -> {
197196
try (streamingXContentResponse; finalRef) {
198197
streamingXContentResponse.writeFragment(p -> ChunkedToXContentHelper.endObject(), refs.acquire());

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,6 @@ public void onFailure(Exception e) {
322322
})
323323
),
324324
getSnapshotInfoExecutor.getMaxRunningTasks(),
325-
() -> {},
326325
() -> {}
327326
);
328327
}));

server/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ void run() {
191191
Iterators.flatMap(Iterators.forArray(concreteIndices), this::getIndexIterator),
192192
this::doShardRequest,
193193
maxConcurrentShardRequests,
194-
() -> {},
195194
outerListener::close
196195
);
197196
}

server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledIterator.java

Lines changed: 16 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,10 @@ public class ThrottledIterator<T> implements Releasable {
3838
* background task previously called {@link RefCounted#decRef()} on its ref count. This operation should not throw
3939
* any exceptions.
4040
* @param maxConcurrency The maximum number of ongoing operations at any time.
41-
* @param onItemCompletion Executed when each item is completed, which can be used for instance to report on progress. Must not throw
42-
* exceptions.
43-
* @param onCompletion Executed when all items are completed.
41+
* @param onCompletion Executed when all items are completed.
4442
*/
45-
public static <T> void run(
46-
Iterator<T> iterator,
47-
BiConsumer<Releasable, T> itemConsumer,
48-
int maxConcurrency,
49-
Runnable onItemCompletion,
50-
Runnable onCompletion
51-
) {
52-
try (var throttledIterator = new ThrottledIterator<>(iterator, itemConsumer, maxConcurrency, onItemCompletion, onCompletion)) {
43+
public static <T> void run(Iterator<T> iterator, BiConsumer<Releasable, T> itemConsumer, int maxConcurrency, Runnable onCompletion) {
44+
try (var throttledIterator = new ThrottledIterator<>(iterator, itemConsumer, maxConcurrency, onCompletion)) {
5345
throttledIterator.run();
5446
}
5547
}
@@ -58,22 +50,14 @@ public static <T> void run(
5850
private final Iterator<T> iterator;
5951
private final BiConsumer<Releasable, T> itemConsumer;
6052
private final Semaphore permits;
61-
private final Runnable onItemCompletion;
62-
63-
private ThrottledIterator(
64-
Iterator<T> iterator,
65-
BiConsumer<Releasable, T> itemConsumer,
66-
int maxConcurrency,
67-
Runnable onItemCompletion,
68-
Runnable onCompletion
69-
) {
53+
54+
private ThrottledIterator(Iterator<T> iterator, BiConsumer<Releasable, T> itemConsumer, int maxConcurrency, Runnable onCompletion) {
7055
this.iterator = Objects.requireNonNull(iterator);
7156
this.itemConsumer = Objects.requireNonNull(itemConsumer);
7257
if (maxConcurrency <= 0) {
7358
throw new IllegalArgumentException("maxConcurrency must be positive");
7459
}
7560
this.permits = new Semaphore(maxConcurrency);
76-
this.onItemCompletion = Objects.requireNonNull(onItemCompletion);
7761
this.refs = AbstractRefCounted.of(onCompletion);
7862
}
7963

@@ -114,26 +98,19 @@ private class ItemRefCounted extends AbstractRefCounted implements Releasable {
11498

11599
@Override
116100
protected void closeInternal() {
101+
permits.release();
117102
try {
118-
onItemCompletion.run();
119-
} catch (Exception e) {
120-
logger.error("exception in onItemCompletion", e);
121-
assert false : e;
122-
} finally {
123-
permits.release();
124-
try {
125-
// Someone must now pick up the next item. Here we might be called from the run() invocation which started processing
126-
// the just-completed item (via close() -> decRef()) if that item's processing didn't fork or all its forked tasks
127-
// finished first. If so, there's no need to call run() here, we can just return and the next iteration of the run()
128-
// loop will continue the processing; moreover calling run() in this situation could lead to a stack overflow. However
129-
// if we're not within that run() invocation then ...
130-
if (isRecursive() == false) {
131-
// ... we're not within any other run() invocation either, so it's safe (and necessary) to call run() here.
132-
run();
133-
}
134-
} finally {
135-
refs.decRef();
103+
// Someone must now pick up the next item. Here we might be called from the run() invocation which started processing the
104+
// just-completed item (via close() -> decRef()) if that item's processing didn't fork or all its forked tasks finished
105+
// first. If so, there's no need to call run() here, we can just return and the next iteration of the run() loop will
106+
// continue the processing; moreover calling run() in this situation could lead to a stack overflow. However if we're not
107+
// within that run() invocation then ...
108+
if (isRecursive() == false) {
109+
// ... we're not within any other run() invocation either, so it's safe (and necessary) to call run() here.
110+
run();
136111
}
112+
} finally {
113+
refs.decRef();
137114
}
138115
}
139116

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1212,7 +1212,6 @@ private void writeUpdatedShardMetadataAndComputeDeletes(ActionListener<Void> lis
12121212
l -> new IndexSnapshotsDeletion(indexId).run(l)
12131213
),
12141214
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
1215-
() -> {},
12161215
listeners::close
12171216
);
12181217
}

server/src/test/java/org/elasticsearch/common/util/concurrent/ThrottledIteratorTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ public void testConcurrency() throws InterruptedException {
6363
final var blockPermits = new Semaphore(between(0, Math.min(maxRelaxedThreads, maxConcurrency) - 1));
6464

6565
ThrottledIterator.run(IntStream.range(0, items).boxed().iterator(), (releasable, item) -> {
66-
try (var refs = new RefCountingRunnable(releasable::close)) {
66+
try (var refs = new RefCountingRunnable(() -> {
67+
completedItems.incrementAndGet();
68+
releasable.close();
69+
})) {
6770
assertTrue(itemPermits.tryAcquire());
6871
if (forkSupplier.getAsBoolean()) {
6972
var ref = refs.acquire();
@@ -108,7 +111,7 @@ public void onFailure(Exception e) {
108111
itemPermits.release();
109112
}
110113
}
111-
}, maxConcurrency, completedItems::incrementAndGet, completionLatch::countDown);
114+
}, maxConcurrency, completionLatch::countDown);
112115

113116
assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
114117
assertEquals(items, completedItems.get());

x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -538,13 +538,7 @@ smallBlob && rarely(random),
538538
queue.add(ref -> runBlobAnalysis(ref, blobAnalyzeRequest, node));
539539
}
540540

541-
ThrottledIterator.run(
542-
getQueueIterator(),
543-
(ref, task) -> task.accept(ref),
544-
request.getConcurrency(),
545-
() -> {},
546-
requestRefs::close
547-
);
541+
ThrottledIterator.run(getQueueIterator(), (ref, task) -> task.accept(ref), request.getConcurrency(), requestRefs::close);
548542
}
549543

550544
private boolean rarely(Random random) {

x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/integrity/RepositoryIntegrityVerifier.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
3030
import org.elasticsearch.core.Nullable;
3131
import org.elasticsearch.core.Releasable;
32+
import org.elasticsearch.core.Releasables;
3233
import org.elasticsearch.core.Strings;
3334
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
3435
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots;
@@ -824,7 +825,12 @@ private static <T> void runThrottled(
824825
AtomicLong progressCounter,
825826
Releasable onCompletion
826827
) {
827-
ThrottledIterator.run(iterator, itemConsumer, maxConcurrency, progressCounter::incrementAndGet, onCompletion::close);
828+
ThrottledIterator.run(
829+
iterator,
830+
(ref, item) -> itemConsumer.accept(Releasables.wrap(progressCounter::incrementAndGet, ref), item),
831+
maxConcurrency,
832+
onCompletion::close
833+
);
828834
}
829835

830836
private RepositoryVerifyIntegrityResponseChunk.Builder anomaly(String anomaly) {

0 commit comments

Comments
 (0)