From e659191bbc73e0c5a0501444050df90332d0b21d Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Tue, 10 Feb 2026 17:59:26 +0100 Subject: [PATCH 1/5] SOLR-18061: CrossDC Consumer - add /health endpoint --- changelog/unreleased/solr-18061.yml | 9 +++ .../crossdc/manager/consumer/Consumer.java | 2 + .../manager/consumer/HealthCheckServlet.java | 62 ++++++++++++++++++ .../consumer/KafkaCrossDcConsumer.java | 63 ++++++++++++++++++- .../manager/SolrAndKafkaIntegrationTest.java | 17 ++++- 5 files changed, 150 insertions(+), 3 deletions(-) create mode 100644 changelog/unreleased/solr-18061.yml create mode 100644 solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java diff --git a/changelog/unreleased/solr-18061.yml b/changelog/unreleased/solr-18061.yml new file mode 100644 index 000000000000..d0df2494de0c --- /dev/null +++ b/changelog/unreleased/solr-18061.yml @@ -0,0 +1,9 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: "CrossDC Consumer: add /health endpoint" +type: added # added, changed, fixed, deprecated, removed, dependency_update, security, other +authors: + - name: Andrzej Bialecki + nick: ab +links: + - name: SOLR-18061 + url: https://issues.apache.org/jira/browse/SOLR-18061 diff --git a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java index ca3d2a16532f..59b62da42be9 100644 --- a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java +++ b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java @@ -96,6 +96,8 @@ public void start(Map properties) { context.setAttribute( MetricsServlet.SOLR_METRICS_MANAGER_ATTRIBUTE, metrics.getMetricManager()); context.addServlet(MetricsServlet.class, "/metrics/*"); + context.setAttribute(HealthCheckServlet.KAFKA_CROSSDC_CONSUMER, crossDcConsumer); + context.addServlet(HealthCheckServlet.class, "/health/*"); for (ServletMapping mapping : context.getServletHandler().getServletMappings()) { if (log.isInfoEnabled()) { diff --git a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java new file mode 100644 index 000000000000..6d70e38f1c98 --- /dev/null +++ b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.crossdc.manager.consumer; + +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import java.io.IOException; + +public class HealthCheckServlet extends HttpServlet { + private static final long serialVersionUID = -7848291432584409313L; + + public static final String KAFKA_CROSSDC_CONSUMER = + HealthCheckServlet.class.getName() + ".kafkaCrossDcConsumer"; + + private KafkaCrossDcConsumer consumer; + + @Override + public void init() throws ServletException { + consumer = (KafkaCrossDcConsumer) getServletContext().getAttribute(KAFKA_CROSSDC_CONSUMER); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + if (consumer == null) { + resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + return; + } + boolean kafkaConnected = consumer.isKafkaConnected(); + boolean solrConnected = consumer.isSolrConnected(); + boolean running = consumer.isRunning(); + String content = + String.format( + "{\n \"kafka\": %s,\n \"solr\": %s,\n \"running\": %s\n}", + kafkaConnected, solrConnected, running); + resp.setContentType("application/json"); + resp.setHeader("Cache-Control", "must-revalidate,no-cache,no-store"); + resp.setCharacterEncoding("UTF-8"); + resp.getOutputStream().write(content.getBytes("UTF-8")); + if (kafkaConnected && solrConnected && running) { + resp.setStatus(HttpServletResponse.SC_OK); + } else { + resp.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + } + } +} diff --git a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java index 19a8a4d115c1..5e1af44d35c3 100644 --- a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java +++ b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java @@ -25,16 +25,19 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.WakeupException; @@ -42,7 +45,9 @@ import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.HealthCheckRequest; import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.HealthCheckResponse; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.ExecutorUtil; @@ -72,10 +77,12 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final KafkaConsumer> kafkaConsumer; + private final AdminClient adminClient; private final CountDownLatch startLatch; KafkaMirroringSink kafkaMirroringSink; private static final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000; + private final String[] topicNames; private final int maxAttempts; private final CrossDcConf.CollapseUpdates collapseUpdates; @@ -83,7 +90,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer { private final SolrMessageProcessor messageProcessor; protected final ConsumerMetrics metrics; - protected SolrClientSupplier solrClientSupplier; + protected final SolrClientSupplier solrClientSupplier; private final ThreadPoolExecutor executor; @@ -93,6 +100,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer { private final BlockingQueue queue = new BlockingQueue<>(10); + private volatile boolean running = false; + /** * Supplier for creating and managing a working CloudSolrClient instance. This class ensures that * the CloudSolrClient instance doesn't try to use its {@link @@ -224,6 +233,7 @@ public KafkaCrossDcConsumer( log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps); kafkaConsumer = createKafkaConsumer(kafkaConsumerProps); + adminClient = createKafkaAdminClient(kafkaConsumerProps); partitionManager = new PartitionManager(kafkaConsumer); // Create producer for resubmitting failed requests log.info("Creating Kafka resubmit producer"); @@ -244,6 +254,10 @@ public KafkaConsumer> createKafkaConsumer(Propert properties, new StringDeserializer(), new MirroredSolrRequestSerializer()); } + public AdminClient createKafkaAdminClient(Properties properties) { + return AdminClient.create(properties); + } + protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) { return new KafkaMirroringSink(conf); } @@ -269,18 +283,25 @@ public void run() { log.info("Consumer started"); startLatch.countDown(); + running = true; while (pollAndProcessRequests()) { // no-op within this loop: everything is done in pollAndProcessRequests method defined // above. } + running = false; - log.info("Closed kafka consumer. Exiting now."); + log.info("Closing kafka consumer. Exiting now."); try { kafkaConsumer.close(); } catch (Exception e) { log.warn("Failed to close kafka consumer", e); } + try { + adminClient.close(); + } catch (Exception e) { + log.warn("Failed to close kafka admin client", e); + } try { kafkaMirroringSink.close(); @@ -292,6 +313,44 @@ public void run() { } } + public boolean isRunning() { + return running; + } + + public boolean isSolrConnected() { + if (solrClientSupplier == null) { + return false; + } + try { + HealthCheckRequest request = new HealthCheckRequest(); + HealthCheckResponse response = request.process(solrClientSupplier.get()); + if (response.getStatus() != 0) { + return false; + } + return true; + } catch (Exception e) { + return false; + } + } + + public boolean isKafkaConnected() { + if (adminClient == null) { + return false; + } + try { + Collection nodes = adminClient.describeCluster().nodes().get(); + if (nodes == null || nodes.isEmpty()) { + return false; + } + return true; + } catch (InterruptedException | ExecutionException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + return false; + } + } + /** * Polls and processes the requests from Kafka. This method returns false when the consumer needs * to be shutdown i.e. when there's a wakeup exception. diff --git a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java index 7d7941dc442e..1f38dcef1ea8 100644 --- a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java +++ b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java @@ -68,6 +68,7 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.noggit.ObjectBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -341,7 +342,7 @@ public void testParallelUpdatesToCluster2() throws Exception { @Test @SuppressWarnings({"unchecked"}) - public void testMetrics() throws Exception { + public void testMetricsAndHealthcheck() throws Exception { CloudSolrClient client = solrCluster1.getSolrClient(); SolrInputDocument doc = new SolrInputDocument(); doc.addField("id", String.valueOf(new Date().getTime())); @@ -359,6 +360,7 @@ public void testMetrics() throws Exception { HttpJettySolrClient httpJettySolrClient = new HttpJettySolrClient.Builder(baseUrl).useHttp1_1(true).build(); try { + // test the metrics endpoint GenericSolrRequest req = new GenericSolrRequest(SolrRequest.METHOD.GET, "/metrics"); req.setResponseParser(new InputStreamResponseParser(null)); NamedList rsp = httpJettySolrClient.request(req); @@ -366,6 +368,19 @@ public void testMetrics() throws Exception { IOUtils.toString( (InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), StandardCharsets.UTF_8); assertTrue(content, content.contains("crossdc_consumer_output_total")); + + // test the healtcheck endpoint + req = new GenericSolrRequest(SolrRequest.METHOD.GET, "/health"); + req.setResponseParser(new InputStreamResponseParser(null)); + rsp = httpJettySolrClient.request(req); + content = + IOUtils.toString( + (InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), StandardCharsets.UTF_8); + assertEquals(Integer.valueOf(200), rsp.get("responseStatus")); + Map map = (Map) ObjectBuilder.fromJSON(content); + assertEquals(Boolean.TRUE, map.get("kafka")); + assertEquals(Boolean.TRUE, map.get("solr")); + assertEquals(Boolean.TRUE, map.get("running")); } finally { httpJettySolrClient.close(); client.close(); From e85dd9a497101263ee5df42f52907dccd5d3386f Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Tue, 10 Feb 2026 20:55:39 +0100 Subject: [PATCH 2/5] Use StandardCharset. --- .../solr/crossdc/manager/consumer/HealthCheckServlet.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java index 6d70e38f1c98..c67db445e15d 100644 --- a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java +++ b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java @@ -21,6 +21,7 @@ import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import java.io.IOException; +import java.nio.charset.StandardCharsets; public class HealthCheckServlet extends HttpServlet { private static final long serialVersionUID = -7848291432584409313L; @@ -52,7 +53,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) resp.setContentType("application/json"); resp.setHeader("Cache-Control", "must-revalidate,no-cache,no-store"); resp.setCharacterEncoding("UTF-8"); - resp.getOutputStream().write(content.getBytes("UTF-8")); + resp.getOutputStream().write(content.getBytes(StandardCharsets.UTF_8)); if (kafkaConnected && solrConnected && running) { resp.setStatus(HttpServletResponse.SC_OK); } else { From 35f7d9d3c172941c4382bbab01e71026c07a1ba9 Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Wed, 11 Feb 2026 12:13:38 +0100 Subject: [PATCH 3/5] Fix forbiddenApis. --- .../solr/crossdc/manager/consumer/HealthCheckServlet.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java index c67db445e15d..49a55bd28455 100644 --- a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java +++ b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/HealthCheckServlet.java @@ -22,6 +22,7 @@ import jakarta.servlet.http.HttpServletResponse; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Locale; public class HealthCheckServlet extends HttpServlet { private static final long serialVersionUID = -7848291432584409313L; @@ -48,8 +49,11 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) boolean running = consumer.isRunning(); String content = String.format( + Locale.ROOT, "{\n \"kafka\": %s,\n \"solr\": %s,\n \"running\": %s\n}", - kafkaConnected, solrConnected, running); + kafkaConnected, + solrConnected, + running); resp.setContentType("application/json"); resp.setHeader("Cache-Control", "must-revalidate,no-cache,no-store"); resp.setCharacterEncoding("UTF-8"); From 853ef8f25187e863429f10c20d8f196ebd55c598 Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Fri, 20 Feb 2026 13:42:46 +0100 Subject: [PATCH 4/5] Doc additions and corrections. --- .../modules/deployment-guide/pages/cross-dc-replication.adoc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc index 8211aa04e1d0..bb235dcdc7cd 100644 --- a/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc +++ b/solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc @@ -151,8 +151,9 @@ system property. Currently the following endpoints are exposed (on local port configured using `port` property, default is 8090): -`/metrics` - (GET):: This endpoint returns JSON-formatted metrics describing various aspects of document processing in Consumer. +`/metrics` - (GET):: This endpoint returns metrics in Prometheus text format, describing various aspects of document processing in Consumer. `/threads` - (GET):: Returns a plain-text thread dump of the JVM running the Consumer application. +`/health` - (GET):: Returns an `HTTP 200 OK` code if the service is in a healthy state, or `HTTP 503 Service Unavailable` if one or more healthcheck probes failed. The JSON response body provides more details. ==== Configuration Properties for the CrossDC Manager: From 780030a780d8658da2e6816551473052c5d4659b Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Fri, 20 Feb 2026 15:34:47 +0100 Subject: [PATCH 5/5] Test also for an unhealthy state. --- .../manager/SolrAndKafkaIntegrationTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java index 1f38dcef1ea8..39970d7b4ca6 100644 --- a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java +++ b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java @@ -381,6 +381,21 @@ public void testMetricsAndHealthcheck() throws Exception { assertEquals(Boolean.TRUE, map.get("kafka")); assertEquals(Boolean.TRUE, map.get("solr")); assertEquals(Boolean.TRUE, map.get("running")); + + // kill Solr to trigger unhealthy state + solrCluster2.shutdown(); + solrCluster2 = null; + Thread.sleep(5000); + rsp = httpJettySolrClient.request(req); + content = + IOUtils.toString( + (InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), StandardCharsets.UTF_8); + assertEquals(Integer.valueOf(503), rsp.get("responseStatus")); + map = (Map) ObjectBuilder.fromJSON(content); + assertEquals(Boolean.TRUE, map.get("kafka")); + assertEquals(Boolean.FALSE, map.get("solr")); + assertEquals(Boolean.TRUE, map.get("running")); + } finally { httpJettySolrClient.close(); client.close();