66import java .util .concurrent .ExecutorService ;
77import java .util .concurrent .Executors ;
88import java .util .concurrent .Future ;
9+ import java .util .concurrent .ThreadFactory ;
910import java .util .concurrent .TimeUnit ;
1011import java .util .function .BiConsumer ;
1112import org .apache .kafka .clients .consumer .Consumer ;
@@ -64,7 +65,7 @@ public void close() throws Exception {
6465 public static final class Builder <K , V > {
6566 private final Collection <BiConsumer <? super K , ? super V >> callbacks =
6667 new ConcurrentLinkedQueue <>();
67- private ExecutorService executorService = Executors . newSingleThreadExecutor () ;
68+ private ExecutorService executorService ;
6869 private boolean cleanupExecutor =
6970 true ; // if builder creates executor shutdown executor while closing event listener
7071
@@ -84,10 +85,26 @@ public Builder<K, V> withExecutorService(
8485
8586 public KafkaLiveEventListener <K , V > build (
8687 String consumerName , Config kafkaConfig , Consumer <K , V > kafkaConsumer ) {
88+ if (executorService == null ) {
89+ executorService =
90+ Executors .newSingleThreadExecutor (new ListenerThreadFactory (consumerName ));
91+ }
8792 return new KafkaLiveEventListener <>(
8893 new KafkaLiveEventListenerCallable <>(consumerName , kafkaConfig , kafkaConsumer , callbacks ),
8994 executorService ,
9095 cleanupExecutor );
9196 }
9297 }
9398}
99+
100+ class ListenerThreadFactory implements ThreadFactory {
101+ private final String name ;
102+
103+ public ListenerThreadFactory (String consumerName ) {
104+ this .name = "kafka-live-event-listener-" + consumerName + "-" + System .currentTimeMillis ();
105+ }
106+
107+ public Thread newThread (Runnable r ) {
108+ return new Thread (r , name );
109+ }
110+ }
0 commit comments