Skip to content

Commit eeeba43

Browse files
sigramarup-chauhan
authored and committed
SOLR-18061: CrossDC Consumer - add /health endpoint (apache#4126)
1 parent b4a66b4 commit eeeba43

File tree

6 files changed

+172
-4
lines changed

6 files changed

+172
-4
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
2+
title: "CrossDC Consumer: add /health endpoint"
3+
type: added # added, changed, fixed, deprecated, removed, dependency_update, security, other
4+
authors:
5+
- name: Andrzej Bialecki
6+
nick: ab
7+
links:
8+
- name: SOLR-18061
9+
url: https://issues.apache.org/jira/browse/SOLR-18061

solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Consumer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ public void start(Map<String, Object> properties) {
9696
context.setAttribute(
9797
MetricsServlet.SOLR_METRICS_MANAGER_ATTRIBUTE, metrics.getMetricManager());
9898
context.addServlet(MetricsServlet.class, "/metrics/*");
99+
context.setAttribute(HealthCheckServlet.KAFKA_CROSSDC_CONSUMER, crossDcConsumer);
100+
context.addServlet(HealthCheckServlet.class, "/health/*");
99101

100102
for (ServletMapping mapping : context.getServletHandler().getServletMappings()) {
101103
if (log.isInfoEnabled()) {
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.solr.crossdc.manager.consumer;
18+
19+
import jakarta.servlet.ServletException;
20+
import jakarta.servlet.http.HttpServlet;
21+
import jakarta.servlet.http.HttpServletRequest;
22+
import jakarta.servlet.http.HttpServletResponse;
23+
import java.io.IOException;
24+
import java.nio.charset.StandardCharsets;
25+
import java.util.Locale;
26+
27+
public class HealthCheckServlet extends HttpServlet {
28+
private static final long serialVersionUID = -7848291432584409313L;
29+
30+
public static final String KAFKA_CROSSDC_CONSUMER =
31+
HealthCheckServlet.class.getName() + ".kafkaCrossDcConsumer";
32+
33+
private KafkaCrossDcConsumer consumer;
34+
35+
@Override
36+
public void init() throws ServletException {
37+
consumer = (KafkaCrossDcConsumer) getServletContext().getAttribute(KAFKA_CROSSDC_CONSUMER);
38+
}
39+
40+
@Override
41+
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
42+
throws ServletException, IOException {
43+
if (consumer == null) {
44+
resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
45+
return;
46+
}
47+
boolean kafkaConnected = consumer.isKafkaConnected();
48+
boolean solrConnected = consumer.isSolrConnected();
49+
boolean running = consumer.isRunning();
50+
String content =
51+
String.format(
52+
Locale.ROOT,
53+
"{\n \"kafka\": %s,\n \"solr\": %s,\n \"running\": %s\n}",
54+
kafkaConnected,
55+
solrConnected,
56+
running);
57+
resp.setContentType("application/json");
58+
resp.setHeader("Cache-Control", "must-revalidate,no-cache,no-store");
59+
resp.setCharacterEncoding("UTF-8");
60+
resp.getOutputStream().write(content.getBytes(StandardCharsets.UTF_8));
61+
if (kafkaConnected && solrConnected && running) {
62+
resp.setStatus(HttpServletResponse.SC_OK);
63+
} else {
64+
resp.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
65+
}
66+
}
67+
}

solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,29 @@
2525
import java.util.Optional;
2626
import java.util.Properties;
2727
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.ExecutionException;
2829
import java.util.concurrent.ExecutorService;
2930
import java.util.concurrent.Future;
3031
import java.util.concurrent.ThreadPoolExecutor;
3132
import java.util.concurrent.TimeUnit;
3233
import java.util.concurrent.atomic.AtomicReference;
3334
import java.util.function.Supplier;
35+
import org.apache.kafka.clients.admin.AdminClient;
3436
import org.apache.kafka.clients.consumer.ConsumerConfig;
3537
import org.apache.kafka.clients.consumer.ConsumerRecord;
3638
import org.apache.kafka.clients.consumer.ConsumerRecords;
3739
import org.apache.kafka.clients.consumer.KafkaConsumer;
40+
import org.apache.kafka.common.Node;
3841
import org.apache.kafka.common.TopicPartition;
3942
import org.apache.kafka.common.errors.SerializationException;
4043
import org.apache.kafka.common.errors.WakeupException;
4144
import org.apache.kafka.common.serialization.StringDeserializer;
4245
import org.apache.solr.client.solrj.SolrRequest;
4346
import org.apache.solr.client.solrj.SolrResponse;
4447
import org.apache.solr.client.solrj.impl.CloudSolrClient;
48+
import org.apache.solr.client.solrj.request.HealthCheckRequest;
4549
import org.apache.solr.client.solrj.request.UpdateRequest;
50+
import org.apache.solr.client.solrj.response.HealthCheckResponse;
4651
import org.apache.solr.common.SolrInputDocument;
4752
import org.apache.solr.common.params.ModifiableSolrParams;
4853
import org.apache.solr.common.util.ExecutorUtil;
@@ -72,18 +77,20 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
7277
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
7378

7479
private final KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumer;
80+
private final AdminClient adminClient;
7581
private final CountDownLatch startLatch;
7682
KafkaMirroringSink kafkaMirroringSink;
7783

7884
private static final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
85+
7986
private final String[] topicNames;
8087
private final int maxAttempts;
8188
private final CrossDcConf.CollapseUpdates collapseUpdates;
8289
private final int maxCollapseRecords;
8390
private final SolrMessageProcessor messageProcessor;
8491
protected final ConsumerMetrics metrics;
8592

86-
protected SolrClientSupplier solrClientSupplier;
93+
protected final SolrClientSupplier solrClientSupplier;
8794

8895
private final ThreadPoolExecutor executor;
8996

@@ -93,6 +100,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
93100

94101
private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
95102

103+
private volatile boolean running = false;
104+
96105
/**
97106
* Supplier for creating and managing a working CloudSolrClient instance. This class ensures that
98107
* the CloudSolrClient instance doesn't try to use its {@link
@@ -224,6 +233,7 @@ public KafkaCrossDcConsumer(
224233

225234
log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps);
226235
kafkaConsumer = createKafkaConsumer(kafkaConsumerProps);
236+
adminClient = createKafkaAdminClient(kafkaConsumerProps);
227237
partitionManager = new PartitionManager(kafkaConsumer);
228238
// Create producer for resubmitting failed requests
229239
log.info("Creating Kafka resubmit producer");
@@ -244,6 +254,10 @@ public KafkaConsumer<String, MirroredSolrRequest<?>> createKafkaConsumer(Propert
244254
properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
245255
}
246256

257+
public AdminClient createKafkaAdminClient(Properties properties) {
258+
return AdminClient.create(properties);
259+
}
260+
247261
protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
248262
return new KafkaMirroringSink(conf);
249263
}
@@ -269,18 +283,25 @@ public void run() {
269283

270284
log.info("Consumer started");
271285
startLatch.countDown();
286+
running = true;
272287

273288
while (pollAndProcessRequests()) {
274289
// no-op within this loop: everything is done in pollAndProcessRequests method defined
275290
// above.
276291
}
292+
running = false;
277293

278-
log.info("Closed kafka consumer. Exiting now.");
294+
log.info("Closing kafka consumer. Exiting now.");
279295
try {
280296
kafkaConsumer.close();
281297
} catch (Exception e) {
282298
log.warn("Failed to close kafka consumer", e);
283299
}
300+
try {
301+
adminClient.close();
302+
} catch (Exception e) {
303+
log.warn("Failed to close kafka admin client", e);
304+
}
284305

285306
try {
286307
kafkaMirroringSink.close();
@@ -292,6 +313,44 @@ public void run() {
292313
}
293314
}
294315

316+
public boolean isRunning() {
317+
return running;
318+
}
319+
320+
public boolean isSolrConnected() {
321+
if (solrClientSupplier == null) {
322+
return false;
323+
}
324+
try {
325+
HealthCheckRequest request = new HealthCheckRequest();
326+
HealthCheckResponse response = request.process(solrClientSupplier.get());
327+
if (response.getStatus() != 0) {
328+
return false;
329+
}
330+
return true;
331+
} catch (Exception e) {
332+
return false;
333+
}
334+
}
335+
336+
public boolean isKafkaConnected() {
337+
if (adminClient == null) {
338+
return false;
339+
}
340+
try {
341+
Collection<Node> nodes = adminClient.describeCluster().nodes().get();
342+
if (nodes == null || nodes.isEmpty()) {
343+
return false;
344+
}
345+
return true;
346+
} catch (InterruptedException | ExecutionException e) {
347+
if (e instanceof InterruptedException) {
348+
Thread.currentThread().interrupt();
349+
}
350+
return false;
351+
}
352+
}
353+
295354
/**
296355
* Polls and processes the requests from Kafka. This method returns false when the consumer needs
297356
* to be shutdown i.e. when there's a wakeup exception.

solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.junit.BeforeClass;
6969
import org.junit.Ignore;
7070
import org.junit.Test;
71+
import org.noggit.ObjectBuilder;
7172
import org.slf4j.Logger;
7273
import org.slf4j.LoggerFactory;
7374

@@ -341,7 +342,7 @@ public void testParallelUpdatesToCluster2() throws Exception {
341342

342343
@Test
343344
@SuppressWarnings({"unchecked"})
344-
public void testMetrics() throws Exception {
345+
public void testMetricsAndHealthcheck() throws Exception {
345346
CloudSolrClient client = solrCluster1.getSolrClient();
346347
SolrInputDocument doc = new SolrInputDocument();
347348
doc.addField("id", String.valueOf(new Date().getTime()));
@@ -359,13 +360,42 @@ public void testMetrics() throws Exception {
359360
HttpJettySolrClient httpJettySolrClient =
360361
new HttpJettySolrClient.Builder(baseUrl).useHttp1_1(true).build();
361362
try {
363+
// test the metrics endpoint
362364
GenericSolrRequest req = new GenericSolrRequest(SolrRequest.METHOD.GET, "/metrics");
363365
req.setResponseParser(new InputStreamResponseParser(null));
364366
NamedList<Object> rsp = httpJettySolrClient.request(req);
365367
String content =
366368
IOUtils.toString(
367369
(InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), StandardCharsets.UTF_8);
368370
assertTrue(content, content.contains("crossdc_consumer_output_total"));
371+
372+
// test the healthcheck endpoint
373+
req = new GenericSolrRequest(SolrRequest.METHOD.GET, "/health");
374+
req.setResponseParser(new InputStreamResponseParser(null));
375+
rsp = httpJettySolrClient.request(req);
376+
content =
377+
IOUtils.toString(
378+
(InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), StandardCharsets.UTF_8);
379+
assertEquals(Integer.valueOf(200), rsp.get("responseStatus"));
380+
Map<String, Object> map = (Map<String, Object>) ObjectBuilder.fromJSON(content);
381+
assertEquals(Boolean.TRUE, map.get("kafka"));
382+
assertEquals(Boolean.TRUE, map.get("solr"));
383+
assertEquals(Boolean.TRUE, map.get("running"));
384+
385+
// kill Solr to trigger unhealthy state
386+
solrCluster2.shutdown();
387+
solrCluster2 = null;
388+
Thread.sleep(5000);
389+
rsp = httpJettySolrClient.request(req);
390+
content =
391+
IOUtils.toString(
392+
(InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), StandardCharsets.UTF_8);
393+
assertEquals(Integer.valueOf(503), rsp.get("responseStatus"));
394+
map = (Map<String, Object>) ObjectBuilder.fromJSON(content);
395+
assertEquals(Boolean.TRUE, map.get("kafka"));
396+
assertEquals(Boolean.FALSE, map.get("solr"));
397+
assertEquals(Boolean.TRUE, map.get("running"));
398+
369399
} finally {
370400
httpJettySolrClient.close();
371401
client.close();

solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,9 @@ system property.
151151

152152
Currently the following endpoints are exposed (on local port configured using `port` property, default is 8090):
153153

154-
`/metrics` - (GET):: This endpoint returns JSON-formatted metrics describing various aspects of document processing in Consumer.
154+
`/metrics` - (GET):: This endpoint returns metrics in Prometheus text format, describing various aspects of document processing in Consumer.
155155
`/threads` - (GET):: Returns a plain-text thread dump of the JVM running the Consumer application.
156+
`/health` - (GET):: Returns an `HTTP 200 OK` code if the service is in a healthy state, or `HTTP 503 Service Unavailable` if one or more healthcheck probes failed. The JSON response body provides more details.
156157

157158
==== Configuration Properties for the CrossDC Manager:
158159

0 commit comments

Comments
 (0)