Skip to content

Commit 02e7a02

Browse files
Fixing flaky test for request service
1 parent 8ebc808 commit 02e7a02

File tree

3 files changed

+36
-11
lines changed

3 files changed

+36
-11
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,6 @@ tests:
304304
- class: org.elasticsearch.indices.mapping.UpdateMappingIntegrationIT
305305
method: testUpdateMappingConcurrently
306306
issue: https://github.com/elastic/elasticsearch/issues/137758
307-
- class: org.elasticsearch.xpack.inference.external.http.sender.RequestExecutorServiceTests
308-
method: testChangingCapacity_DoesNotRejectsOverflowTasks_BecauseOfQueueFull
309-
issue: https://github.com/elastic/elasticsearch/issues/137823
310307
- class: org.elasticsearch.xpack.inference.integration.AuthorizationTaskExecutorMultipleNodesIT
311308
method: testAuthorizationTaskGetsRelocatedToAnotherNode_WhenTheNodeThatIsRunningItShutsDown
312309
issue: https://github.com/elastic/elasticsearch/issues/137911

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.xpack.inference.services.settings.RateLimitSettings;
2727

2828
import java.time.Clock;
29+
import java.time.Duration;
2930
import java.time.Instant;
3031
import java.util.ArrayList;
3132
import java.util.List;
@@ -125,7 +126,10 @@ interface RateLimiterCreator {
125126
private final AtomicInteger rateLimitDivisor = new AtomicInteger(1);
126127
private final ThreadPool threadPool;
127128
private final CountDownLatch startupLatch;
128-
private final CountDownLatch terminationLatch = new CountDownLatch(1);
129+
// Two latches because we have two threads of execution, one thread blocking on a queue for items to be sent immediately, and
130+
// another threads that is scheduled on an interval that checks items that can be rate limited
131+
private final CountDownLatch immediateRequestQueueTerminationLatch = new CountDownLatch(1);
132+
private final CountDownLatch rateLimitedTerminationLatch = new CountDownLatch(1);
129133
private final RequestSender requestSender;
130134
private final RequestExecutorServiceSettings settings;
131135
private final Clock clock;
@@ -184,11 +188,25 @@ public boolean isShutdown() {
184188
}
185189

186190
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
187-
return terminationLatch.await(timeout, unit);
191+
var totalWait = Duration.ofMillis(unit.toMillis(timeout));
192+
193+
var firstAwaitStart = Instant.now();
194+
var firstLatchResult = immediateRequestQueueTerminationLatch.await(timeout, unit);
195+
var firstAwaitEnd = Instant.now();
196+
197+
var remainingWaitTime = totalWait.minus(Duration.between(firstAwaitStart, firstAwaitEnd));
198+
199+
// If the first latch await returns false, we've run out of time
200+
// If the remaining wait time is negative or zero, we've run out of time
201+
if (firstLatchResult == false || remainingWaitTime.isNegative() || remainingWaitTime.isZero()) {
202+
return false;
203+
}
204+
205+
return rateLimitedTerminationLatch.await(remainingWaitTime.toMillis(), TimeUnit.MILLISECONDS);
188206
}
189207

190208
public boolean isTerminated() {
191-
return terminationLatch.getCount() == 0;
209+
return immediateRequestQueueTerminationLatch.getCount() == 0 && rateLimitedTerminationLatch.getCount() == 0;
192210
}
193211

194212
public int queueSize() {
@@ -213,6 +231,7 @@ public void start() {
213231
} catch (Exception e) {
214232
logger.warn("Failed to start request executor", e);
215233
cleanup(CleanupStrategy.RATE_LIMITED_REQUEST_QUEUES_ONLY);
234+
cleanup(CleanupStrategy.REQUEST_QUEUE_ONLY);
216235
}
217236
}
218237

@@ -350,12 +369,16 @@ private void cleanup(CleanupStrategy cleanupStrategy) {
350369
shutdown();
351370

352371
switch (cleanupStrategy) {
353-
case RATE_LIMITED_REQUEST_QUEUES_ONLY -> notifyRateLimitedRequestsOfShutdown();
354-
case REQUEST_QUEUE_ONLY -> rejectRequestsInRequestQueue();
372+
case RATE_LIMITED_REQUEST_QUEUES_ONLY -> {
373+
notifyRateLimitedRequestsOfShutdown();
374+
rateLimitedTerminationLatch.countDown();
375+
}
376+
case REQUEST_QUEUE_ONLY -> {
377+
rejectRequestsInRequestQueue();
378+
immediateRequestQueueTerminationLatch.countDown();
379+
}
355380
default -> logger.error(Strings.format("Unknown clean up strategy for request executor: [%s]", cleanupStrategy.toString()));
356381
}
357-
358-
terminationLatch.countDown();
359382
} catch (Exception e) {
360383
logger.warn("Encountered an error while cleaning up", e);
361384
}

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorServiceTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public void testIsTerminated_IsTrue() throws InterruptedException {
103103
service.shutdown();
104104
service.start();
105105
latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
106+
service.awaitTermination(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
106107

107108
assertTrue(service.isTerminated());
108109
}
@@ -115,6 +116,8 @@ public void testCallingStartTwice_ThrowsAssertionException() throws InterruptedE
115116
service.start();
116117
latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
117118

119+
service.awaitTermination(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
120+
118121
assertTrue(service.isTerminated());
119122
var exception = expectThrows(AssertionError.class, service::start);
120123
assertThat(exception.getMessage(), is("start() can only be called once"));
@@ -235,13 +238,15 @@ public void testTaskThrowsError_CallsOnFailure() throws InterruptedException {
235238
assertTrue(service.isTerminated());
236239
}
237240

238-
public void testShutdown_AllowsMultipleCalls() {
241+
public void testShutdown_AllowsMultipleCalls() throws InterruptedException {
239242
var service = createRequestExecutorServiceWithMocks();
240243

241244
service.shutdown();
242245
service.shutdown();
243246
service.start();
244247

248+
service.awaitTermination(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
249+
245250
assertTrue(service.isTerminated());
246251
assertTrue(service.isShutdown());
247252
}

0 commit comments

Comments
 (0)