|
1 | 1 | /* |
2 | | - * Copyright 2015-2018 the original author or authors. |
| 2 | + * Copyright 2015-2020 the original author or authors. |
3 | 3 | * |
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | 5 | * you may not use this file except in compliance with the License. |
|
37 | 37 |
|
38 | 38 | import org.I0Itec.zkclient.ZkClient; |
39 | 39 | import org.I0Itec.zkclient.exception.ZkInterruptedException; |
| 40 | +import org.apache.commons.logging.Log; |
| 41 | +import org.apache.commons.logging.LogFactory; |
40 | 42 | import org.apache.kafka.clients.consumer.Consumer; |
41 | 43 | import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; |
42 | 44 | import org.apache.kafka.common.Node; |
|
45 | 47 | import org.apache.kafka.common.protocol.SecurityProtocol; |
46 | 48 | import org.apache.kafka.common.requests.MetadataResponse; |
47 | 49 | import org.apache.kafka.common.utils.AppInfoParser; |
| 50 | +import org.apache.kafka.common.utils.Exit; |
48 | 51 | import org.apache.kafka.common.utils.Time; |
49 | 52 | import org.junit.rules.ExternalResource; |
50 | 53 |
|
|
81 | 84 | */ |
82 | 85 | public class KafkaEmbedded extends ExternalResource implements KafkaRule, InitializingBean, DisposableBean { |
83 | 86 |
|
| 87 | + private static final Log logger = LogFactory.getLog(KafkaEmbedded.class); |
| 88 | + |
84 | 89 | public static final String BEAN_NAME = "kafkaEmbedded"; |
85 | 90 |
|
86 | 91 | public static final String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers"; |
@@ -173,6 +178,7 @@ public void afterPropertiesSet() throws Exception { |
173 | 178 |
|
174 | 179 | @Override |
175 | 180 | public void before() throws Exception { //NOSONAR |
| 181 | + overrideExitMethods(); |
176 | 182 | startZookeeper(); |
177 | 183 | int zkConnectionTimeout = 6000; |
178 | 184 | int zkSessionTimeout = 6000; |
@@ -226,11 +232,11 @@ public Properties createProperties(int i, Integer port) { |
226 | 232 | boolean.class, boolean.class, int.class, boolean.class, int.class, boolean.class, |
227 | 233 | int.class, scala.Option.class, int.class); |
228 | 234 | return (Properties) method.invoke(null, i, this.zkConnect, this.controlledShutdown, |
229 | | - true, port, |
230 | | - scala.Option.<SecurityProtocol>apply(null), |
231 | | - scala.Option.<File>apply(null), |
232 | | - scala.Option.<Properties>apply(null), |
233 | | - true, false, 0, false, 0, false, 0, scala.Option.<String>apply(null), 1); |
| 235 | + true, port, |
| 236 | + scala.Option.<SecurityProtocol>apply(null), |
| 237 | + scala.Option.<File>apply(null), |
| 238 | + scala.Option.<Properties>apply(null), |
| 239 | + true, false, 0, false, 0, false, 0, scala.Option.<String>apply(null), 1); |
234 | 240 | } |
235 | 241 | catch (NoSuchMethodException | IllegalAccessException | IllegalArgumentException |
236 | 242 | | InvocationTargetException e) { |
@@ -501,4 +507,34 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) { |
501 | 507 | .isTrue(); |
502 | 508 | } |
503 | 509 |
|
| 510 | + private void overrideExitMethods() { |
| 511 | + final String exitMsg = "Exit.%s(%d, %s) called"; |
| 512 | + Exit.setExitProcedure( |
| 513 | + new Exit.Procedure() { |
| 514 | + |
| 515 | + @Override |
| 516 | + public void execute(int statusCode, String message) { |
| 517 | + if (logger.isDebugEnabled()) { |
| 518 | + logger.debug(String.format(exitMsg, "exit", statusCode, message), new RuntimeException()); |
| 519 | + } |
| 520 | + else { |
| 521 | + logger.warn(String.format(exitMsg, "exit", statusCode, message)); |
| 522 | + } |
| 523 | + } |
| 524 | + }); |
| 525 | + Exit.setHaltProcedure( |
| 526 | + new Exit.Procedure() { |
| 527 | + |
| 528 | + @Override |
| 529 | + public void execute(int statusCode, String message) { |
| 530 | + if (logger.isDebugEnabled()) { |
| 531 | + logger.debug(String.format(exitMsg, "halt", statusCode, message), new RuntimeException()); |
| 532 | + } |
| 533 | + else { |
| 534 | + logger.warn(String.format(exitMsg, "halt", statusCode, message)); |
| 535 | + } |
| 536 | + } |
| 537 | + }); |
| 538 | + } |
| 539 | + |
504 | 540 | } |
0 commit comments