2525import java .util .Optional ;
2626import java .util .Properties ;
2727import java .util .concurrent .CountDownLatch ;
28+ import java .util .concurrent .ExecutionException ;
2829import java .util .concurrent .ExecutorService ;
2930import java .util .concurrent .Future ;
3031import java .util .concurrent .ThreadPoolExecutor ;
3132import java .util .concurrent .TimeUnit ;
3233import java .util .concurrent .atomic .AtomicReference ;
3334import java .util .function .Supplier ;
35+ import org .apache .kafka .clients .admin .AdminClient ;
3436import org .apache .kafka .clients .consumer .ConsumerConfig ;
3537import org .apache .kafka .clients .consumer .ConsumerRecord ;
3638import org .apache .kafka .clients .consumer .ConsumerRecords ;
3739import org .apache .kafka .clients .consumer .KafkaConsumer ;
40+ import org .apache .kafka .common .Node ;
3841import org .apache .kafka .common .TopicPartition ;
3942import org .apache .kafka .common .errors .SerializationException ;
4043import org .apache .kafka .common .errors .WakeupException ;
4144import org .apache .kafka .common .serialization .StringDeserializer ;
4245import org .apache .solr .client .solrj .SolrRequest ;
4346import org .apache .solr .client .solrj .SolrResponse ;
4447import org .apache .solr .client .solrj .impl .CloudSolrClient ;
48+ import org .apache .solr .client .solrj .request .HealthCheckRequest ;
4549import org .apache .solr .client .solrj .request .UpdateRequest ;
50+ import org .apache .solr .client .solrj .response .HealthCheckResponse ;
4651import org .apache .solr .common .SolrInputDocument ;
4752import org .apache .solr .common .params .ModifiableSolrParams ;
4853import org .apache .solr .common .util .ExecutorUtil ;
@@ -72,18 +77,20 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
7277 private static final Logger log = LoggerFactory .getLogger (MethodHandles .lookup ().lookupClass ());
7378
7479 private final KafkaConsumer <String , MirroredSolrRequest <?>> kafkaConsumer ;
80+ private final AdminClient adminClient ;
7581 private final CountDownLatch startLatch ;
7682 KafkaMirroringSink kafkaMirroringSink ;
7783
7884 private static final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000 ;
85+
7986 private final String [] topicNames ;
8087 private final int maxAttempts ;
8188 private final CrossDcConf .CollapseUpdates collapseUpdates ;
8289 private final int maxCollapseRecords ;
8390 private final SolrMessageProcessor messageProcessor ;
8491 protected final ConsumerMetrics metrics ;
8592
86- protected SolrClientSupplier solrClientSupplier ;
93+ protected final SolrClientSupplier solrClientSupplier ;
8794
8895 private final ThreadPoolExecutor executor ;
8996
@@ -93,6 +100,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
93100
94101 private final BlockingQueue <Runnable > queue = new BlockingQueue <>(10 );
95102
103+ private volatile boolean running = false ;
104+
96105 /**
97106 * Supplier for creating and managing a working CloudSolrClient instance. This class ensures that
98107 * the CloudSolrClient instance doesn't try to use its {@link
@@ -224,6 +233,7 @@ public KafkaCrossDcConsumer(
224233
225234 log .info ("Creating Kafka consumer with configuration {}" , kafkaConsumerProps );
226235 kafkaConsumer = createKafkaConsumer (kafkaConsumerProps );
236+ adminClient = createKafkaAdminClient (kafkaConsumerProps );
227237 partitionManager = new PartitionManager (kafkaConsumer );
228238 // Create producer for resubmitting failed requests
229239 log .info ("Creating Kafka resubmit producer" );
@@ -244,6 +254,10 @@ public KafkaConsumer<String, MirroredSolrRequest<?>> createKafkaConsumer(Propert
244254 properties , new StringDeserializer (), new MirroredSolrRequestSerializer ());
245255 }
246256
257+ public AdminClient createKafkaAdminClient (Properties properties ) {
258+ return AdminClient .create (properties );
259+ }
260+
247261 protected KafkaMirroringSink createKafkaMirroringSink (KafkaCrossDcConf conf ) {
248262 return new KafkaMirroringSink (conf );
249263 }
@@ -269,18 +283,25 @@ public void run() {
269283
270284 log .info ("Consumer started" );
271285 startLatch .countDown ();
286+ running = true ;
272287
273288 while (pollAndProcessRequests ()) {
274289 // no-op within this loop: everything is done in pollAndProcessRequests method defined
275290 // above.
276291 }
292+ running = false ;
277293
278- log .info ("Closed kafka consumer. Exiting now." );
294+ log .info ("Closing kafka consumer. Exiting now." );
279295 try {
280296 kafkaConsumer .close ();
281297 } catch (Exception e ) {
282298 log .warn ("Failed to close kafka consumer" , e );
283299 }
300+ try {
301+ adminClient .close ();
302+ } catch (Exception e ) {
303+ log .warn ("Failed to close kafka admin client" , e );
304+ }
284305
285306 try {
286307 kafkaMirroringSink .close ();
@@ -292,6 +313,44 @@ public void run() {
292313 }
293314 }
294315
316+ public boolean isRunning () {
317+ return running ;
318+ }
319+
320+ public boolean isSolrConnected () {
321+ if (solrClientSupplier == null ) {
322+ return false ;
323+ }
324+ try {
325+ HealthCheckRequest request = new HealthCheckRequest ();
326+ HealthCheckResponse response = request .process (solrClientSupplier .get ());
327+ if (response .getStatus () != 0 ) {
328+ return false ;
329+ }
330+ return true ;
331+ } catch (Exception e ) {
332+ return false ;
333+ }
334+ }
335+
336+ public boolean isKafkaConnected () {
337+ if (adminClient == null ) {
338+ return false ;
339+ }
340+ try {
341+ Collection <Node > nodes = adminClient .describeCluster ().nodes ().get ();
342+ if (nodes == null || nodes .isEmpty ()) {
343+ return false ;
344+ }
345+ return true ;
346+ } catch (InterruptedException | ExecutionException e ) {
347+ if (e instanceof InterruptedException ) {
348+ Thread .currentThread ().interrupt ();
349+ }
350+ return false ;
351+ }
352+ }
353+
295354 /**
296355 * Polls and processes the requests from Kafka. This method returns false when the consumer needs
297356 * to be shutdown i.e. when there's a wakeup exception.
0 commit comments