|
28 | 28 | import static org.junit.Assert.fail; |
29 | 29 |
|
30 | 30 | import com.google.common.annotations.VisibleForTesting; |
31 | | -import com.google.common.base.Function; |
32 | 31 | import com.google.common.base.Throwables; |
33 | 32 | import com.google.common.collect.ImmutableList; |
34 | 33 | import com.google.common.io.ByteStreams; |
|
120 | 119 | import javax.annotation.Nullable; |
121 | 120 | import javax.net.ssl.SSLPeerUnverifiedException; |
122 | 121 | import javax.net.ssl.SSLSession; |
123 | | -import org.HdrHistogram.Histogram; |
124 | 122 | import org.junit.After; |
125 | 123 | import org.junit.Assert; |
126 | 124 | import org.junit.Assume; |
@@ -1681,235 +1679,6 @@ public void getServerAddressAndLocalAddressFromClient() { |
1681 | 1679 | assertNotNull(obtainLocalClientAddr()); |
1682 | 1680 | } |
1683 | 1681 |
|
1684 | | - private static class SoakIterationResult { |
1685 | | - public SoakIterationResult(long latencyMs, Status status) { |
1686 | | - this.latencyMs = latencyMs; |
1687 | | - this.status = status; |
1688 | | - } |
1689 | | - |
1690 | | - public long getLatencyMs() { |
1691 | | - return latencyMs; |
1692 | | - } |
1693 | | - |
1694 | | - public Status getStatus() { |
1695 | | - return status; |
1696 | | - } |
1697 | | - |
1698 | | - private long latencyMs = -1; |
1699 | | - private Status status = Status.OK; |
1700 | | - } |
1701 | | - |
1702 | | - |
1703 | | - private static class ThreadResults { |
1704 | | - private int threadFailures = 0; |
1705 | | - private int iterationsDone = 0; |
1706 | | - private Histogram latencies = new Histogram(4); |
1707 | | - |
1708 | | - public int getThreadFailures() { |
1709 | | - return threadFailures; |
1710 | | - } |
1711 | | - |
1712 | | - public int getIterationsDone() { |
1713 | | - return iterationsDone; |
1714 | | - } |
1715 | | - |
1716 | | - public Histogram getLatencies() { |
1717 | | - return latencies; |
1718 | | - } |
1719 | | - } |
1720 | | - |
1721 | | - private SoakIterationResult performOneSoakIteration( |
1722 | | - TestServiceGrpc.TestServiceBlockingStub soakStub, int soakRequestSize, int soakResponseSize) |
1723 | | - throws InterruptedException { |
1724 | | - long startNs = System.nanoTime(); |
1725 | | - Status status = Status.OK; |
1726 | | - try { |
1727 | | - final SimpleRequest request = |
1728 | | - SimpleRequest.newBuilder() |
1729 | | - .setResponseSize(soakResponseSize) |
1730 | | - .setPayload( |
1731 | | - Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakRequestSize]))) |
1732 | | - .build(); |
1733 | | - final SimpleResponse goldenResponse = |
1734 | | - SimpleResponse.newBuilder() |
1735 | | - .setPayload( |
1736 | | - Payload.newBuilder().setBody(ByteString.copyFrom(new byte[soakResponseSize]))) |
1737 | | - .build(); |
1738 | | - assertResponse(goldenResponse, soakStub.unaryCall(request)); |
1739 | | - } catch (StatusRuntimeException e) { |
1740 | | - status = e.getStatus(); |
1741 | | - } |
1742 | | - long elapsedNs = System.nanoTime() - startNs; |
1743 | | - return new SoakIterationResult(TimeUnit.NANOSECONDS.toMillis(elapsedNs), status); |
1744 | | - } |
1745 | | - |
1746 | | - /** |
1747 | | - * Runs large unary RPCs in a loop with configurable failure thresholds |
1748 | | - * and channel creation behavior. |
1749 | | - */ |
1750 | | - public void performSoakTest( |
1751 | | - String serverUri, |
1752 | | - int soakIterations, |
1753 | | - int maxFailures, |
1754 | | - int maxAcceptablePerIterationLatencyMs, |
1755 | | - int minTimeMsBetweenRpcs, |
1756 | | - int overallTimeoutSeconds, |
1757 | | - int soakRequestSize, |
1758 | | - int soakResponseSize, |
1759 | | - int numThreads, |
1760 | | - Function<ManagedChannel, ManagedChannel> createNewChannel) |
1761 | | - throws InterruptedException { |
1762 | | - if (soakIterations % numThreads != 0) { |
1763 | | - throw new IllegalArgumentException("soakIterations must be evenly divisible by numThreads."); |
1764 | | - } |
1765 | | - ManagedChannel sharedChannel = createChannel(); |
1766 | | - long startNs = System.nanoTime(); |
1767 | | - Thread[] threads = new Thread[numThreads]; |
1768 | | - int soakIterationsPerThread = soakIterations / numThreads; |
1769 | | - List<ThreadResults> threadResultsList = new ArrayList<>(numThreads); |
1770 | | - for (int i = 0; i < numThreads; i++) { |
1771 | | - threadResultsList.add(new ThreadResults()); |
1772 | | - } |
1773 | | - for (int threadInd = 0; threadInd < numThreads; threadInd++) { |
1774 | | - final int currentThreadInd = threadInd; |
1775 | | - threads[threadInd] = new Thread(() -> { |
1776 | | - try { |
1777 | | - executeSoakTestInThread( |
1778 | | - soakIterationsPerThread, |
1779 | | - startNs, |
1780 | | - minTimeMsBetweenRpcs, |
1781 | | - soakRequestSize, |
1782 | | - soakResponseSize, |
1783 | | - maxAcceptablePerIterationLatencyMs, |
1784 | | - overallTimeoutSeconds, |
1785 | | - serverUri, |
1786 | | - threadResultsList.get(currentThreadInd), |
1787 | | - sharedChannel, |
1788 | | - createNewChannel); |
1789 | | - } catch (InterruptedException e) { |
1790 | | - Thread.currentThread().interrupt(); |
1791 | | - throw new RuntimeException("Thread interrupted: " + e.getMessage(), e); |
1792 | | - } |
1793 | | - }); |
1794 | | - threads[threadInd].start(); |
1795 | | - } |
1796 | | - for (Thread thread : threads) { |
1797 | | - thread.join(); |
1798 | | - } |
1799 | | - |
1800 | | - int totalFailures = 0; |
1801 | | - int iterationsDone = 0; |
1802 | | - Histogram latencies = new Histogram(4); |
1803 | | - for (ThreadResults threadResult :threadResultsList) { |
1804 | | - totalFailures += threadResult.getThreadFailures(); |
1805 | | - iterationsDone += threadResult.getIterationsDone(); |
1806 | | - latencies.add(threadResult.getLatencies()); |
1807 | | - } |
1808 | | - System.err.println( |
1809 | | - String.format( |
1810 | | - Locale.US, |
1811 | | - "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. " |
1812 | | - + "p50: %d ms, p90: %d ms, p100: %d ms", |
1813 | | - serverUri, |
1814 | | - iterationsDone, |
1815 | | - soakIterations, |
1816 | | - totalFailures, |
1817 | | - latencies.getValueAtPercentile(50), |
1818 | | - latencies.getValueAtPercentile(90), |
1819 | | - latencies.getValueAtPercentile(100))); |
1820 | | - // check if we timed out |
1821 | | - String timeoutErrorMessage = |
1822 | | - String.format( |
1823 | | - Locale.US, |
1824 | | - "(server_uri: %s) soak test consumed all %d seconds of time and quit early, " |
1825 | | - + "only having ran %d out of desired %d iterations.", |
1826 | | - serverUri, |
1827 | | - overallTimeoutSeconds, |
1828 | | - iterationsDone, |
1829 | | - soakIterations); |
1830 | | - assertEquals(timeoutErrorMessage, iterationsDone, soakIterations); |
1831 | | - // check if we had too many failures |
1832 | | - String tooManyFailuresErrorMessage = |
1833 | | - String.format( |
1834 | | - Locale.US, |
1835 | | - "(server_uri: %s) soak test total failures: %d exceeds max failures " |
1836 | | - + "threshold: %d.", |
1837 | | - serverUri, totalFailures, maxFailures); |
1838 | | - assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures); |
1839 | | - shutdownChannel(sharedChannel); |
1840 | | - } |
1841 | | - |
1842 | | - private void shutdownChannel(ManagedChannel channel) throws InterruptedException { |
1843 | | - if (channel != null) { |
1844 | | - channel.shutdownNow(); |
1845 | | - channel.awaitTermination(10, TimeUnit.SECONDS); |
1846 | | - } |
1847 | | - } |
1848 | | - |
1849 | | - protected ManagedChannel createNewChannel(ManagedChannel currentChannel) { |
1850 | | - try { |
1851 | | - shutdownChannel(currentChannel); |
1852 | | - return createChannel(); |
1853 | | - } catch (InterruptedException e) { |
1854 | | - throw new RuntimeException("Interrupted while creating a new channel", e); |
1855 | | - } |
1856 | | - } |
1857 | | - |
1858 | | - private void executeSoakTestInThread( |
1859 | | - int soakIterationsPerThread, |
1860 | | - long startNs, |
1861 | | - int minTimeMsBetweenRpcs, |
1862 | | - int soakRequestSize, |
1863 | | - int soakResponseSize, |
1864 | | - int maxAcceptablePerIterationLatencyMs, |
1865 | | - int overallTimeoutSeconds, |
1866 | | - String serverUri, |
1867 | | - ThreadResults threadResults, |
1868 | | - ManagedChannel sharedChannel, |
1869 | | - Function<ManagedChannel, ManagedChannel> maybeCreateChannel) throws InterruptedException { |
1870 | | - ManagedChannel currentChannel = sharedChannel; |
1871 | | - for (int i = 0; i < soakIterationsPerThread; i++) { |
1872 | | - if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) { |
1873 | | - break; |
1874 | | - } |
1875 | | - long earliestNextStartNs = System.nanoTime() |
1876 | | - + TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs); |
1877 | | - // recordClientCallInterceptor takes an AtomicReference. |
1878 | | - AtomicReference<ClientCall<?, ?>> soakThreadClientCallCapture = new AtomicReference<>(); |
1879 | | - currentChannel = maybeCreateChannel.apply(currentChannel); |
1880 | | - TestServiceGrpc.TestServiceBlockingStub currentStub = TestServiceGrpc |
1881 | | - .newBlockingStub(currentChannel) |
1882 | | - .withInterceptors(recordClientCallInterceptor(soakThreadClientCallCapture)); |
1883 | | - SoakIterationResult result = performOneSoakIteration(currentStub, |
1884 | | - soakRequestSize, soakResponseSize); |
1885 | | - SocketAddress peer = soakThreadClientCallCapture |
1886 | | - .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); |
1887 | | - StringBuilder logStr = new StringBuilder( |
1888 | | - String.format( |
1889 | | - Locale.US, |
1890 | | - "thread id: %d soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s", |
1891 | | - Thread.currentThread().getId(), |
1892 | | - i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri)); |
1893 | | - if (!result.getStatus().equals(Status.OK)) { |
1894 | | - threadResults.threadFailures++; |
1895 | | - logStr.append(String.format(" failed: %s", result.getStatus())); |
1896 | | - } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) { |
1897 | | - threadResults.threadFailures++; |
1898 | | - logStr.append( |
1899 | | - " exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs); |
1900 | | - } else { |
1901 | | - logStr.append(" succeeded"); |
1902 | | - } |
1903 | | - System.err.println(logStr.toString()); |
1904 | | - threadResults.iterationsDone++; |
1905 | | - threadResults.getLatencies().recordValue(result.getLatencyMs()); |
1906 | | - long remainingNs = earliestNextStartNs - System.nanoTime(); |
1907 | | - if (remainingNs > 0) { |
1908 | | - TimeUnit.NANOSECONDS.sleep(remainingNs); |
1909 | | - } |
1910 | | - } |
1911 | | - } |
1912 | | - |
1913 | 1682 | private static void assertSuccess(StreamRecorder<?> recorder) { |
1914 | 1683 | if (recorder.getError() != null) { |
1915 | 1684 | throw new AssertionError(recorder.getError()); |
|
0 commit comments