diff --git a/Jenkinsfile b/Jenkinsfile index dd6a5e583a..fba315435a 100755 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -2,5 +2,9 @@ common { slackChannel = '#c3-alerts' - upstreamProjects = 'confluentinc/common' + downStreamRepos = ["schema-registry", "metadata-service", "kafka-rest", + "confluent-security-plugins", "ce-kafka-http-server", "secret-registry", + "confluent-cloud-plugins"] + nanoVersion = true } +//change \ No newline at end of file diff --git a/core/pom.xml b/core/pom.xml index 8bf619d3bf..6a79f3c7fa 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -7,7 +7,7 @@ io.confluent rest-utils-parent - 6.0.2-SNAPSHOT + 6.1.0-0 rest-utils diff --git a/core/src/main/java/io/confluent/rest/ApplicationServer.java b/core/src/main/java/io/confluent/rest/ApplicationServer.java index f3fa0a4c1d..0ea98fcff2 100644 --- a/core/src/main/java/io/confluent/rest/ApplicationServer.java +++ b/core/src/main/java/io/confluent/rest/ApplicationServer.java @@ -16,6 +16,8 @@ package io.confluent.rest; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.config.ConfigException; @@ -46,6 +48,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -166,6 +169,33 @@ private void attachMetricsListener(Metrics metrics, Map tags) { } } + private void addJettyThreadPoolMetrics(Metrics metrics, Map tags) { + //add metric for jetty thread pool queue size + String requestQueueSizeName = "request-queue-size"; + String metricGroupName = "jetty-metrics"; + + MetricName requestQueueSizeMetricName = metrics.metricName(requestQueueSizeName, + metricGroupName, "The number of requests in the jetty thread pool queue.", tags); + Gauge queueSize = (config, now) -> getQueueSize(); + metrics.addMetric(requestQueueSizeMetricName, queueSize); + + //add metric for thread pool busy thread count + String busyThreadCountName = "busy-thread-count"; + MetricName busyThreadCountMetricName = metrics.metricName(busyThreadCountName, + metricGroupName, "jetty thread pool busy thread count.", + tags); + Gauge busyThreadCount = (config, now) -> getBusyThreads(); + metrics.addMetric(busyThreadCountMetricName, busyThreadCount); + + //add metric for thread pool usage + String threadPoolUsageName = "thread-pool-usage"; + final MetricName threadPoolUsageMetricName = metrics.metricName(threadPoolUsageName, + metricGroupName, " jetty thread pool usage.", + Collections.emptyMap()); + Gauge threadPoolUsage = (config, now) -> (getBusyThreads() / (double) getMaxThreads()); + metrics.addMetric(threadPoolUsageMetricName, threadPoolUsage); + } + private void finalizeHandlerCollection(HandlerCollection handlers, HandlerCollection wsHandlers) { /* DefaultHandler must come last eo ensure all contexts * have a chance to handle a request first */ @@ -193,6 +223,7 @@ protected final void doStart() throws Exception { HandlerCollection wsHandlers = new HandlerCollection(); for (Application app : applications.getApplications()) { attachMetricsListener(app.getMetrics(), app.getMetricsTags()); + addJettyThreadPoolMetrics(app.getMetrics(), app.getMetricsTags()); handlers.addHandler(app.configureHandler()); wsHandlers.addHandler(app.configureWebSocketHandler()); } @@ -400,6 +431,13 @@ public int getThreads() { return getThreadPool().getThreads(); } + /** + * @return number of busy threads in the pool. + */ + public int getBusyThreads() { + return ((QueuedThreadPool)getThreadPool()).getBusyThreads(); + } + /** * For unit testing. * @@ -410,8 +448,6 @@ public int getMaxThreads() { } /** - * For unit testing. - * * @return the size of the queue in the pool. */ public int getQueueSize() { diff --git a/core/src/main/java/io/confluent/rest/exceptions/DebuggableExceptionMapper.java b/core/src/main/java/io/confluent/rest/exceptions/DebuggableExceptionMapper.java index c865c7cd2d..003b115c08 100644 --- a/core/src/main/java/io/confluent/rest/exceptions/DebuggableExceptionMapper.java +++ b/core/src/main/java/io/confluent/rest/exceptions/DebuggableExceptionMapper.java @@ -59,9 +59,9 @@ public DebuggableExceptionMapper(RestConfig restConfig) { */ public Response.ResponseBuilder createResponse(Throwable exc, int errorCode, Response.Status status, String msg) { - log.error("Request Failed with exception " , exc); String readableMessage = msg; if (restConfig != null && restConfig.getBoolean(RestConfig.DEBUG_CONFIG)) { + log.error("Request Failed with exception " , exc); readableMessage += " " + exc.getClass().getName() + ": " + exc.getMessage(); try { ByteArrayOutputStream os = new ByteArrayOutputStream(); diff --git a/core/src/test/java/io/confluent/rest/ApiHeadersTest.java b/core/src/test/java/io/confluent/rest/ApiHeadersTest.java index ef5a0e13eb..54299c2182 100644 --- a/core/src/test/java/io/confluent/rest/ApiHeadersTest.java +++ b/core/src/test/java/io/confluent/rest/ApiHeadersTest.java @@ -128,9 +128,9 @@ private static void createKeystoreWithCert(File file, String alias, Map certs) throws Exception { KeyPair keypair = TestSslUtils.generateKeyPair("RSA"); CertificateBuilder certificateBuilder = new CertificateBuilder(30, "SHA1withRSA"); - X509Certificate cCert = certificateBuilder.sanDnsName("localhost") + X509Certificate cCert = certificateBuilder.sanDnsNames("localhost") .generate("CN=mymachine.local, O=A client", keypair); - TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), alias, keypair.getPrivate(), cCert); + TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), new Password(SSL_PASSWORD),alias, keypair.getPrivate(), cCert); certs.put(alias, cCert); } @@ -123,9 +123,9 @@ private void enableSslClientAuth(Properties props) { private void createWrongKeystoreWithCert(File file, String alias, Map certs) throws Exception { KeyPair keypair = TestSslUtils.generateKeyPair("RSA"); CertificateBuilder certificateBuilder = new CertificateBuilder(30, "SHA1withRSA"); - X509Certificate cCert = certificateBuilder.sanDnsName("fail") + X509Certificate cCert = certificateBuilder.sanDnsNames("fail") .generate("CN=mymachine.local, O=A client", keypair); - TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), alias, keypair.getPrivate(), cCert); + TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), new Password(SSL_PASSWORD), alias, keypair.getPrivate(), cCert); certs.put(alias, cCert); } diff --git a/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java b/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java index a51a577c16..4e5cc8c498 100644 --- a/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java +++ b/core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java @@ -20,8 +20,17 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.test.TestUtils; +import org.junit.Ignore; import org.junit.Test; + +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -119,12 +128,79 @@ public void testQueueFull() throws Exception { } } + @Test + @Ignore + public void testJettyThreadPoolMetrics() throws Exception { + RestResource.latch = new CountDownLatch(1); + TestCustomizeThreadPoolApplication app = new TestCustomizeThreadPoolApplication(); + String uri = app.getUri(); + try { + app.start(); + assertEquals(0, getIntMetricValue(app.metrics, "request-queue-size")); + + //send 18 requests: queueSize (8) + threads (10) + int numThread = 18; + Thread[] threads = sendRequests(uri + "/custom/resource", numThread); + TestUtils.waitForCondition(() -> app.server.getQueueSize() == 8, "Queue is not full"); + assertEquals(8, getIntMetricValue(app.metrics, "request-queue-size")); + assertEquals(10, getIntMetricValue(app.metrics, "busy-thread-count")); + assertEquals(1.0, getDoubleMetricValue(app.metrics, "thread-pool-usage"), 0.0); + + RestResource.latch.countDown(); + for(int i = 0; i < numThread; i++) { + threads[i].join(); + } + + TestUtils.waitForCondition(() -> app.server.getQueueSize() == 0, "Queue is not empty"); + assertEquals(0, getIntMetricValue(app.metrics, "request-queue-size")); + assertTrue(getDoubleMetricValue(app.metrics, "thread-pool-usage") > 0); + assertTrue(getDoubleMetricValue(app.metrics, "thread-pool-usage") < 1); + } finally { + RestResource.latch = null; + app.stop(); + } + } + + public static int getIntMetricValue(Metrics metrics, String attribute) { + Map allMetrics = metrics.metrics(); + Optional metric = allMetrics.entrySet().stream().filter((m) -> { + return m.getKey().name().equals(attribute); + }).map(Map.Entry::getValue).findFirst(); + return metric.isPresent() ? (Integer) metric.get().metricValue() : -1; + } + + public static double getDoubleMetricValue(Metrics metrics, String attribute) { + Map allMetrics = metrics.metrics(); + Optional metric = allMetrics.entrySet().stream().filter((m) -> { + return m.getKey().name().equals(attribute); + }).map(Map.Entry::getValue).findFirst(); + return metric.isPresent() ? (Double) metric.get().metricValue() : -1; + } + /** * Simulate multiple HTTP clients sending HTTP requests same time. Each client will send one HTTP request. * The requests will be put in queue if the number of clients are more than the working threads. * */ @SuppressWarnings("SameParameterValue") private void makeConcurrentGetRequests(String uri, int numThread, TestCustomizeThreadPoolApplication app) throws Exception { + Thread[] threads = sendRequests(uri, numThread); + + long startingTime = System.currentTimeMillis(); + while(System.currentTimeMillis() - startingTime < 360*1000) { + log.info("Queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity()); + assertTrue("Number of jobs in queue is not more than capacity of queue ", app.getServer().getQueueSize() <= app.getServer().getQueueCapacity()); + Thread.sleep(2000); + if (app.getServer().getQueueSize() == 0) + break; + } + + for(int i = 0; i < numThread; i++) { + threads[i].join(); + } + log.info("End queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity()); + } + + private Thread[] sendRequests(final String uri, final int numThread) { Thread[] threads = new Thread[numThread]; for(int i = 0; i < numThread; i++) { threads[i] = new Thread() { @@ -134,7 +210,7 @@ public void run() { CloseableHttpResponse response = null; try { response = httpclient.execute(httpget); - HttpStatus.Code statusCode = HttpStatus.getCode(response.getStatusLine().getStatusCode()); + Code statusCode = HttpStatus.getCode(response.getStatusLine().getStatusCode()); log.info("Status code {}, reason {} ", statusCode, response.getStatusLine().getReasonPhrase()); assertThat(statusCode, is(Code.OK)); } catch (Exception e) { @@ -152,34 +228,28 @@ public void run() { threads[i].start(); } - - long startingTime = System.currentTimeMillis(); - while(System.currentTimeMillis() - startingTime < 360*1000) { - log.info("Queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity()); - assertTrue("Number of jobs in queue is not more than capacity of queue ", app.getServer().getQueueSize() <= app.getServer().getQueueCapacity()); - Thread.sleep(2000); - if (app.getServer().getQueueSize() == 0) - break; - } - - for(int i = 0; i < numThread; i++) { - threads[i].join(); - } - log.info("End queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity()); + return threads; } @Path("/custom") @Produces(MediaType.TEXT_PLAIN) public static class RestResource { + + static CountDownLatch latch = null; + @GET @Path("/resource") - public String get() { - synchronized(locker) { - try { - locker.wait(10000); - } catch (Exception e) { - log.info(e.getMessage()); + public String get() throws InterruptedException { + if (latch == null) { + synchronized(locker) { + try { + locker.wait(10000); + } catch (Exception e) { + log.info(e.getMessage()); + } } + } else { + latch.await(); } return "ThreadPool"; } diff --git a/examples/pom.xml b/examples/pom.xml index cf46a8a459..01919419e3 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -8,7 +8,7 @@ rest-utils-parent io.confluent - 6.0.2-SNAPSHOT + 6.1.0-0 rest-utils-examples @@ -19,11 +19,13 @@ io.confluent rest-utils + ${io.confluent.rest-utils.version} io.confluent rest-utils-test + ${io.confluent.rest-utils.version} test diff --git a/package/pom.xml b/package/pom.xml index 9dd719d23d..cadaf48001 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -7,7 +7,7 @@ io.confluent rest-utils-parent - 6.0.2-SNAPSHOT + 6.1.0-0 rest-utils-package @@ -19,6 +19,7 @@ io.confluent rest-utils + ${io.confluent.rest-utils.version} diff --git a/pom.xml b/pom.xml index dd09aa60e8..2df9d04647 100644 --- a/pom.xml +++ b/pom.xml @@ -8,12 +8,13 @@ io.confluent common - 6.0.2-SNAPSHOT + [6.1.0-0, 6.1.1-0) rest-utils-parent pom rest-utils-parent + 6.1.0-0 Confluent, Inc. http://confluent.io @@ -54,6 +55,7 @@ 9.4.33.v20201020 2.2.0 checkstyle/suppressions.xml + 6.1.0-0 @@ -70,12 +72,12 @@ io.confluent rest-utils - ${project.version} + ${io.confluent.rest-utils.version} io.confluent rest-utils-test - ${project.version} + ${io.confluent.rest-utils.version} test diff --git a/test/pom.xml b/test/pom.xml index 207c7bf657..2dec383106 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -8,7 +8,7 @@ io.confluent rest-utils-parent - 6.0.2-SNAPSHOT + 6.1.0-0 rest-utils-test @@ -19,6 +19,7 @@ io.confluent rest-utils + ${io.confluent.rest-utils.version} junit