Skip to content

Commit 5cfe4f4

Browse files
authored
Not propagating TimeoutException from Retry2::awaitClose (elastic#92773)
Logging a message rather than propagating a TimeoutException from Retry2::awaitClose
1 parent ff793d4 commit 5cfe4f4

File tree

4 files changed

+19
-10
lines changed

4 files changed

+19
-10
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor2.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
import java.util.Objects;
2727
import java.util.concurrent.TimeUnit;
28-
import java.util.concurrent.TimeoutException;
2928
import java.util.concurrent.atomic.AtomicLong;
3029
import java.util.function.BiConsumer;
3130

@@ -237,7 +236,7 @@ public static Builder builder(
237236
* @param unit The time unit of the {@code timeout} argument
238237
* @throws InterruptedException If the current thread is interrupted
239238
*/
240-
public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
239+
public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
241240
synchronized (mutex) {
242241
if (closed) {
243242
return;

server/src/main/java/org/elasticsearch/action/bulk/Retry2.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,18 @@ private void retry(
119119
* @param timeout
120120
* @param unit
121121
*/
122-
void awaitClose(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
122+
void awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
123123
isClosing = true;
124124
/*
125125
* This removes the party that was placed in the phaser at initialization so that the phaser will terminate once all in-flight
126126
* requests have been completed (i.e. this makes it possible that the number of parties can become 0).
127127
*/
128128
inFlightRequestsPhaser.arriveAndDeregister();
129-
inFlightRequestsPhaser.awaitAdvanceInterruptibly(0, timeout, unit);
129+
try {
130+
inFlightRequestsPhaser.awaitAdvanceInterruptibly(0, timeout, unit);
131+
} catch (TimeoutException e) {
132+
logger.debug("Timed out waiting for all requests to complete during awaitClose");
133+
}
130134
}
131135

132136
/**

server/src/test/java/org/elasticsearch/action/bulk/Retry2Tests.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.support.PlainActionFuture;
1515
import org.elasticsearch.action.update.UpdateRequest;
1616
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
17+
import org.elasticsearch.core.TimeValue;
1718
import org.elasticsearch.index.shard.ShardId;
1819
import org.elasticsearch.test.ESTestCase;
1920
import org.elasticsearch.test.client.NoOpClient;
@@ -26,11 +27,12 @@
2627
import java.util.Map;
2728
import java.util.concurrent.CountDownLatch;
2829
import java.util.concurrent.TimeUnit;
29-
import java.util.concurrent.TimeoutException;
3030
import java.util.concurrent.atomic.AtomicInteger;
3131

3232
import static org.hamcrest.Matchers.equalTo;
33+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3334
import static org.hamcrest.Matchers.instanceOf;
35+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3436
import static org.hamcrest.Matchers.notNullValue;
3537
import static org.hamcrest.Matchers.nullValue;
3638

@@ -176,7 +178,7 @@ public void testAwaitClose() throws Exception {
176178
retry.awaitClose(1, TimeUnit.SECONDS);
177179
}
178180
/*
179-
* Most requests are complete but one request is hung so awaitClose ought to throw a TimeoutException
181+
* Most requests are complete but one request is hung so awaitClose ought to wait the full timeout period
180182
*/
181183
{
182184
Retry2 retry = new Retry2(CALLS_TO_FAIL);
@@ -200,7 +202,14 @@ public void testAwaitClose() throws Exception {
200202
retry.consumeRequestWithRetries((bulkRequest1, listener1) -> {
201203
// never calls onResponse or onFailure
202204
}, bulkRequest, listener);
203-
expectThrows(TimeoutException.class, () -> retry.awaitClose(200, TimeUnit.MILLISECONDS));
205+
long waitTimeMillis = randomLongBetween(20, 200);
206+
// Make sure that awaitClose completes without exception, and takes at least waitTimeMillis
207+
long startTimeNanos = System.nanoTime();
208+
retry.awaitClose(waitTimeMillis, TimeUnit.MILLISECONDS);
209+
long stopTimeNanos = System.nanoTime();
210+
long runtimeMillis = TimeValue.timeValueNanos((stopTimeNanos - startTimeNanos)).millis();
211+
assertThat(runtimeMillis, greaterThanOrEqualTo(waitTimeMillis));
212+
assertThat(runtimeMillis, lessThanOrEqualTo(2 * waitTimeMillis));
204213
}
205214
}
206215

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.Map;
3434
import java.util.Objects;
3535
import java.util.concurrent.TimeUnit;
36-
import java.util.concurrent.TimeoutException;
3736
import java.util.stream.Collectors;
3837

3938
import static java.util.stream.Collectors.joining;
@@ -196,8 +195,6 @@ public void close() {
196195
} catch (InterruptedException e) {
197196
logger.warn("failed to shut down ILM history bulk processor after 10 seconds", e);
198197
Thread.currentThread().interrupt();
199-
} catch (TimeoutException e) {
200-
logger.warn("failed to shut down ILM history bulk processor after 10 seconds", e);
201198
}
202199
}
203200

0 commit comments

Comments
 (0)