File tree Expand file tree Collapse file tree 1 file changed +4
-2
lines changed
api/src/main/java/io/kafbat/ui/service Expand file tree Collapse file tree 1 file changed +4
-2
lines changed Original file line number Diff line number Diff line change 1616import org .apache .kafka .clients .admin .AdminClientConfig ;
1717import org .springframework .stereotype .Service ;
1818import reactor .core .publisher .Mono ;
19+ import reactor .core .scheduler .Schedulers ;
1920
2021@ Service
2122@ Slf4j
@@ -41,7 +42,7 @@ public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
4142 }
4243
4344 private Mono <ReactiveAdminClient > createAdminClient (KafkaCluster cluster ) {
44- return Mono .fromFuture ( CompletableFuture . supplyAsync (() -> {
45+ return Mono .fromSupplier (() -> {
4546 Properties properties = new Properties ();
4647 KafkaClientSslPropertiesUtil .addKafkaSslProperties (cluster .getOriginalProperties ().getSsl (), properties );
4748 properties .putAll (cluster .getProperties ());
@@ -52,7 +53,8 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
5253 "kafbat-ui-admin-" + Instant .now ().getEpochSecond () + "-" + CLIENT_ID_SEQ .incrementAndGet ()
5354 );
5455 return AdminClient .create (properties );
55- })).flatMap (ac -> ReactiveAdminClient .create (ac ).doOnError (th -> ac .close ()))
56+ }).subscribeOn (Schedulers .boundedElastic ())
57+ .flatMap (ac -> ReactiveAdminClient .create (ac ).doOnError (th -> ac .close ()))
5658 .onErrorMap (th -> new IllegalStateException (
5759 "Error while creating AdminClient for the cluster " + cluster .getName (), th ));
5860 }
You can’t perform that action at this time.
0 commit comments