Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.client.RestClient.NodeTuple;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -37,9 +39,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -432,4 +437,98 @@ private static void assertNodes(NodeTuple<List<Node>> nodeTuple, AtomicInteger l
}
}
}

/**
* Helper method to await a CountDownLatch, wrapping InterruptedException
*/
private static void awaitCountDownLatch(CountDownLatch latch) {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for latch", e);
}
}

/**
* Demonstrates a deadlock scenario in RestClient where:
* <ol>
* <li>The Apache IO thread holds the connection pool lock and attempts to acquire
* the RequestCancellable monitor (via onFailure callback)</li>
* <li>The application thread holds the RequestCancellable monitor and attempts to
* acquire the connection pool lock (via synchronous retry of a different request)</li>
* </ol>
*
* This test succeeds when the deadlock is detected by the JVM and fails if it doesn't detect a deadlock.
*/
public void testDeadlockBetweenConnectionPoolAndCancellableMonitor() throws Exception {
// Construct a RestClient pointing to localhost with an invalid port to reliably make connection requests fail.
// Intentionally not closing this client because it will also deadlock on the connection pool lock during
// cleanup and cause the test to hang indefinitely.
RestClient client = RestClient.builder(new HttpHost("127.0.0.1", 1, "http")).build();

CountDownLatch isConnectionPoolLockHeld = new CountDownLatch(1);
CountDownLatch cancellableMonitorHeld = new CountDownLatch(1);

// Perform a notional request that mimics the synchronous retry-upon-failure pattern implemented in
// RestClient#performRequestAsync(NodeTuple, InternalRequest, FailureTrackingResponseListener)
AtomicReference<Cancellable> cancellable = new AtomicReference<>();
cancellable.set(client.performRequestAsync(new Request("GET", "/test/endpoint"), new ResponseListener() {
@Override
public void onSuccess(Response _response) {
// No-op
}

@Override
public void onFailure(Exception _exception) {
// This method is invoked on the Apache IO thread through AbstractNIOConnPool#requestFailed
isConnectionPoolLockHeld.countDown();

// Simulates the application thread holding the original RequestCancellable object monitor before
// issuing a synchronous retry for a different RequestCancellable whose request completed with an
// exception (e.g. 4xx or 5xx HTTP error).
awaitCountDownLatch(cancellableMonitorHeld);
cancellable.get().runIfNotCancelled(() -> {});
}
}));

// Don't block the test thread for assertions
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
executor.execute(() -> {
awaitCountDownLatch(isConnectionPoolLockHeld);
cancellable.get().runIfNotCancelled(() -> {
cancellableMonitorHeld.countDown();

// Simulate the synchronous retry-upon-failure for a different request, which will cause a deadlock
// on the connection pool lock
client.performRequestAsync(new Request("GET", "/test/endpoint"), new ResponseListener() {
@Override
public void onSuccess(Response _response) {}

@Override
public void onFailure(Exception _exception) {}
});
});
});

// Poll for deadlock detection
boolean deadlockDetected = false;
ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
for (int i = 0; i < 10; i++) {
if (threadMxBean.findDeadlockedThreads() != null) {
deadlockDetected = true;
break;
}
Thread.sleep(1_000);
}

if (!deadlockDetected) {
fail("No deadlock detected after 10 tries");
}
} finally {
executor.shutdownNow();
}
}

}