@@ -33,6 +33,14 @@ protected <T> Flux<T> doInConnection(Function<Connection, Publisher<T>> handler)
3333 .flatMapMany (handler );
3434 }
3535
36+ protected <T > Flux <T > wrapOperator ( Flux <T > operator ){
37+ return operator
38+ // 使用弹性线程池来执行jdbc操作
39+ .subscribeOn (Schedulers .boundedElastic ())
40+ // 下游切换为parallel线程池,弹性线程池数据不可控,在虚拟线程等场景下,可能影响下游基于ThreadLocal等场景的缓存性能。
41+ .publishOn (Schedulers .parallel ());
42+ }
43+
3644 @ Override
3745 public Mono <Integer > update (Publisher <SqlRequest > request ) {
3846 return Flux
@@ -42,11 +50,8 @@ public Mono<Integer> update(Publisher<SqlRequest> request) {
4250 .toFlux (request )
4351 .map (sql -> doUpdate (ctx .getOrDefault (Logger .class , log ), connection , sql ))
4452 .reduce (Math ::addExact )))
45- .last (0 )
46- // 使用弹性线程池来执行jdbc操作
47- .subscribeOn (Schedulers .boundedElastic ())
48- // 下游切换为parallel线程池,弹性线程池数据不可控,在虚拟线程等场景下,可能影响下游基于ThreadLocal等场景的缓存性能。
49- .publishOn (Schedulers .parallel ());
53+ .as (this ::wrapOperator )
54+ .last (0 );
5055
5156 }
5257
@@ -59,8 +64,7 @@ public Mono<Void> execute(Publisher<SqlRequest> request) {
5964 doInConnection (connection -> this
6065 .toFlux (request )
6166 .doOnNext (sql -> doExecute (ctx .getOrDefault (Logger .class , log ), connection , sql ))))
62- .subscribeOn (Schedulers .boundedElastic ())
63- .publishOn (Schedulers .parallel ())
67+ .as (this ::wrapOperator )
6468 .then ();
6569 }
6670
@@ -86,7 +90,6 @@ public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wra
8690 disposable ))
8791 .then ();
8892 })
89- .subscribeOn (Schedulers .boundedElastic ())
9093 .subscribe ((ignore ) -> sink .complete (),
9194 sink ::error ,
9295 sink ::complete ,
@@ -98,8 +101,7 @@ public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wra
98101 .onDispose (disposable );
99102 });
100103 })
101- .publishOn (Schedulers .parallel ());
102-
104+ .as (this ::wrapOperator );
103105 }
104106
105107 protected Flux <SqlRequest > toFlux (Publisher <SqlRequest > request ) {
0 commit comments