Skip to content

Commit 6a92a2a

Browse files
authored
interop-testing: Add concurrency condition to the soak test using existing blocking api
The goal of this PR is to increase the test coverage of the C2P E2E load test by improving the rpc_soak and channel_soak tests to support concurrency. **rpc_soak:** The client performs many large_unary RPCs in sequence over the same channel. The test can run in either a concurrent or non-concurrent mode, depending on the number of threads specified (soak_num_threads): - Non-Concurrent Mode: When soak_num_threads = 1, all RPCs are performed sequentially on a single thread. - Concurrent Mode: When soak_num_threads > 1, the client uses multiple threads to distribute the workload. Each thread performs a portion of the total soak_iterations, executing its own set of RPCs concurrently. **channel_soak:** Similar to rpc_soak, but this time each RPC is performed on a new channel. The channel is created just before each RPC and is destroyed just after. Note on Concurrent Execution and Channel Creation: In a concurrent execution setting (i.e., when soak_num_threads > 1), each thread performs a portion of the total soak_iterations and creates and destroys its own channel for each RPC iteration. - createNewChannel Function: In channel_soak, the createNewChannel function is used by each thread to create a new channel before every RPC. This function ensures that each RPC has a separate channel, preventing race conditions by isolating channels between threads. It shuts down the previous channel (if any) and creates a new one for each iteration, ensuring accurate latency measurement per RPC. - Thread-specific logs will include the thread_id, helping to track performance across threads, especially when each thread is managing its own channel lifecycle.
1 parent 4ae04b7 commit 6a92a2a

File tree

3 files changed

+197
-84
lines changed

3 files changed

+197
-84
lines changed

interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java

Lines changed: 142 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.junit.Assert.fail;
2929

3030
import com.google.common.annotations.VisibleForTesting;
31+
import com.google.common.base.Function;
3132
import com.google.common.base.Throwables;
3233
import com.google.common.collect.ImmutableList;
3334
import com.google.common.io.ByteStreams;
@@ -1698,9 +1699,28 @@ public Status getStatus() {
16981699
private Status status = Status.OK;
16991700
}
17001701

1702+
1703+
private static class ThreadResults {
1704+
private int threadFailures = 0;
1705+
private int iterationsDone = 0;
1706+
private Histogram latencies = new Histogram(4);
1707+
1708+
public int getThreadFailures() {
1709+
return threadFailures;
1710+
}
1711+
1712+
public int getIterationsDone() {
1713+
return iterationsDone;
1714+
}
1715+
1716+
public Histogram getLatencies() {
1717+
return latencies;
1718+
}
1719+
}
1720+
17011721
private SoakIterationResult performOneSoakIteration(
17021722
TestServiceGrpc.TestServiceBlockingStub soakStub, int soakRequestSize, int soakResponseSize)
1703-
throws Exception {
1723+
throws InterruptedException {
17041724
long startNs = System.nanoTime();
17051725
Status status = Status.OK;
17061726
try {
@@ -1724,71 +1744,67 @@ private SoakIterationResult performOneSoakIteration(
17241744
}
17251745

17261746
/**
1727-
* Runs large unary RPCs in a loop with configurable failure thresholds
1728-
* and channel creation behavior.
1747+
* Runs large unary RPCs in a loop with configurable failure thresholds
1748+
* and channel creation behavior.
17291749
*/
17301750
public void performSoakTest(
17311751
String serverUri,
1732-
boolean resetChannelPerIteration,
17331752
int soakIterations,
17341753
int maxFailures,
17351754
int maxAcceptablePerIterationLatencyMs,
17361755
int minTimeMsBetweenRpcs,
17371756
int overallTimeoutSeconds,
17381757
int soakRequestSize,
1739-
int soakResponseSize)
1740-
throws Exception {
1741-
int iterationsDone = 0;
1742-
int totalFailures = 0;
1743-
Histogram latencies = new Histogram(4 /* number of significant value digits */);
1758+
int soakResponseSize,
1759+
int numThreads,
1760+
Function<ManagedChannel, ManagedChannel> createNewChannel)
1761+
throws InterruptedException {
1762+
if (soakIterations % numThreads != 0) {
1763+
throw new IllegalArgumentException("soakIterations must be evenly divisible by numThreads.");
1764+
}
1765+
ManagedChannel sharedChannel = createChannel();
17441766
long startNs = System.nanoTime();
1745-
ManagedChannel soakChannel = createChannel();
1746-
TestServiceGrpc.TestServiceBlockingStub soakStub = TestServiceGrpc
1747-
.newBlockingStub(soakChannel)
1748-
.withInterceptors(recordClientCallInterceptor(clientCallCapture));
1749-
for (int i = 0; i < soakIterations; i++) {
1750-
if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) {
1751-
break;
1752-
}
1753-
long earliestNextStartNs = System.nanoTime()
1754-
+ TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs);
1755-
if (resetChannelPerIteration) {
1756-
soakChannel.shutdownNow();
1757-
soakChannel.awaitTermination(10, TimeUnit.SECONDS);
1758-
soakChannel = createChannel();
1759-
soakStub = TestServiceGrpc
1760-
.newBlockingStub(soakChannel)
1761-
.withInterceptors(recordClientCallInterceptor(clientCallCapture));
1762-
}
1763-
SoakIterationResult result =
1764-
performOneSoakIteration(soakStub, soakRequestSize, soakResponseSize);
1765-
SocketAddress peer = clientCallCapture
1766-
.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
1767-
StringBuilder logStr = new StringBuilder(
1768-
String.format(
1769-
Locale.US,
1770-
"soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
1771-
i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri));
1772-
if (!result.getStatus().equals(Status.OK)) {
1773-
totalFailures++;
1774-
logStr.append(String.format(" failed: %s", result.getStatus()));
1775-
} else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) {
1776-
totalFailures++;
1777-
logStr.append(
1778-
" exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs);
1779-
} else {
1780-
logStr.append(" succeeded");
1781-
}
1782-
System.err.println(logStr.toString());
1783-
iterationsDone++;
1784-
latencies.recordValue(result.getLatencyMs());
1785-
long remainingNs = earliestNextStartNs - System.nanoTime();
1786-
if (remainingNs > 0) {
1787-
TimeUnit.NANOSECONDS.sleep(remainingNs);
1788-
}
1767+
Thread[] threads = new Thread[numThreads];
1768+
int soakIterationsPerThread = soakIterations / numThreads;
1769+
List<ThreadResults> threadResultsList = new ArrayList<>(numThreads);
1770+
for (int i = 0; i < numThreads; i++) {
1771+
threadResultsList.add(new ThreadResults());
1772+
}
1773+
for (int threadInd = 0; threadInd < numThreads; threadInd++) {
1774+
final int currentThreadInd = threadInd;
1775+
threads[threadInd] = new Thread(() -> {
1776+
try {
1777+
executeSoakTestInThread(
1778+
soakIterationsPerThread,
1779+
startNs,
1780+
minTimeMsBetweenRpcs,
1781+
soakRequestSize,
1782+
soakResponseSize,
1783+
maxAcceptablePerIterationLatencyMs,
1784+
overallTimeoutSeconds,
1785+
serverUri,
1786+
threadResultsList.get(currentThreadInd),
1787+
sharedChannel,
1788+
createNewChannel);
1789+
} catch (InterruptedException e) {
1790+
Thread.currentThread().interrupt();
1791+
throw new RuntimeException("Thread interrupted: " + e.getMessage(), e);
1792+
}
1793+
});
1794+
threads[threadInd].start();
1795+
}
1796+
for (Thread thread : threads) {
1797+
thread.join();
1798+
}
1799+
1800+
int totalFailures = 0;
1801+
int iterationsDone = 0;
1802+
Histogram latencies = new Histogram(4);
1803+
for (ThreadResults threadResult :threadResultsList) {
1804+
totalFailures += threadResult.getThreadFailures();
1805+
iterationsDone += threadResult.getIterationsDone();
1806+
latencies.add(threadResult.getLatencies());
17891807
}
1790-
soakChannel.shutdownNow();
1791-
soakChannel.awaitTermination(10, TimeUnit.SECONDS);
17921808
System.err.println(
17931809
String.format(
17941810
Locale.US,
@@ -1820,6 +1836,77 @@ public void performSoakTest(
18201836
+ "threshold: %d.",
18211837
serverUri, totalFailures, maxFailures);
18221838
assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures);
1839+
shutdownChannel(sharedChannel);
1840+
}
1841+
1842+
private void shutdownChannel(ManagedChannel channel) throws InterruptedException {
1843+
if (channel != null) {
1844+
channel.shutdownNow();
1845+
channel.awaitTermination(10, TimeUnit.SECONDS);
1846+
}
1847+
}
1848+
1849+
protected ManagedChannel createNewChannel(ManagedChannel currentChannel) {
1850+
try {
1851+
shutdownChannel(currentChannel);
1852+
return createChannel();
1853+
} catch (InterruptedException e) {
1854+
throw new RuntimeException("Interrupted while creating a new channel", e);
1855+
}
1856+
}
1857+
1858+
private void executeSoakTestInThread(
1859+
int soakIterationsPerThread,
1860+
long startNs,
1861+
int minTimeMsBetweenRpcs,
1862+
int soakRequestSize,
1863+
int soakResponseSize,
1864+
int maxAcceptablePerIterationLatencyMs,
1865+
int overallTimeoutSeconds,
1866+
String serverUri,
1867+
ThreadResults threadResults,
1868+
ManagedChannel sharedChannel,
1869+
Function<ManagedChannel, ManagedChannel> maybeCreateChannel) throws InterruptedException {
1870+
ManagedChannel currentChannel = sharedChannel;
1871+
for (int i = 0; i < soakIterationsPerThread; i++) {
1872+
if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) {
1873+
break;
1874+
}
1875+
long earliestNextStartNs = System.nanoTime()
1876+
+ TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs);
1877+
1878+
currentChannel = maybeCreateChannel.apply(currentChannel);
1879+
TestServiceGrpc.TestServiceBlockingStub currentStub = TestServiceGrpc
1880+
.newBlockingStub(currentChannel)
1881+
.withInterceptors(recordClientCallInterceptor(clientCallCapture));
1882+
SoakIterationResult result = performOneSoakIteration(currentStub,
1883+
soakRequestSize, soakResponseSize);
1884+
SocketAddress peer = clientCallCapture
1885+
.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
1886+
StringBuilder logStr = new StringBuilder(
1887+
String.format(
1888+
Locale.US,
1889+
"thread id: %d soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
1890+
Thread.currentThread().getId(),
1891+
i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri));
1892+
if (!result.getStatus().equals(Status.OK)) {
1893+
threadResults.threadFailures++;
1894+
logStr.append(String.format(" failed: %s", result.getStatus()));
1895+
} else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) {
1896+
threadResults.threadFailures++;
1897+
logStr.append(
1898+
" exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs);
1899+
} else {
1900+
logStr.append(" succeeded");
1901+
}
1902+
System.err.println(logStr.toString());
1903+
threadResults.iterationsDone++;
1904+
threadResults.getLatencies().recordValue(result.getLatencyMs());
1905+
long remainingNs = earliestNextStartNs - System.nanoTime();
1906+
if (remainingNs > 0) {
1907+
TimeUnit.NANOSECONDS.sleep(remainingNs);
1908+
}
1909+
}
18231910
}
18241911

18251912
private static void assertSuccess(StreamRecorder<?> recorder) {

interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ public static void main(String[] args) throws Exception {
134134
soakIterations * soakPerIterationMaxAcceptableLatencyMs / 1000;
135135
private int soakRequestSize = 271828;
136136
private int soakResponseSize = 314159;
137+
private int numThreads = 1;
137138
private String additionalMetadata = "";
138139
private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider;
139140

@@ -214,6 +215,8 @@ void parseArgs(String[] args) throws Exception {
214215
soakRequestSize = Integer.parseInt(value);
215216
} else if ("soak_response_size".equals(key)) {
216217
soakResponseSize = Integer.parseInt(value);
218+
} else if ("soak_num_threads".equals(key)) {
219+
numThreads = Integer.parseInt(value);
217220
} else if ("additional_metadata".equals(key)) {
218221
additionalMetadata = value;
219222
} else {
@@ -290,6 +293,9 @@ void parseArgs(String[] args) throws Exception {
290293
+ "\n --soak_response_size "
291294
+ "\n The response size in a soak RPC. Default "
292295
+ c.soakResponseSize
296+
+ "\n --soak_num_threads The number of threads for concurrent execution of the "
297+
+ "\n soak tests (rpc_soak or channel_soak). Default "
298+
+ c.numThreads
293299
+ "\n --additional_metadata "
294300
+ "\n Additional metadata to send in each request, as a "
295301
+ "\n semicolon-separated list of key:value pairs. Default "
@@ -519,30 +525,31 @@ private void runTest(TestCases testCase) throws Exception {
519525
case RPC_SOAK: {
520526
tester.performSoakTest(
521527
serverHost,
522-
false /* resetChannelPerIteration */,
523528
soakIterations,
524529
soakMaxFailures,
525530
soakPerIterationMaxAcceptableLatencyMs,
526531
soakMinTimeMsBetweenRpcs,
527532
soakOverallTimeoutSeconds,
528533
soakRequestSize,
529-
soakResponseSize);
534+
soakResponseSize,
535+
numThreads,
536+
(currentChannel) -> currentChannel);
530537
break;
531538
}
532539

533540
case CHANNEL_SOAK: {
534541
tester.performSoakTest(
535542
serverHost,
536-
true /* resetChannelPerIteration */,
537543
soakIterations,
538544
soakMaxFailures,
539545
soakPerIterationMaxAcceptableLatencyMs,
540546
soakMinTimeMsBetweenRpcs,
541547
soakOverallTimeoutSeconds,
542548
soakRequestSize,
543-
soakResponseSize);
549+
soakResponseSize,
550+
numThreads,
551+
(currentChannel) -> tester.createNewChannel(currentChannel));
544552
break;
545-
546553
}
547554

548555
case ORCA_PER_RPC: {

interop-testing/src/main/java/io/grpc/testing/integration/XdsFederationTestClient.java

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -245,29 +245,41 @@ public boolean runSucceeded() {
245245
/**
246246
* Run the intended soak test.
247247
*/
248-
public void run() {
249-
boolean resetChannelPerIteration;
250-
switch (testCase) {
251-
case "rpc_soak":
252-
resetChannelPerIteration = false;
253-
break;
254-
case "channel_soak":
255-
resetChannelPerIteration = true;
256-
break;
257-
default:
258-
throw new RuntimeException("invalid testcase: " + testCase);
259-
}
248+
public void run() throws InterruptedException {
260249
try {
261-
performSoakTest(
262-
serverUri,
263-
resetChannelPerIteration,
264-
soakIterations,
265-
soakMaxFailures,
266-
soakPerIterationMaxAcceptableLatencyMs,
267-
soakMinTimeMsBetweenRpcs,
268-
soakOverallTimeoutSeconds,
269-
soakRequestSize,
270-
soakResponseSize);
250+
switch (testCase) {
251+
case "rpc_soak": {
252+
performSoakTest(
253+
serverUri,
254+
soakIterations,
255+
soakMaxFailures,
256+
soakPerIterationMaxAcceptableLatencyMs,
257+
soakMinTimeMsBetweenRpcs,
258+
soakOverallTimeoutSeconds,
259+
soakRequestSize,
260+
soakResponseSize,
261+
1,
262+
(currentChannel) -> currentChannel);
263+
}
264+
break;
265+
case "channel_soak": {
266+
performSoakTest(
267+
serverUri,
268+
soakIterations,
269+
soakMaxFailures,
270+
soakPerIterationMaxAcceptableLatencyMs,
271+
soakMinTimeMsBetweenRpcs,
272+
soakOverallTimeoutSeconds,
273+
soakRequestSize,
274+
soakResponseSize,
275+
1,
276+
(currentChannel) -> createNewChannel(currentChannel));
277+
}
278+
break;
279+
default:
280+
throw new RuntimeException("invalid testcase: " + testCase);
281+
}
282+
271283
logger.info("Test case: " + testCase + " done for server: " + serverUri);
272284
runSucceeded = true;
273285
} catch (Exception e) {
@@ -295,11 +307,18 @@ protected ManagedChannelBuilder<?> createChannelBuilder() {
295307
}
296308
}
297309

298-
private void run() throws Exception {
310+
private void run() throws InterruptedException {
299311
logger.info("Begin test case: " + testCase);
300312
ArrayList<Thread> threads = new ArrayList<>();
301313
for (InnerClient c : clients) {
302-
Thread t = new Thread(c::run);
314+
Thread t = new Thread(() -> {
315+
try {
316+
c.run();
317+
} catch (InterruptedException e) {
318+
Thread.currentThread().interrupt(); // Properly re-interrupt the thread
319+
throw new RuntimeException("Thread was interrupted during execution", e);
320+
}
321+
});
303322
t.start();
304323
threads.add(t);
305324
}

0 commit comments

Comments
 (0)