Skip to content

Commit 69be698

Browse files
committed
refactor: 优化
1 parent 4752485 commit 69be698

File tree

4 files changed

+38
-8
lines changed

4 files changed

+38
-8
lines changed

hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcReactiveSqlExecutor.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ public Mono<Integer> update(Publisher<SqlRequest> request) {
4242
.toFlux(request)
4343
.map(sql -> doUpdate(ctx.getOrDefault(Logger.class, log), connection, sql))
4444
.reduce(Math::addExact)))
45-
.last(0);
45+
.last(0)
46+
// 使用弹性线程池来执行jdbc操作
47+
.subscribeOn(Schedulers.boundedElastic())
48+
// 下游切换为parallel线程池,弹性线程池数据不可控,在虚拟线程等场景下,可能影响下游基于ThreadLocal等场景的缓存性能。
49+
.publishOn(Schedulers.parallel());
4650

4751
}
4852

@@ -55,14 +59,15 @@ public Mono<Void> execute(Publisher<SqlRequest> request) {
5559
doInConnection(connection -> this
5660
.toFlux(request)
5761
.doOnNext(sql -> doExecute(ctx.getOrDefault(Logger.class, log), connection, sql))))
58-
62+
.subscribeOn(Schedulers.boundedElastic())
63+
.publishOn(Schedulers.parallel())
5964
.then();
6065
}
6166

6267
@Override
6368
public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wrapper) {
6469
return Flux
65-
.deferContextual(ctx -> {
70+
.<E>deferContextual(ctx -> {
6671
Logger logger = ctx.getOrDefault(Logger.class, log);
6772
return Flux
6873
.create(sink -> {
@@ -92,7 +97,8 @@ public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wra
9297
sink
9398
.onDispose(disposable);
9499
});
95-
});
100+
})
101+
.publishOn(Schedulers.parallel());
96102

97103
}
98104

hsweb-easy-orm-rdb/src/test/java/org/hswebframework/ezorm/rdb/TestJdbcReactiveSqlExecutor.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
import lombok.AllArgsConstructor;
44
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
55
import org.hswebframework.ezorm.rdb.executor.jdbc.JdbcReactiveSqlExecutor;
6+
import org.reactivestreams.Publisher;
7+
import reactor.core.publisher.Flux;
68
import reactor.core.publisher.Mono;
79

810
import java.sql.Connection;
11+
import java.util.function.Function;
912

1013
@AllArgsConstructor
1114
public class TestJdbcReactiveSqlExecutor extends JdbcReactiveSqlExecutor {
@@ -22,5 +25,13 @@ public Mono<Connection> getConnection() {
2225
);
2326
}
2427

25-
28+
@Override
29+
protected <T> Flux<T> doInConnection(Function<Connection, Publisher<T>> handler) {
30+
return Flux
31+
.using(
32+
provider::getConnection,
33+
handler,
34+
provider::releaseConnect
35+
);
36+
}
2637
}

hsweb-easy-orm-rdb/src/test/java/org/hswebframework/ezorm/rdb/TestReactiveSqlExecutor.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
import io.r2dbc.spi.Connection;
44
import lombok.extern.slf4j.Slf4j;
55
import org.hswebframework.ezorm.rdb.executor.reactive.r2dbc.R2dbcReactiveSqlExecutor;
6+
import org.reactivestreams.Publisher;
7+
import reactor.core.publisher.Flux;
68
import reactor.core.publisher.Mono;
79
import reactor.core.publisher.SignalType;
810

11+
import java.util.function.Function;
12+
913
@Slf4j
1014
public class TestReactiveSqlExecutor extends R2dbcReactiveSqlExecutor {
1115

@@ -33,12 +37,20 @@ public TestReactiveSqlExecutor(R2dbcConnectionProvider provider) {
3337
@Override
3438
protected Mono<Connection> getConnection() {
3539
return provider.getConnection()
36-
.doOnNext(connection -> log.debug("get connection {}", connection));
40+
.doOnNext(connection -> log.debug("get connection {}", connection));
41+
}
42+
43+
@Override
44+
protected <T> Flux<T> doInConnection(Function<Connection, Publisher<T>> handler) {
45+
return Flux.usingWhen(
46+
provider.getConnection(),
47+
handler,
48+
c -> Mono.fromRunnable(() -> provider.releaseConnection(c)));
3749
}
3850

3951
@Override
4052
protected void releaseConnection(SignalType type, Connection connection) {
41-
log.debug("release connection {}", connection);
53+
log.debug("release connection {}", connection);
4254
provider.releaseConnection(connection);
4355
}
4456

hsweb-easy-orm-rdb/src/test/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcReactiveSqlExecutorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public void testExecute() {
4040
.update(Flux.range(0, 10)
4141
.doOnNext(i -> System.out.println())
4242
.map(num -> prepare("insert into test (id) values (?) ", num)))
43-
.doOnError(Throwable::printStackTrace);
43+
.doOnError(Throwable::printStackTrace)
44+
.doFinally(s-> System.out.println(Thread.currentThread()));
4445

4546
StepVerifier.create(counter)
4647
.expectNext(10)

0 commit comments

Comments
 (0)